Skip to content

Commit c8a1d2e

Browse files
committed
refactor query params to top get_job(s) doc, add remove_sensitive_from_queue
1 parent ed61899 commit c8a1d2e

File tree

2 files changed

+42
-15
lines changed

2 files changed

+42
-15
lines changed

comfy_execution/jobs.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,11 @@ def is_previewable(media_type: str, item: dict) -> bool:
6464

6565

6666
def normalize_queue_item(item: tuple, status: str) -> dict:
67-
"""Convert queue item tuple to unified job dict."""
68-
priority, prompt_id, _, extra_data, _ = item[:5]
67+
"""Convert queue item tuple to unified job dict.
68+
69+
Expects item with sensitive data already removed (5 elements).
70+
"""
71+
priority, prompt_id, _, extra_data, _ = item
6972
create_time, workflow_id = _extract_job_metadata(extra_data)
7073

7174
return prune_dict({
@@ -79,9 +82,12 @@ def normalize_queue_item(item: tuple, status: str) -> dict:
7982

8083

8184
def normalize_history_item(prompt_id: str, history_item: dict, include_outputs: bool = False) -> dict:
82-
"""Convert history item dict to unified job dict."""
85+
"""Convert history item dict to unified job dict.
86+
87+
History items have sensitive data already removed (prompt tuple has 5 elements).
88+
"""
8389
prompt_tuple = history_item['prompt']
84-
priority, _, prompt, extra_data, _ = prompt_tuple[:5]
90+
priority, _, prompt, extra_data, _ = prompt_tuple
8591
create_time, workflow_id = _extract_job_metadata(extra_data)
8692

8793
status_info = history_item.get('status', {})

server.py

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@
4848
if args.enable_manager:
4949
import comfyui_manager
5050

51+
52+
def _remove_sensitive_from_queue(queue: list) -> list:
53+
"""Remove sensitive data (index 5) from queue item tuples."""
54+
return [item[:5] for item in queue]
55+
56+
5157
async def send_socket_catch_exception(function, message):
5258
try:
5359
await function(message)
@@ -697,36 +703,48 @@ async def get_object_info_node(request):
697703

698704
@routes.get("/api/jobs")
699705
async def get_jobs(request):
700-
"""List all jobs with filtering, sorting, and pagination."""
706+
"""List all jobs with filtering, sorting, and pagination.
707+
708+
Query parameters:
709+
status: Filter by status (comma-separated): pending, in_progress, completed, failed
710+
workflow_id: Filter by workflow ID
711+
sort_by: Sort field: created_at (default), execution_duration
712+
sort_order: Sort direction: asc, desc (default)
713+
limit: Max items to return (positive integer)
714+
offset: Items to skip (non-negative integer, default 0)
715+
"""
701716
query = request.rel_url.query
702717

703-
status_param = query.get("status", None)
718+
status_param = query.get('status')
719+
workflow_id = query.get('workflow_id')
720+
sort_by = query.get('sort_by', 'created_at').lower()
721+
sort_order = query.get('sort_order', 'desc').lower()
722+
704723
status_filter = None
705724
if status_param:
706725
status_filter = [s.strip().lower() for s in status_param.split(',') if s.strip()]
707-
valid_statuses = set(JobStatus.ALL)
708-
invalid_statuses = [s for s in status_filter if s not in valid_statuses]
726+
invalid_statuses = [s for s in status_filter if s not in JobStatus.ALL]
709727
if invalid_statuses:
710728
return web.json_response(
711729
{"error": f"Invalid status value(s): {', '.join(invalid_statuses)}. Valid values: {', '.join(JobStatus.ALL)}"},
712730
status=400
713731
)
714732

715-
sort_by = query.get('sort_by', 'created_at').lower()
716733
if sort_by not in {'created_at', 'execution_duration'}:
717734
return web.json_response(
718735
{"error": "sort_by must be 'created_at' or 'execution_duration'"},
719736
status=400
720737
)
721738

722-
sort_order = query.get('sort_order', 'desc').lower()
723739
if sort_order not in {'asc', 'desc'}:
724740
return web.json_response(
725741
{"error": "sort_order must be 'asc' or 'desc'"},
726742
status=400
727743
)
728744

729745
limit = None
746+
747+
# If limit is provided, validate that it is a positive integer, else continue without a limit
730748
if 'limit' in query:
731749
try:
732750
limit = int(query.get('limit'))
@@ -753,11 +771,12 @@ async def get_jobs(request):
753771
status=400
754772
)
755773

756-
workflow_id = query.get('workflow_id', None)
757-
758774
running, queued = self.prompt_queue.get_current_queue_volatile()
759775
history = self.prompt_queue.get_history()
760776

777+
running = _remove_sensitive_from_queue(running)
778+
queued = _remove_sensitive_from_queue(queued)
779+
761780
jobs, total = get_all_jobs(
762781
running, queued, history,
763782
status_filter=status_filter,
@@ -793,6 +812,9 @@ async def get_job_by_id(request):
793812
running, queued = self.prompt_queue.get_current_queue_volatile()
794813
history = self.prompt_queue.get_history(prompt_id=job_id)
795814

815+
running = _remove_sensitive_from_queue(running)
816+
queued = _remove_sensitive_from_queue(queued)
817+
796818
job = get_job(job_id, running, queued, history)
797819
if job is None:
798820
return web.json_response(
@@ -825,9 +847,8 @@ async def get_history_prompt_id(request):
825847
async def get_queue(request):
826848
queue_info = {}
827849
current_queue = self.prompt_queue.get_current_queue_volatile()
828-
remove_sensitive = lambda queue: [x[:5] for x in queue]
829-
queue_info['queue_running'] = remove_sensitive(current_queue[0])
830-
queue_info['queue_pending'] = remove_sensitive(current_queue[1])
850+
queue_info['queue_running'] = _remove_sensitive_from_queue(current_queue[0])
851+
queue_info['queue_pending'] = _remove_sensitive_from_queue(current_queue[1])
831852
return web.json_response(queue_info)
832853

833854
@routes.post("/prompt")

0 commit comments

Comments
 (0)