Add missing _cancel_job_processing method

- Implements job cancellation by notifying workers
- Sends cancel messages to local backend or remote clients
- Cleans up cancelled job resources
parent a5920057
...@@ -632,6 +632,52 @@ class ClusterMaster: ...@@ -632,6 +632,52 @@ class ClusterMaster:
del self.active_jobs[job_id] del self.active_jobs[job_id]
return None return None
async def _cancel_job_processing(self, job_id: str) -> None:
"""Cancel a running job by notifying the worker."""
if job_id not in self.active_jobs:
return
job_info = self.active_jobs[job_id]
client_id = job_info['client_id']
worker_key = job_info['worker_key']
print(f"Cancelling job {job_id} on worker {worker_key}")
if client_id == "local":
# Send cancel to local backend
try:
from .comm import SocketCommunicator
from .config import get_backend_web_port
backend_comm = SocketCommunicator(host='localhost', port=get_backend_web_port(), comm_type='tcp')
backend_comm.connect()
cancel_message = Message(
msg_type='cancel_job',
msg_id=f'cancel_{job_id}',
data={'job_id': job_id}
)
backend_comm.send_message(cancel_message)
print(f"Sent cancel request for job {job_id} to local backend")
except Exception as e:
print(f"Failed to send cancel to local backend: {e}")
else:
# Send cancel to remote client
if client_id in self.client_websockets:
try:
await self.client_websockets[client_id].send(json.dumps({
'type': 'cancel_job',
'job_id': job_id
}))
print(f"Sent cancel request for job {job_id} to client {client_id}")
except Exception as e:
print(f"Failed to send cancel to client {client_id}: {e}")
if client_id in self.clients:
self._remove_client(client_id)
# Clean up the job
self.complete_job(job_id)
async def _monitor_job_result(self, job_id: str, process_type: str) -> None: async def _monitor_job_result(self, job_id: str, process_type: str) -> None:
"""Monitor for job result from local backend.""" """Monitor for job result from local backend."""
try: try:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment