Fix worker cancellation checking

- Pass queue_id to workers for proper cancellation detection
- Workers now check correct job id for cancellation status
- Workers receive effective stop commands via database polling
parent b2f2786e
......@@ -528,6 +528,9 @@ class ClusterMaster:
'start_time': time.time(),
'job_data': job_data
}
# Add queue_id to job data for worker cancellation checking
job_data['queue_id'] = queue_id
self.worker_jobs[worker_key].append(job_id)
self.worker_vram_usage[worker_key] += vram_required
......@@ -596,6 +599,9 @@ class ClusterMaster:
'start_time': time.time(),
'job_data': job_data
}
# Add queue_id to job data for worker cancellation checking
job_data['queue_id'] = queue_id
self.worker_jobs[worker_key].append(job_id)
self.worker_vram_usage[worker_key] += vram_required
......
......@@ -185,7 +185,7 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None):
for i, (frame_path, ts) in enumerate(frames):
# Check for cancellation
if job_id and check_job_cancelled(job_id):
if queue_id and check_job_cancelled(queue_id):
# Clean up and return cancelled message
for fp, _ in frames[i:]:
try:
......@@ -209,7 +209,7 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None):
shutil.rmtree(output_dir)
# Check for cancellation before summary
if job_id and check_job_cancelled(job_id):
if queue_id and check_job_cancelled(queue_id):
return "Job cancelled by user"
# Generate summary
......@@ -226,7 +226,7 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id=None):
return result
else:
# Check for cancellation before processing image
if job_id and check_job_cancelled(job_id):
if queue_id and check_job_cancelled(queue_id):
return "Job cancelled by user"
result = analyze_single_image(media_path, full_prompt, model)
......@@ -262,7 +262,7 @@ def worker_process(backend_type: str):
prompt = data.get('prompt', 'Describe this image.')
model_path = data.get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct')
interval = data.get('interval', 10)
job_id = data.get('job_id') # Extract job_id for cancellation checking
queue_id = data.get('queue_id') # Extract queue_id for cancellation checking
print(f"DEBUG: Starting analysis of {media_path} with model {model_path} for job {message.msg_id}")
result = analyze_media(media_path, prompt, model_path, interval, job_id)
print(f"DEBUG: Analysis completed for job {message.msg_id}")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment