Add logging for job assignment in cluster master

parent df7dfa12
......@@ -1004,10 +1004,14 @@ class ClusterMaster:
jobs = cursor.fetchall()
conn.close()
if jobs:
print(f"Found {len(jobs)} processing job(s) without assigned job_id")
for job_row in jobs:
job = dict(job_row)
job['data'] = json.loads(job['data']) if job['data'] else None
process_type = 'analysis' if job['request_type'] == 'analyze' else ('training' if job['request_type'] == 'train' else job['request_type'])
print(f"Attempting to assign job {job['id']} for {process_type}")
worker_key = self.select_worker_for_job(process_type, job['data'].get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct'), job['data'])
if worker_key:
job_id = self.assign_job_to_worker(worker_key, job['data'])
......@@ -1015,6 +1019,10 @@ class ClusterMaster:
from .database import update_queue_status
update_queue_status(job['id'], 'processing', {'job_id': job_id, 'status': 'Assigned to worker'}, job_id=job_id)
print(f"Assigned job {job['id']} to worker {worker_key} with job_id {job_id}")
else:
print(f"Failed to assign job {job['id']} to worker {worker_key}")
else:
print(f"No suitable worker found for job {job['id']}")
await asyncio.sleep(5) # Poll every 5 seconds
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment