Add job cancellation functionality to history interface

- Added cancel_job method to QueueManager for cancelling running jobs
- Added /job/<id>/cancel route in web.py for cancelling jobs via POST
- Updated history.html template to show:
  - Cancel button for processing jobs (orange button)
  - Delete button for cancelled jobs (red button)
  - Cancelled status styling (gray background)
- Added JavaScript updateJobActions function for dynamic action updates
- Modified worker_analysis.py to check for job cancellation during processing:
  - Added check_job_cancelled function to query database
  - Modified analyze_media to check cancellation before each frame and summary
  - Workers now stop processing and return 'Job cancelled by user' message
- Updated queue.py to pass job_id in data sent to workers for cancellation checking
- Job cancellation works for both local and distributed workers

Users can now cancel running analysis jobs from the history page, and cancelled jobs can be deleted from history.
parent f048470d
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
.status-processing { background: #dbeafe; color: #2563eb; } .status-processing { background: #dbeafe; color: #2563eb; }
.status-completed { background: #d1fae5; color: #065f46; } .status-completed { background: #d1fae5; color: #065f46; }
.status-failed { background: #fee2e2; color: #dc2626; } .status-failed { background: #fee2e2; color: #dc2626; }
.status-cancelled { background: #f3f4f6; color: #6b7280; }
.job-tokens { font-weight: 600; color: #667eea; text-align: center; } .job-tokens { font-weight: 600; color: #667eea; text-align: center; }
.no-jobs { text-align: center; padding: 3rem; color: #6b7280; } .no-jobs { text-align: center; padding: 3rem; color: #6b7280; }
.job-progress { color: #64748b; font-size: 0.8rem; } .job-progress { color: #64748b; font-size: 0.8rem; }
...@@ -95,6 +96,9 @@ ...@@ -95,6 +96,9 @@
spinner.remove(); spinner.remove();
} }
} }
// Update actions based on status change
updateJobActions(jobUpdate.id, jobUpdate.status);
} }
// Update tokens if completed // Update tokens if completed
...@@ -118,6 +122,89 @@ ...@@ -118,6 +122,89 @@
} }
} }
function updateJobActions(jobId, status) {
const jobRow = document.querySelector(`[data-job-id="${jobId}"]`);
if (!jobRow) return;
const actionsElement = jobRow.querySelector('.job-actions');
if (!actionsElement) return;
// Clear existing actions
actionsElement.innerHTML = '';
if (status === 'completed') {
const viewLink = document.createElement('a');
viewLink.href = `/job_result/${jobId}`;
viewLink.className = 'view-result-link';
viewLink.textContent = 'View Result';
actionsElement.appendChild(viewLink);
} else if (status === 'processing') {
const cancelForm = document.createElement('form');
cancelForm.method = 'post';
cancelForm.action = `/job/${jobId}/cancel`;
cancelForm.style.display = 'inline';
cancelForm.style.marginRight = '0.5rem';
cancelForm.onsubmit = () => confirm('Are you sure you want to cancel this running job?');
const cancelBtn = document.createElement('button');
cancelBtn.type = 'submit';
cancelBtn.className = 'cancel-btn';
cancelBtn.style.background = '#f59e0b';
cancelBtn.style.color = 'white';
cancelBtn.style.border = 'none';
cancelBtn.style.padding = '0.25rem 0.5rem';
cancelBtn.style.borderRadius = '4px';
cancelBtn.style.fontSize = '0.8rem';
cancelBtn.style.cursor = 'pointer';
cancelBtn.textContent = 'Cancel';
cancelForm.appendChild(cancelBtn);
actionsElement.appendChild(cancelForm);
} else if (status === 'cancelled') {
const deleteForm = document.createElement('form');
deleteForm.method = 'post';
deleteForm.action = `/job/${jobId}/delete`;
deleteForm.style.display = 'inline';
deleteForm.onsubmit = () => confirm('Are you sure you want to delete this cancelled job?');
const deleteBtn = document.createElement('button');
deleteBtn.type = 'submit';
deleteBtn.className = 'delete-btn';
deleteBtn.style.background = '#dc2626';
deleteBtn.style.color = 'white';
deleteBtn.style.border = 'none';
deleteBtn.style.padding = '0.25rem 0.5rem';
deleteBtn.style.borderRadius = '4px';
deleteBtn.style.fontSize = '0.8rem';
deleteBtn.style.cursor = 'pointer';
deleteBtn.textContent = 'Delete';
deleteForm.appendChild(deleteBtn);
actionsElement.appendChild(deleteForm);
} else if (status === 'queued') {
const deleteForm = document.createElement('form');
deleteForm.method = 'post';
deleteForm.action = `/job/${jobId}/delete`;
deleteForm.style.display = 'inline';
deleteForm.onsubmit = () => confirm('Are you sure you want to delete this queued job?');
const deleteBtn = document.createElement('button');
deleteBtn.type = 'submit';
deleteBtn.className = 'delete-btn';
deleteBtn.style.background = '#dc2626';
deleteBtn.style.color = 'white';
deleteBtn.style.border = 'none';
deleteBtn.style.padding = '0.25rem 0.5rem';
deleteBtn.style.borderRadius = '4px';
deleteBtn.style.fontSize = '0.8rem';
deleteBtn.style.cursor = 'pointer';
deleteBtn.textContent = 'Delete';
deleteForm.appendChild(deleteBtn);
actionsElement.appendChild(deleteForm);
}
}
// Initialize completed jobs tracking // Initialize completed jobs tracking
function initializeCompletedJobs() { function initializeCompletedJobs() {
const jobRows = document.querySelectorAll('[data-job-id]'); const jobRows = document.querySelectorAll('[data-job-id]');
...@@ -162,8 +249,17 @@ ...@@ -162,8 +249,17 @@
<div class="job-actions"> <div class="job-actions">
{% if job.status == 'completed' %} {% if job.status == 'completed' %}
<a href="/job_result/{{ job.id }}" class="view-result-link">View Result</a> <a href="/job_result/{{ job.id }}" class="view-result-link">View Result</a>
{% elif job.status == 'processing' and job.result %} {% elif job.status == 'processing' %}
<form method="post" action="/job/{{ job.id }}/cancel" style="display: inline; margin-right: 0.5rem;" onsubmit="return confirm('Are you sure you want to cancel this running job?')">
<button type="submit" class="cancel-btn" style="background: #f59e0b; color: white; border: none; padding: 0.25rem 0.5rem; border-radius: 4px; font-size: 0.8rem; cursor: pointer;">Cancel</button>
</form>
{% if job.result %}
<div class="job-progress">{{ job.result.get('status', 'Processing...') }}</div> <div class="job-progress">{{ job.result.get('status', 'Processing...') }}</div>
{% endif %}
{% elif job.status == 'cancelled' %}
<form method="post" action="/job/{{ job.id }}/delete" style="display: inline;" onsubmit="return confirm('Are you sure you want to delete this cancelled job?')">
<button type="submit" class="delete-btn" style="background: #dc2626; color: white; border: none; padding: 0.25rem 0.5rem; border-radius: 4px; font-size: 0.8rem; cursor: pointer;">Delete</button>
</form>
{% elif job.status == 'queued' %} {% elif job.status == 'queued' %}
<form method="post" action="/job/{{ job.id }}/delete" style="display: inline;" onsubmit="return confirm('Are you sure you want to delete this queued job?')"> <form method="post" action="/job/{{ job.id }}/delete" style="display: inline;" onsubmit="return confirm('Are you sure you want to delete this queued job?')">
<button type="submit" class="delete-btn" style="background: #dc2626; color: white; border: none; padding: 0.25rem 0.5rem; border-radius: 4px; font-size: 0.8rem; cursor: pointer;">Delete</button> <button type="submit" class="delete-btn" style="background: #dc2626; color: white; border: none; padding: 0.25rem 0.5rem; border-radius: 4px; font-size: 0.8rem; cursor: pointer;">Delete</button>
......
...@@ -65,6 +65,29 @@ class QueueManager: ...@@ -65,6 +65,29 @@ class QueueManager:
from .database import delete_queue_item from .database import delete_queue_item
return delete_queue_item(queue_id, user_id) return delete_queue_item(queue_id, user_id)
def cancel_job(self, queue_id: int, user_id: int = None) -> bool:
"""Cancel a running job."""
# Get job status first
job = get_queue_status(queue_id)
if not job:
return False
# Check ownership if user_id provided
if user_id and job['user_id'] != user_id:
return False
# Only allow cancellation of queued or processing jobs
if job['status'] not in ['queued', 'processing']:
return False
# Update status to cancelled
update_queue_status(queue_id, 'cancelled')
# If job was processing, we might need to notify workers to stop
# For now, just mark as cancelled - workers should check status periodically
return True
def _process_queue(self) -> None: def _process_queue(self) -> None:
"""Background thread to process queued jobs.""" """Background thread to process queued jobs."""
while self.running: while self.running:
...@@ -172,11 +195,15 @@ class QueueManager: ...@@ -172,11 +195,15 @@ class QueueManager:
# Send job to the appropriate client # Send job to the appropriate client
if client_id in cluster_master.client_sockets: if client_id in cluster_master.client_sockets:
# Add job_id to data for cancellation checking
job_data = job['data'].copy()
job_data['job_id'] = job['id']
message = { message = {
'type': 'job_request', 'type': 'job_request',
'job_id': job['id'], 'job_id': job['id'],
'request_type': job['request_type'], 'request_type': job['request_type'],
'data': job['data'] 'data': job_data
} }
cluster_master.client_sockets[client_id].sendall( cluster_master.client_sockets[client_id].sendall(
json.dumps(message).encode('utf-8') + b'\n' json.dumps(message).encode('utf-8') + b'\n'
...@@ -200,10 +227,14 @@ class QueueManager: ...@@ -200,10 +227,14 @@ class QueueManager:
# Send to backend for processing # Send to backend for processing
from .backend import handle_web_message from .backend import handle_web_message
# Add job_id to data for cancellation checking
job_data = job['data'].copy()
job_data['job_id'] = job['id']
message = Message( message = Message(
msg_type=job['request_type'], msg_type=job['request_type'],
msg_id=str(job['id']), msg_id=str(job['id']),
data=job['data'] data=job_data
) )
# For now, simulate processing # For now, simulate processing
......
...@@ -344,6 +344,21 @@ def delete_job(job_id): ...@@ -344,6 +344,21 @@ def delete_job(job_id):
return redirect(url_for('history')) return redirect(url_for('history'))
@app.route('/job/<int:job_id>/cancel', methods=['POST'])
@login_required
def cancel_job(job_id):
"""Cancel a running job."""
user = get_current_user_session()
from .queue import queue_manager
if queue_manager.cancel_job(job_id, user['id']):
flash('Job cancelled successfully!', 'success')
else:
flash('Failed to cancel job or access denied.', 'error')
return redirect(url_for('history'))
@app.route('/job_result/<int:job_id>') @app.route('/job_result/<int:job_id>')
@login_required @login_required
def job_result(job_id): def job_result(job_id):
......
...@@ -154,7 +154,16 @@ def analyze_single_image(image_path, prompt, model): ...@@ -154,7 +154,16 @@ def analyze_single_image(image_path, prompt, model):
return model.generate({"messages": messages}, max_new_tokens=128) return model.generate({"messages": messages}, max_new_tokens=128)
def analyze_media(media_path, prompt, model_path, interval=10): def check_job_cancelled(job_id):
"""Check if a job has been cancelled."""
try:
from .database import get_queue_status
job = get_queue_status(job_id)
return job and job['status'] == 'cancelled'
except:
return False
def analyze_media(media_path, prompt, model_path, interval=10, job_id=None):
"""Analyze media using dynamic model loading.""" """Analyze media using dynamic model loading."""
torch.cuda.empty_cache() torch.cuda.empty_cache()
...@@ -175,6 +184,22 @@ def analyze_media(media_path, prompt, model_path, interval=10): ...@@ -175,6 +184,22 @@ def analyze_media(media_path, prompt, model_path, interval=10):
descriptions = [] descriptions = []
for i, (frame_path, ts) in enumerate(frames): for i, (frame_path, ts) in enumerate(frames):
# Check for cancellation
if job_id and check_job_cancelled(job_id):
# Clean up and return cancelled message
for fp, _ in frames[i:]:
try:
os.unlink(fp)
except:
pass
if output_dir:
try:
import shutil
shutil.rmtree(output_dir)
except:
pass
return "Job cancelled by user"
desc = analyze_single_image(frame_path, full_prompt, model) desc = analyze_single_image(frame_path, full_prompt, model)
descriptions.append(f"At {ts:.2f}s: {desc}") descriptions.append(f"At {ts:.2f}s: {desc}")
os.unlink(frame_path) os.unlink(frame_path)
...@@ -183,6 +208,10 @@ def analyze_media(media_path, prompt, model_path, interval=10): ...@@ -183,6 +208,10 @@ def analyze_media(media_path, prompt, model_path, interval=10):
import shutil import shutil
shutil.rmtree(output_dir) shutil.rmtree(output_dir)
# Check for cancellation before summary
if job_id and check_job_cancelled(job_id):
return "Job cancelled by user"
# Generate summary # Generate summary
if model.supports_vision(): if model.supports_vision():
# Use vision model for summary # Use vision model for summary
...@@ -196,6 +225,10 @@ def analyze_media(media_path, prompt, model_path, interval=10): ...@@ -196,6 +225,10 @@ def analyze_media(media_path, prompt, model_path, interval=10):
result = f"Frame Descriptions:\n" + "\n".join(descriptions) + f"\n\nSummary:\n{summary}" result = f"Frame Descriptions:\n" + "\n".join(descriptions) + f"\n\nSummary:\n{summary}"
return result return result
else: else:
# Check for cancellation before processing image
if job_id and check_job_cancelled(job_id):
return "Job cancelled by user"
result = analyze_single_image(media_path, full_prompt, model) result = analyze_single_image(media_path, full_prompt, model)
torch.cuda.empty_cache() torch.cuda.empty_cache()
return result return result
...@@ -225,7 +258,8 @@ def worker_process(backend_type: str): ...@@ -225,7 +258,8 @@ def worker_process(backend_type: str):
prompt = data.get('prompt', 'Describe this image.') prompt = data.get('prompt', 'Describe this image.')
model_path = data.get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct') model_path = data.get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct')
interval = data.get('interval', 10) interval = data.get('interval', 10)
result = analyze_media(media_path, prompt, model_path, interval) job_id = data.get('job_id') # Extract job_id for cancellation checking
result = analyze_media(media_path, prompt, model_path, interval, job_id)
# Release model reference (don't unload yet, per requirements) # Release model reference (don't unload yet, per requirements)
release_model(model_path) release_model(model_path)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment