Fix job cancellation cleanup

- Store queue_id in active_jobs tracking
- Properly detect cancelled jobs by looking up queue status with the stored queue_id (the lookup previously used the job entry's 'job_id' value)
- Clean up worker resources when jobs are cancelled
- Workers become available for new jobs after cancellation
parent 83825820
......@@ -499,7 +499,7 @@ class ClusterMaster:
return max(1, cuda_count + rocm_count)
async def assign_job_to_worker(self, worker_key: str, job_data: dict) -> Optional[str]:
async def assign_job_to_worker(self, worker_key: str, job_data: dict, queue_id: int = None) -> Optional[str]:
"""Assign a job to a worker and handle file/model transfer."""
from .models import estimate_model_vram_requirements
import uuid
......@@ -522,6 +522,7 @@ class ClusterMaster:
self.active_jobs[job_id] = {
'worker_key': worker_key,
'client_id': client_id,
'queue_id': queue_id,
'model_path': model_path,
'vram_required': vram_required,
'start_time': time.time(),
......@@ -589,6 +590,7 @@ class ClusterMaster:
self.active_jobs[job_id] = {
'worker_key': worker_key,
'client_id': "local",
'queue_id': queue_id,
'model_path': model_path,
'vram_required': vram_required,
'start_time': time.time(),
......@@ -1330,7 +1332,7 @@ class ClusterMaster:
print(f"Job {job['id']} waiting for available workers")
worker_key = self.select_worker_for_job(process_type, job['data'].get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct'), job['data'])
if worker_key:
job_id = await self.assign_job_to_worker(worker_key, job['data'])
job_id = await self.assign_job_to_worker(worker_key, job['data'], queue_id=job['id'])
if job_id:
update_queue_status(job['id'], 'processing', {'job_id': job_id, 'status': 'Assigned to worker'}, job_id=job_id)
print(f"Assigned job {job['id']} to worker {worker_key} with job_id {job_id}")
......@@ -1345,7 +1347,7 @@ class ClusterMaster:
cancelled_jobs = []
for job_id, job_info in self.active_jobs.items():
from .database import get_queue_status
queue_status = get_queue_status(job_info.get('job_id'))
queue_status = get_queue_status(job_info.get('queue_id'))
if queue_status and queue_status['status'] == 'cancelled':
cancelled_jobs.append(job_id)
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment