Add job cancellation communication

- Cluster master sends cancel_job messages to backend/client when jobs are cancelled
- Add _handle_cancel_job to process cancel confirmations from clients
- Workers can be notified faster to stop processing and free resources
parent 15dd9ddc
...@@ -256,6 +256,9 @@ class ClusterMaster: ...@@ -256,6 +256,9 @@ class ClusterMaster:
elif msg_type == 'job_file_request': elif msg_type == 'job_file_request':
await self._handle_job_file_request(message) await self._handle_job_file_request(message)
elif msg_type == 'cancel_job':
await self._handle_cancel_job(message)
return {'type': 'error', 'message': 'Unknown message type'} return {'type': 'error', 'message': 'Unknown message type'}
def _handle_auth(self, message: Dict[str, Any], websocket: websockets.WebSocketServerProtocol) -> Dict[str, Any]: def _handle_auth(self, message: Dict[str, Any], websocket: websockets.WebSocketServerProtocol) -> Dict[str, Any]:
...@@ -843,6 +846,18 @@ class ClusterMaster: ...@@ -843,6 +846,18 @@ class ClusterMaster:
# TODO: Store result for retrieval by web interface # TODO: Store result for retrieval by web interface
# For now, this would need integration with the queue system # For now, this would need integration with the queue system
async def _handle_cancel_job(self, message: Dict[str, Any]) -> None:
    """Process a ``cancel_job`` message received over a websocket.

    The sender is resolved from the ``_websocket`` entry of ``message`` and
    the target job from its ``job_id`` entry. When both are present the
    request is logged and, if the job is still listed in
    ``self.active_jobs``, it is handed to ``self.complete_job`` so it stops
    being tracked as active.

    Args:
        message: Decoded message dict; ``_websocket`` and ``job_id`` are the
            only keys read here.

    Returns:
        None. Messages without a resolvable sender or without a job id are
        ignored silently.
    """
    client_id = self._get_client_by_websocket(message.get('_websocket'))
    job_id = message.get('job_id')

    # Nothing to do unless we know both who is asking and which job is meant.
    if not (client_id and job_id):
        return

    print(f"Client {client_id} requested to cancel job {job_id}")

    # Per the original author's note, the database row is already marked as
    # cancelled at this point; only the in-memory tracking is left to clear.
    if job_id not in self.active_jobs:
        return
    self.complete_job(job_id)
async def _handle_job_file_request(self, message: Dict[str, Any]) -> None: async def _handle_job_file_request(self, message: Dict[str, Any]) -> None:
"""Handle request from worker to send result files back.""" """Handle request from worker to send result files back."""
client_id = self._get_client_by_websocket(message.get('_websocket')) client_id = self._get_client_by_websocket(message.get('_websocket'))
...@@ -1326,6 +1341,17 @@ class ClusterMaster: ...@@ -1326,6 +1341,17 @@ class ClusterMaster:
print(f"No suitable worker found for job {job['id']}, re-queuing") print(f"No suitable worker found for job {job['id']}, re-queuing")
update_queue_status(job['id'], 'queued', error='No suitable worker found, re-queued') update_queue_status(job['id'], 'queued', error='No suitable worker found, re-queued')
# Check for cancelled jobs that need to be stopped
cancelled_jobs = []
for job_id, job_info in self.active_jobs.items():
from .database import get_queue_status
queue_status = get_queue_status(job_info.get('job_id'))
if queue_status and queue_status['status'] == 'cancelled':
cancelled_jobs.append(job_id)
for job_id in cancelled_jobs:
await self._cancel_job_processing(job_id)
await asyncio.sleep(5) # Poll every 5 seconds await asyncio.sleep(5) # Poll every 5 seconds
except KeyboardInterrupt: except KeyboardInterrupt:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment