SexHackMe / vidai · Commits

Commit 5b39ec90 authored Oct 09, 2025 by Stefy Lanza (nextime / spora)
Make all debug prints conditional on get_debug() - suppress debug output when --debug not specified
parent e6634929
Showing 2 changed files with 68 additions and 36 deletions:
vidai/worker_analysis.py  +50 -26
vidai/worker_training.py  +18 -10
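
For context, every change below gates an existing print behind the get_debug() accessor imported from .config. The config module itself is not part of this diff, so the sketch below is only an assumption of how such a flag accessor might look (the set_debug helper and the _debug_enabled name are hypothetical); it simply records whether --debug was passed at startup:

    # Hypothetical sketch of the flag accessor this commit relies on
    # (vidai/config.py is not shown in this diff).
    _debug_enabled = False  # assumed module-level state, set once during argument parsing

    def set_debug(enabled: bool) -> None:
        """Hypothetical setter: called when the --debug command-line flag is parsed."""
        global _debug_enabled
        _debug_enabled = bool(enabled)

    def get_debug() -> bool:
        """Return True when debug output should be printed (False by default)."""
        return _debug_enabled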
vidai/worker_analysis.py
@@ -183,7 +183,8 @@ def check_job_cancelled(job_id):

 def analyze_media(media_path, prompt, model_path, interval=10, job_id_int=None, comm=None):
     """Analyze media using dynamic model loading."""
-    print(f"DEBUG: Starting analyze_media for job {job_id_int}, media_path={media_path}")
+    if get_debug():
+        print(f"DEBUG: Starting analyze_media for job {job_id_int}, media_path={media_path}")
     # Send initial progress update
     if comm:

@@ -201,9 +202,11 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id_int=None,
     total_tokens = 0

     # Get model with reference counting
-    print(f"DEBUG: Loading model {model_path} for job {job_id_int}")
+    if get_debug():
+        print(f"DEBUG: Loading model {model_path} for job {job_id_int}")
     model = get_or_load_model(model_path)
-    print(f"DEBUG: Model loaded for job {job_id_int}")
+    if get_debug():
+        print(f"DEBUG: Model loaded for job {job_id_int}")

     # Send progress update after model loading
     if comm:

@@ -217,17 +220,20 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id_int=None,
         print(f"PROGRESS: Job {job_id_int} - 8% - Model loaded successfully")

     # Get system prompt
-    print(f"DEBUG: Retrieving system prompt for job {job_id_int}")
+    if get_debug():
+        print(f"DEBUG: Retrieving system prompt for job {job_id_int}")
     try:
         from .config import get_system_prompt_content
         system_prompt = get_system_prompt_content()
         full_prompt = system_prompt + " " + prompt if system_prompt else prompt
     except:
         full_prompt = prompt
-    print(f"DEBUG: Full prompt set for job {job_id_int}")
+    if get_debug():
+        print(f"DEBUG: Full prompt set for job {job_id_int}")

     if is_video(media_path):
-        print(f"DEBUG: Detected video, extracting frames for job {job_id_int}")
+        if get_debug():
+            print(f"DEBUG: Detected video, extracting frames for job {job_id_int}")
         frames, output_dir = extract_frames(media_path, interval, optimize=True)
         total_frames = len(frames)

@@ -245,7 +251,8 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id_int=None,
         descriptions = []
         for i, (frame_path, ts) in enumerate(frames):
-            print(f"DEBUG: Processing frame {i+1}/{total_frames} at {ts:.2f}s for job {job_id_int}")
+            if get_debug():
+                print(f"DEBUG: Processing frame {i+1}/{total_frames} at {ts:.2f}s for job {job_id_int}")
             # Send progress update before processing
             if comm:

@@ -261,7 +268,8 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id_int=None,
             # Check for cancellation
             if job_id_int and check_job_cancelled(job_id_int):
-                print(f"DEBUG: Job {job_id_int} cancelled during frame processing")
+                if get_debug():
+                    print(f"DEBUG: Job {job_id_int} cancelled during frame processing")
                 # Clean up and return cancelled message
                 for fp, _ in frames[i:]:
                     try:

@@ -278,7 +286,8 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id_int=None,
             desc, tokens = analyze_single_image(frame_path, full_prompt, model)
             total_tokens += tokens
-            print(f"DEBUG: Frame {i+1} analyzed for job {job_id_int}")
+            if get_debug():
+                print(f"DEBUG: Frame {i+1} analyzed for job {job_id_int}")
             descriptions.append(f"At {ts:.2f}s: {desc}")
             os.unlink(frame_path)

@@ -308,7 +317,8 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id_int=None,
             import shutil
             shutil.rmtree(output_dir)

-        print(f"DEBUG: All frames processed, generating summary for job {job_id_int}")
+        if get_debug():
+            print(f"DEBUG: All frames processed, generating summary for job {job_id_int}")
         # Send progress update for summary generation
         if comm:

@@ -323,7 +333,8 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id_int=None,
         # Check for cancellation before summary
         if job_id_int and check_job_cancelled(job_id_int):
-            print(f"DEBUG: Job {job_id_int} cancelled before summary")
+            if get_debug():
+                print(f"DEBUG: Job {job_id_int} cancelled before summary")
             return "Job cancelled by user", total_tokens

         # Generate summary

@@ -361,7 +372,8 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id_int=None,
             summary_tokens = 0
         total_tokens += summary_tokens

-        print(f"DEBUG: Summary generated for job {job_id_int}")
+        if get_debug():
+            print(f"DEBUG: Summary generated for job {job_id_int}")
         # Send final progress update
         if comm:

@@ -377,7 +389,8 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id_int=None,
         result = f"Frame Descriptions:\n" + "\n".join(descriptions) + f"\n\nSummary:\n{summary}"
         return result, total_tokens
     else:
-        print(f"DEBUG: Detected image, analyzing for job {job_id_int}")
+        if get_debug():
+            print(f"DEBUG: Detected image, analyzing for job {job_id_int}")
         # Send progress update for image analysis start
         if comm:

@@ -392,7 +405,8 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id_int=None,
         # Check for cancellation before processing image
         if job_id_int and check_job_cancelled(job_id_int):
-            print(f"DEBUG: Job {job_id_int} cancelled before image analysis")
+            if get_debug():
+                print(f"DEBUG: Job {job_id_int} cancelled before image analysis")
             return "Job cancelled by user", total_tokens

         # Send progress update before model inference

@@ -408,7 +422,8 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id_int=None,
         result, tokens = analyze_single_image(media_path, full_prompt, model)
         total_tokens += tokens
-        print(f"DEBUG: Image analysis completed for job {job_id_int}")
+        if get_debug():
+            print(f"DEBUG: Image analysis completed for job {job_id_int}")
         # Send progress update for completion
         if comm:

@@ -436,32 +451,38 @@ def analyze_media(media_path, prompt, model_path, interval=10, job_id_int=None,
         return result, total_tokens


 def worker_process(backend_type: str):
     """Main worker process."""
-    print(f"DEBUG: Starting Analysis Worker for {backend_type}...")
-    print(f"DEBUG: Worker PID: {os.getpid()}")
+    if get_debug():
+        print(f"DEBUG: Starting Analysis Worker for {backend_type}...")
+        print(f"DEBUG: Worker PID: {os.getpid()}")

     # Workers use TCP for interprocess communication
     comm = SocketCommunicator(host='127.0.0.1', port=get_backend_worker_port(), comm_type='tcp')
-    print(f"DEBUG: Worker connecting to {comm.host}:{comm.port}")
+    if get_debug():
+        print(f"DEBUG: Worker connecting to {comm.host}:{comm.port}")
     comm.connect()
-    print(f"Analysis Worker connected to backend")
+    if get_debug():
+        print(f"Analysis Worker connected to backend")

     # Register with backend
     register_msg = Message('register', 'register', {'type': f'analysis_{backend_type}'})
     comm.send_message(register_msg)
-    print(f"Analysis Worker registered as analysis_{backend_type}")
+    if get_debug():
+        print(f"Analysis Worker registered as analysis_{backend_type}")

     while True:
         try:
             message = comm.receive_message()
-            if message:
+            if message and get_debug():
                 print(f"DEBUG: Worker {os.getpid()} received message: {message}")
             if message and message.msg_type == 'analyze_request':
-                print(f"DEBUG: Worker received analyze_request: {message.msg_id}")
+                if get_debug():
+                    print(f"DEBUG: Worker received analyze_request: {message.msg_id}")
                 data = message.data
                 media_path = data.get('local_path', data.get('file_name', ''))
                 if not media_path:
                     result = 'No media path provided'
-                    print(f"DEBUG: No media path provided for job {message.msg_id}")
+                    if get_debug():
+                        print(f"DEBUG: No media path provided for job {message.msg_id}")
                 else:
                     prompt = data.get('prompt', 'Describe this image.')
                     model_path = data.get('model_path', 'Qwen/Qwen2.5-VL-7B-Instruct')

@@ -469,16 +490,19 @@ def worker_process(backend_type: str):
                     job_id = message.msg_id  # Use message ID for job identification
                     job_id_int = int(message.msg_id.split('_')[1])  # Extract integer job ID
                     print(f"PROGRESS: Job {job_id_int} accepted - Starting analysis")
-                    print(f"DEBUG: Starting analysis of {media_path} with model {model_path} for job {job_id}")
+                    if get_debug():
+                        print(f"DEBUG: Starting analysis of {media_path} with model {model_path} for job {job_id}")
                     result, tokens_used = analyze_media(media_path, prompt, model_path, interval, job_id_int, comm)
-                    print(f"DEBUG: Analysis completed for job {message.msg_id}, used {tokens_used} tokens")
+                    if get_debug():
+                        print(f"DEBUG: Analysis completed for job {message.msg_id}, used {tokens_used} tokens")

                     # Release model reference (don't unload yet, per requirements)
                     release_model(model_path)

                 # Send result back
                 response = Message('analyze_response', message.msg_id, {'result': result, 'tokens_used': tokens_used})
-                print(f"DEBUG: Sending analyze_response for job {message.msg_id}")
+                if get_debug():
+                    print(f"DEBUG: Sending analyze_response for job {message.msg_id}")
                 comm.send_message(response)

                 # If in cluster mode, also notify cluster master
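
The same two-line gate recurs throughout both files. Purely for illustration (this helper is not part of the commit and the name debug_print is hypothetical), the repeated pattern is equivalent to:

    def debug_print(*args, **kwargs) -> None:
        """Hypothetical wrapper: print only when the --debug flag was given."""
        if get_debug():
            print(*args, **kwargs)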
vidai/worker_training.py
@@ -27,11 +27,12 @@ import shutil
 import json
 import time
 from .comm import SocketCommunicator, Message
-from .config import get_comm_type, get_backend_worker_port
+from .config import get_comm_type, get_backend_worker_port, get_debug


 def train_model(train_path, output_model, description):
     """Perform training."""
-    print(f"DEBUG: Starting training with videotrain for output_model {output_model}")
+    if get_debug():
+        print(f"DEBUG: Starting training with videotrain for output_model {output_model}")
     desc_file = os.path.join(train_path, "description.txt")
     with open(desc_file, "w") as f:
         f.write(description)

@@ -39,7 +40,8 @@ def train_model(train_path, output_model, description):
     # Assume videotrain is available
     cmd = ["python", "videotrain", train_path, "--output_dir", output_model]
     result = subprocess.run(cmd, capture_output=True, text=True)
-    print(f"DEBUG: Training subprocess completed with returncode {result.returncode}")
+    if get_debug():
+        print(f"DEBUG: Training subprocess completed with returncode {result.returncode}")
     if result.returncode == 0:
         return "Training completed!"
     else:

@@ -47,7 +49,8 @@ def train_model(train_path, output_model, description):

 def worker_process(backend_type: str):
     """Main worker process."""
-    print(f"Starting Training Worker for {backend_type}...")
+    if get_debug():
+        print(f"Starting Training Worker for {backend_type}...")

     # Workers use TCP for interprocess communication
     comm = SocketCommunicator(host='127.0.0.1', port=get_backend_worker_port(), comm_type='tcp')

@@ -60,10 +63,11 @@ def worker_process(backend_type: str):
     while True:
         try:
             message = comm.receive_message()
-            if message:
+            if message and get_debug():
                 print(f"DEBUG: Worker {os.getpid()} received message: {message}")
             if message and message.msg_type == 'train_request':
-                print(f"DEBUG: Worker received train_request: {message.msg_id}")
+                if get_debug():
+                    print(f"DEBUG: Worker received train_request: {message.msg_id}")
                 data = message.data
                 output_model = data.get('output_model', './VideoModel')
                 description = data.get('description', '')

@@ -71,16 +75,20 @@ def worker_process(backend_type: str):
                 if train_dir and os.path.isdir(train_dir):
                     print(f"PROGRESS: Job {message.msg_id} accepted - Starting training")
-                    print(f"DEBUG: Starting training for job {message.msg_id}")
+                    if get_debug():
+                        print(f"DEBUG: Starting training for job {message.msg_id}")
                     result = train_model(train_dir, output_model, description)
                     print(f"PROGRESS: Job {message.msg_id} - 100% - Training completed")
-                    print(f"DEBUG: Training completed for job {message.msg_id}")
+                    if get_debug():
+                        print(f"DEBUG: Training completed for job {message.msg_id}")
                 else:
                     result = "No valid training directory provided"
-                    print(f"DEBUG: No valid training directory for job {message.msg_id}")
+                    if get_debug():
+                        print(f"DEBUG: No valid training directory for job {message.msg_id}")
                 response = Message('train_response', message.msg_id, {'message': result})
-                print(f"DEBUG: Sending train_response for job {message.msg_id}")
+                if get_debug():
+                    print(f"DEBUG: Sending train_response for job {message.msg_id}")
                 comm.send_message(response)

             time.sleep(0.1)
         except Exception as e: