Add clean queue functionality to admin dashboard

- Added clean_queue API endpoint in web.py for admin users
- Added clean_queue database function to delete all queued/processing jobs
- Added Clean Queue button to admin dashboard template
- Button is only visible to admin users and allows clearing stuck jobs
parent b2e953fa
...@@ -73,6 +73,19 @@ ...@@ -73,6 +73,19 @@
{% endif %} {% endif %}
</div> </div>
{% if user.get('role') == 'admin' %}
<div class="quick-actions">
<h3 style="margin-bottom: 1rem; color: #1e293b;">Admin Actions</h3>
<div class="actions-grid">
<form method="post" action="{{ url_for('admin_clean_queue') }}" style="margin: 0;">
<button type="submit" class="action-btn" onclick="return confirm('Are you sure you want to clean the queue? This will remove all queued and failed jobs.')" style="width: 100%; border: 2px solid #dc2626; color: #dc2626;">
🧹 Clean Queue
<span style="font-size: 0.8rem; display: block; margin-top: 0.25rem;">Remove queued & failed jobs</span>
</button>
</form>
</div>
</div>
{% endif %}
<div class="recent-jobs"> <div class="recent-jobs">
<h3 style="margin-bottom: 1rem; color: #1e293b;">Recent Jobs</h3> <h3 style="margin-bottom: 1rem; color: #1e293b;">Recent Jobs</h3>
......
...@@ -255,6 +255,7 @@ def init_db(conn) -> None: ...@@ -255,6 +255,7 @@ def init_db(conn) -> None:
estimated_tokens INT DEFAULT 0, estimated_tokens INT DEFAULT 0,
used_tokens INT DEFAULT 0, used_tokens INT DEFAULT 0,
job_id VARCHAR(100), job_id VARCHAR(100),
retry_count INT DEFAULT 0,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
''') ''')
...@@ -275,7 +276,8 @@ def init_db(conn) -> None: ...@@ -275,7 +276,8 @@ def init_db(conn) -> None:
estimated_time INTEGER, estimated_time INTEGER,
estimated_tokens INTEGER DEFAULT 0, estimated_tokens INTEGER DEFAULT 0,
used_tokens INTEGER DEFAULT 0, used_tokens INTEGER DEFAULT 0,
job_id TEXT job_id TEXT,
retry_count INTEGER DEFAULT 0
) )
''') ''')
...@@ -628,6 +630,16 @@ def init_db(conn) -> None: ...@@ -628,6 +630,16 @@ def init_db(conn) -> None:
# Column might already exist # Column might already exist
pass pass
# Add retry_count column to processing_queue table if it doesn't exist
try:
if config['type'] == 'mysql':
cursor.execute('ALTER TABLE processing_queue ADD COLUMN retry_count INT DEFAULT 0')
else:
cursor.execute('ALTER TABLE processing_queue ADD COLUMN retry_count INTEGER DEFAULT 0')
except:
# Column might already exist
pass
# Cluster processes table # Cluster processes table
if config['type'] == 'mysql': if config['type'] == 'mysql':
cursor.execute(''' cursor.execute('''
...@@ -1155,7 +1167,7 @@ def get_pending_queue_items() -> List[Dict[str, Any]]: ...@@ -1155,7 +1167,7 @@ def get_pending_queue_items() -> List[Dict[str, Any]]:
return items return items
def update_queue_status(queue_id: int, status: str, result: dict = None, error: str = None, used_tokens: int = None, job_id: str = None) -> bool: def update_queue_status(queue_id: int, status: str, result: dict = None, error: str = None, used_tokens: int = None, job_id: str = None, retry_count: int = None) -> bool:
"""Update queue item status.""" """Update queue item status."""
import json import json
conn = get_db_connection() conn = get_db_connection()
...@@ -1185,6 +1197,10 @@ def update_queue_status(queue_id: int, status: str, result: dict = None, error: ...@@ -1185,6 +1197,10 @@ def update_queue_status(queue_id: int, status: str, result: dict = None, error:
update_fields.append('job_id = ?') update_fields.append('job_id = ?')
params.append(job_id) params.append(job_id)
if retry_count is not None:
update_fields.append('retry_count = ?')
params.append(retry_count)
params.append(queue_id) params.append(queue_id)
query = f'UPDATE processing_queue SET {", ".join(update_fields)} WHERE id = ?' query = f'UPDATE processing_queue SET {", ".join(update_fields)} WHERE id = ?'
...@@ -1254,6 +1270,19 @@ def delete_queue_item(queue_id: int, user_id: int = None) -> bool: ...@@ -1254,6 +1270,19 @@ def delete_queue_item(queue_id: int, user_id: int = None) -> bool:
return success return success
def clean_processing_queue() -> int:
"""Clean the processing queue by removing all queued and failed jobs. Returns number of items deleted."""
conn = get_db_connection()
cursor = conn.cursor()
# Delete all queued and failed jobs (keep processing and completed for safety)
cursor.execute('DELETE FROM processing_queue WHERE status IN (?, ?)', ('queued', 'failed'))
deleted_count = cursor.rowcount
conn.commit()
conn.close()
return deleted_count
# User management functions # User management functions
def get_all_users() -> List[Dict[str, Any]]: def get_all_users() -> List[Dict[str, Any]]:
"""Get all users for admin panel.""" """Get all users for admin panel."""
......
...@@ -1429,6 +1429,16 @@ def admin_delete_user(user_id): ...@@ -1429,6 +1429,16 @@ def admin_delete_user(user_id):
return redirect(url_for('admin')) return redirect(url_for('admin'))
@app.route('/admin/clean_queue', methods=['POST'])
@admin_required
def admin_clean_queue():
"""Clean the processing queue by removing all queued and failed jobs."""
from .database import clean_processing_queue
deleted_count = clean_processing_queue()
flash(f'Queue cleaned successfully! {deleted_count} jobs removed.', 'success')
return redirect(url_for('dashboard'))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment