 Provides normalization and helper functions for job status tracking.
 """
 
+from comfy_api.internal import prune_dict
+
 
 class JobStatus:
     """Job status constants."""
@@ -54,18 +56,14 @@ def normalize_queue_item(item, status):
     extra_pnginfo = extra_data.get('extra_pnginfo') or {}
     workflow_id = extra_pnginfo.get('workflow', {}).get('id')
 
-    return {
+    return prune_dict({
         'id': prompt_id,
         'status': status,
         'priority': priority,
         'create_time': create_time,
-        'execution_error': None,
-        'execution_start_time': None,
-        'execution_end_time': None,
         'outputs_count': 0,
-        'preview_output': None,
         'workflow_id': workflow_id,
-    }
+    })
 
 
 def normalize_history_item(prompt_id, history_item, include_outputs=False):
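Note on prune_dict: its definition is not part of this diff, but the change only makes sense if it drops keys whose values are None, which is why the explicitly-null fields above can be removed from the literal. A minimal sketch of that assumed behavior follows; the name and signature are inferred, not taken from comfy_api.internal.

# Hypothetical stand-in for comfy_api.internal.prune_dict, assumed here to
# drop keys whose value is None so optional job fields are simply omitted.
def prune_dict(d: dict) -> dict:
    return {key: value for key, value in d.items() if value is not None}

# Under that assumption, a queued job with no workflow id serializes without
# 'workflow_id', 'execution_error', etc., instead of carrying explicit nulls.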
@@ -104,18 +102,18 @@ def normalize_history_item(prompt_id, history_item, include_outputs=False):
         if event_name == 'execution_error':
             execution_error = event_data
 
-    job = {
+    job = prune_dict({
         'id': prompt_id,
         'status': status,
         'priority': priority,
         'create_time': create_time,
-        'execution_error': execution_error,
         'execution_start_time': execution_start_time,
         'execution_end_time': execution_end_time,
+        'execution_error': execution_error,
         'outputs_count': outputs_count,
         'preview_output': preview_output,
         'workflow_id': workflow_id,
-    }
+    })
 
     if include_outputs:
         job['outputs'] = outputs
@@ -210,7 +208,7 @@ def get_job(prompt_id, running, queued, history):
     return None
 
 
-def get_all_jobs(running, queued, history, status_filter=None, sort_by="created_at", sort_order="desc", limit=None, offset=0):
+def get_all_jobs(running, queued, history, status_filter=None, workflow_id=None, sort_by="created_at", sort_order="desc", limit=None, offset=0):
     """
     Get all jobs (running, pending, completed) with filtering and sorting.
 
@@ -219,6 +217,7 @@ def get_all_jobs(running, queued, history, status_filter=None, sort_by="created_
         queued: List of pending queue items
         history: Dict of history items keyed by prompt_id
         status_filter: List of statuses to include (from JobStatus.ALL)
+        workflow_id: Filter by workflow ID
         sort_by: Field to sort by ('created_at', 'execution_duration')
         sort_order: 'asc' or 'desc'
         limit: Maximum number of items to return
@@ -248,6 +247,9 @@ def get_all_jobs(running, queued, history, status_filter=None, sort_by="created_
         if (is_failed and include_failed) or (not is_failed and include_completed):
             jobs.append(normalize_history_item(prompt_id, history_item))
 
+    if workflow_id:
+        jobs = [j for j in jobs if j.get('workflow_id') == workflow_id]
+
     jobs = apply_sorting(jobs, sort_by, sort_order)
 
     total_count = len(jobs)
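For orientation, a hedged usage sketch of the new workflow_id filter; the running/queued/history variables are placeholders for the server's queue and history state, and the return shape is not fully shown in this diff.

# Hypothetical call site: restrict the merged job list to a single workflow.
# current_running, current_pending, and history_store are assumed names; only
# the parameters of get_all_jobs come from the diff itself.
result = get_all_jobs(
    running=current_running,
    queued=current_pending,
    history=history_store,
    status_filter=None,                 # or a subset of JobStatus.ALL
    workflow_id="some-workflow-uuid",   # keep only jobs whose extra_pnginfo workflow id matches
    sort_by="created_at",
    sort_order="desc",
    limit=20,
    offset=0,
)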