Implement progress tracking for jobs: add progress updates in worker, backend...

Implement progress tracking for jobs: add progress updates in worker, backend handlers, and database updates
parent e8a68109
...@@ -102,6 +102,21 @@ def handle_web_message(message: Message, client_sock=None) -> Message: ...@@ -102,6 +102,21 @@ def handle_web_message(message: Message, client_sock=None) -> Message:
'analysis_backend': get_analysis_backend(), 'analysis_backend': get_analysis_backend(),
'training_backend': get_training_backend() 'training_backend': get_training_backend()
}) })
elif message.msg_type == 'get_progress':
# Get progress from database
job_id = message.data.get('job_id')
if job_id:
from .database import get_queue_status
job = get_queue_status(job_id)
if job:
progress_data = {
'job_id': job_id,
'progress': job.get('progress', 0),
'message': job.get('progress_message', ''),
'stage': 'processing'
}
return Message('progress', message.msg_id, progress_data)
return Message('progress_pending', message.msg_id, {'status': 'no_progress'})
elif message.msg_type == 'get_result': elif message.msg_type == 'get_result':
# Check for pending result # Check for pending result
msg_id = message.data.get('request_id') msg_id = message.data.get('request_id')
...@@ -142,6 +157,14 @@ def handle_worker_message(message: Message, client_sock) -> None: ...@@ -142,6 +157,14 @@ def handle_worker_message(message: Message, client_sock) -> None:
# Update progress timestamp to reset timeout # Update progress timestamp to reset timeout
job_progress_times[job_id] = time.time() job_progress_times[job_id] = time.time()
print(f"PING received for job {job_id} - resetting timeout") print(f"PING received for job {job_id} - resetting timeout")
elif message.msg_type == 'progress':
# Update job progress in database
from .database import update_job_progress
job_id = message.data.get('job_id')
progress = message.data.get('progress', 0)
progress_message = message.data.get('message', '')
if job_id:
update_job_progress(job_id, progress, progress_message)
elif message.msg_type in ['analyze_response', 'train_response']: elif message.msg_type in ['analyze_response', 'train_response']:
# Store result for web to poll # Store result for web to poll
pending_results[message.msg_id] = message pending_results[message.msg_id] = message
......
...@@ -257,6 +257,8 @@ def init_db(conn) -> None: ...@@ -257,6 +257,8 @@ def init_db(conn) -> None:
used_tokens INT DEFAULT 0, used_tokens INT DEFAULT 0,
job_id VARCHAR(100), job_id VARCHAR(100),
retry_count INT DEFAULT 0, retry_count INT DEFAULT 0,
progress INT DEFAULT 0,
progress_message TEXT,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
''') ''')
...@@ -279,7 +281,9 @@ def init_db(conn) -> None: ...@@ -279,7 +281,9 @@ def init_db(conn) -> None:
estimated_tokens INTEGER DEFAULT 0, estimated_tokens INTEGER DEFAULT 0,
used_tokens INTEGER DEFAULT 0, used_tokens INTEGER DEFAULT 0,
job_id TEXT, job_id TEXT,
retry_count INTEGER DEFAULT 0 retry_count INTEGER DEFAULT 0,
progress INTEGER DEFAULT 0,
progress_message TEXT
) )
''') ''')
...@@ -654,6 +658,26 @@ def init_db(conn) -> None: ...@@ -654,6 +658,26 @@ def init_db(conn) -> None:
# Column might already exist # Column might already exist
pass pass
# Add progress column to processing_queue table if it doesn't exist
try:
if config['type'] == 'mysql':
cursor.execute('ALTER TABLE processing_queue ADD COLUMN progress INT DEFAULT 0')
else:
cursor.execute('ALTER TABLE processing_queue ADD COLUMN progress INTEGER DEFAULT 0')
except:
# Column might already exist
pass
# Add progress_message column to processing_queue table if it doesn't exist
try:
if config['type'] == 'mysql':
cursor.execute('ALTER TABLE processing_queue ADD COLUMN progress_message TEXT')
else:
cursor.execute('ALTER TABLE processing_queue ADD COLUMN progress_message TEXT')
except:
# Column might already exist
pass
# Add updated_at column to processing_queue table if it doesn't exist # Add updated_at column to processing_queue table if it doesn't exist
try: try:
# First check if column exists # First check if column exists
...@@ -1317,6 +1341,22 @@ def delete_queue_item(queue_id: int, user_id: int = None) -> bool: ...@@ -1317,6 +1341,22 @@ def delete_queue_item(queue_id: int, user_id: int = None) -> bool:
return success return success
def update_job_progress(job_id: str, progress: int, progress_message: str = None) -> bool:
"""Update job progress and message."""
conn = get_db_connection()
cursor = conn.cursor()
if progress_message:
cursor.execute('UPDATE processing_queue SET progress = ?, progress_message = ?, updated_at = CURRENT_TIMESTAMP WHERE job_id = ?', (progress, progress_message, job_id))
else:
cursor.execute('UPDATE processing_queue SET progress = ?, updated_at = CURRENT_TIMESTAMP WHERE job_id = ?', (progress, job_id))
conn.commit()
success = cursor.rowcount > 0
conn.close()
return success
def clean_processing_queue() -> int: def clean_processing_queue() -> int:
"""Clean the processing queue by removing all queued and failed jobs. Returns number of items deleted.""" """Clean the processing queue by removing all queued and failed jobs. Returns number of items deleted."""
conn = get_db_connection() conn = get_db_connection()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment