Add debug prints to worker and revert monitoring to TCP

- Worker now prints when receiving jobs and sending results
- Cluster master uses TCP polling for consistency with clients
parent 09a3589a
...@@ -628,41 +628,35 @@ class ClusterMaster: ...@@ -628,41 +628,35 @@ class ClusterMaster:
return None return None
async def _monitor_job_result(self, job_id: str, process_type: str) -> None: async def _monitor_job_result(self, job_id: str, process_type: str) -> None:
"""Monitor for job result by polling database.""" """Monitor for job result from local backend."""
try: try:
from .database import get_queue_by_job_id from .comm import SocketCommunicator
import json from .config import get_backend_web_port
# Poll for result # Poll for result
for _ in range(300): # Poll for up to 5 minutes (300 * 1s) for _ in range(300): # Poll for up to 5 minutes (300 * 1s)
try: try:
queue_entry = get_queue_by_job_id(job_id) backend_comm = SocketCommunicator(host='localhost', port=get_backend_web_port(), comm_type='tcp')
if queue_entry: backend_comm.connect()
status = queue_entry['status']
if status == 'completed':
result = json.loads(queue_entry['result']) if queue_entry['result'] else {}
print(f"Received result for job {job_id}")
# Handle result
await self._handle_job_result({
'job_id': job_id,
'result': result
})
# Clean up # Send get_result request
if job_id in self.pending_jobs: result_request = Message(
del self.pending_jobs[job_id] msg_type='get_result',
return msg_id=f'poll_{job_id}',
data={'request_id': job_id}
)
backend_comm.send_message(result_request)
elif status == 'failed': # Try to receive response
error = queue_entry.get('error', 'Unknown error') response = backend_comm.receive_message()
result = {'status': 'failed', 'error': error} if response and response.msg_type in ['analyze_response', 'train_response']:
print(f"Job {job_id} failed: {error}") result_data = response.data
print(f"Received result for job {job_id}")
# Handle failed result # Handle result
await self._handle_job_result({ await self._handle_job_result({
'job_id': job_id, 'job_id': job_id,
'result': result 'result': result_data
}) })
# Clean up # Clean up
...@@ -670,13 +664,13 @@ class ClusterMaster: ...@@ -670,13 +664,13 @@ class ClusterMaster:
del self.pending_jobs[job_id] del self.pending_jobs[job_id]
return return
elif status == 'processing': elif response and response.msg_type == 'result_pending':
# Still processing, wait and try again # Result not ready yet, wait and try again
await asyncio.sleep(1) await asyncio.sleep(1)
continue continue
except Exception as e: except Exception as e:
print(f"Error polling for job {job_id} status: {e}") print(f"Error polling for job {job_id} result: {e}")
await asyncio.sleep(1) await asyncio.sleep(1)
# Timeout - job took too long # Timeout - job took too long
......
...@@ -250,22 +250,27 @@ def worker_process(backend_type: str): ...@@ -250,22 +250,27 @@ def worker_process(backend_type: str):
try: try:
message = comm.receive_message() message = comm.receive_message()
if message and message.msg_type == 'analyze_request': if message and message.msg_type == 'analyze_request':
print(f"DEBUG: Worker received analyze_request: {message.msg_id}")
data = message.data data = message.data
media_path = data.get('local_path', data.get('file_name', '')) media_path = data.get('local_path', data.get('file_name', ''))
if not media_path: if not media_path:
result = 'No media path provided' result = 'No media path provided'
print(f"DEBUG: No media path provided for job {message.msg_id}")
else: else:
prompt = data.get('prompt', 'Describe this image.') prompt = data.get('prompt', 'Describe this image.')
model_path = data.get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct') model_path = data.get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct')
interval = data.get('interval', 10) interval = data.get('interval', 10)
job_id = data.get('job_id') # Extract job_id for cancellation checking job_id = data.get('job_id') # Extract job_id for cancellation checking
print(f"DEBUG: Starting analysis of {media_path} with model {model_path} for job {message.msg_id}")
result = analyze_media(media_path, prompt, model_path, interval, job_id) result = analyze_media(media_path, prompt, model_path, interval, job_id)
print(f"DEBUG: Analysis completed for job {message.msg_id}")
# Release model reference (don't unload yet, per requirements) # Release model reference (don't unload yet, per requirements)
release_model(model_path) release_model(model_path)
# Send result back # Send result back
response = Message('analyze_response', message.msg_id, {'result': result}) response = Message('analyze_response', message.msg_id, {'result': result})
print(f"DEBUG: Sending analyze_response for job {message.msg_id}")
comm.send_message(response) comm.send_message(response)
# If in cluster mode, also notify cluster master # If in cluster mode, also notify cluster master
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment