Fix job restart flow

- Restarted jobs are set to 'queued' status
- Cluster master looks for 'queued' jobs, sets them to 'processing', then assigns them to a worker
- Proper job lifecycle: queued -> processing -> assigned -> completed/failed (sketched below)
parent 21d81a1c
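
The intended lifecycle can be written out as a small transition table. This is only an illustration of the commit message above: judging from the diff below, 'assigned' is not a separate status value (an assigned job stays in 'processing' with the worker's job_id recorded), and a job can fall back from 'processing' to 'queued' when no worker accepts it.

# Illustrative transition table; 'queued', 'processing' and 'cancelled' appear
# in the diff, the remaining states come from the commit message only.
JOB_TRANSITIONS = {
    'cancelled': {'queued'},               # restart path added in this commit
    'queued': {'processing'},              # cluster master claims the job
    'processing': {'assigned', 'queued'},  # handed to a worker, or re-queued
    'assigned': {'completed', 'failed'},   # terminal states reported by the worker
}

def is_valid_transition(current, new):
    return new in JOB_TRANSITIONS.get(current, set())
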
@@ -1292,7 +1292,7 @@ class ClusterMaster:
         from .database import get_db_connection
         conn = get_db_connection()
         cursor = conn.cursor()
-        cursor.execute('SELECT * FROM processing_queue WHERE status = ? AND (job_id IS NULL OR job_id = ?)', ('processing', ''))
+        cursor.execute('SELECT * FROM processing_queue WHERE status = ?', ('queued',))
         jobs = cursor.fetchall()
         conn.close()
@@ -1305,6 +1305,10 @@ class ClusterMaster:
         for job_row in jobs:
             job = dict(job_row)
+            # Set status to processing before assigning
+            from .database import update_queue_status
+            update_queue_status(job['id'], 'processing')
             job['data'] = json.loads(job['data']) if job['data'] else None
             process_type = 'analysis' if job['request_type'] == 'analyze' else ('training' if job['request_type'] == 'train' else job['request_type'])
             if should_print_status:
@@ -1313,16 +1317,13 @@ class ClusterMaster:
             if worker_key:
                 job_id = await self.assign_job_to_worker(worker_key, job['data'])
                 if job_id:
-                    from .database import update_queue_status
                     update_queue_status(job['id'], 'processing', {'job_id': job_id, 'status': 'Assigned to worker'}, job_id=job_id)
                     print(f"Assigned job {job['id']} to worker {worker_key} with job_id {job_id}")
                 else:
                     print(f"Failed to assign job {job['id']} to worker {worker_key}, re-queuing")
-                    from .database import update_queue_status
                     update_queue_status(job['id'], 'queued', error='Failed to assign to worker, re-queued')
             else:
                 print(f"No suitable worker found for job {job['id']}, re-queuing")
-                from .database import update_queue_status
                 update_queue_status(job['id'], 'queued', error='No suitable worker found, re-queued')
         await asyncio.sleep(5)  # Poll every 5 seconds
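
Every status change in this commit goes through the update_queue_status helper in .database. Below is a hypothetical sketch of that helper, inferred only from the call sites in the diff; the 'details' parameter and the details/error column names are assumptions, and the real code presumably reuses get_db_connection() rather than a hard-coded path.

import json
import sqlite3

DB_PATH = 'processing_queue.db'  # assumed path; the real helper presumably uses get_db_connection()

def update_queue_status(queue_id, status, details=None,
                        job_id=None, retry_count=None, error=None):
    """Update one processing_queue row, touching only the fields provided."""
    assignments, values = ['status = ?'], [status]
    if details is not None:
        assignments.append('details = ?')    # assumed column for progress info
        values.append(json.dumps(details))
    if job_id is not None:
        assignments.append('job_id = ?')
        values.append(job_id)
    if retry_count is not None:
        assignments.append('retry_count = ?')
        values.append(retry_count)
    if error is not None:
        assignments.append('error = ?')      # assumed column for error text
        values.append(error)
    values.append(queue_id)

    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(f"UPDATE processing_queue SET {', '.join(assignments)} WHERE id = ?", values)
        conn.commit()
    finally:
        conn.close()
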
@@ -95,8 +95,8 @@ class QueueManager:
         if job['status'] != 'cancelled':
             return False
-        # Reset retry_count and update status to processing (so cluster master picks it up)
-        update_queue_status(queue_id, 'processing', retry_count=0, job_id='')
+        # Reset retry_count and update status to queued (so cluster master picks it up)
+        update_queue_status(queue_id, 'queued', retry_count=0)
         return True
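
Putting the two halves together, a cancelled job now re-enters the full lifecycle instead of being dropped straight into 'processing'. A rough sketch of the restart path follows; the method name restart_job and the row lookup are assumptions, only the status check and the update_queue_status call come from the diff above.

from .database import update_queue_status

class QueueManager:
    def restart_job(self, queue_id):
        job = self.get_job(queue_id)  # assumed lookup of the processing_queue row
        if job is None or job['status'] != 'cancelled':
            return False
        # Reset retry_count and mark the job 'queued'; the cluster master's poll
        # loop will claim it ('processing') and then hand it to a worker.
        update_queue_status(queue_id, 'queued', retry_count=0)
        return True

Sending restarted jobs back to 'queued' makes the master's poll query the only code path that moves rows into 'processing', which removes the need for the old "status = 'processing' AND (job_id IS NULL OR job_id = '')" special case.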