Commit 8fbdc061 authored by sumpfralle's avatar sumpfralle

Added a window for viewing the current pool of workers, including statistics.


git-svn-id: https://pycam.svn.sourceforge.net/svnroot/pycam/trunk@761 bbaffbd6-741e-11dd-a85d-61de82d9cad9
parent 5a892d7e
...@@ -40,6 +40,7 @@ import pycam.Toolpath.MotionGrid ...@@ -40,6 +40,7 @@ import pycam.Toolpath.MotionGrid
# this requires ODE - we import it later, if necessary # this requires ODE - we import it later, if necessary
#import pycam.Simulation.ODEBlocks #import pycam.Simulation.ODEBlocks
import gtk import gtk
import gobject
import webbrowser import webbrowser
import ConfigParser import ConfigParser
import urllib import urllib
...@@ -212,6 +213,7 @@ class ProjectGui: ...@@ -212,6 +213,7 @@ class ProjectGui:
("GeneralSettings", self.toggle_preferences_window, None, "<Control>p"), ("GeneralSettings", self.toggle_preferences_window, None, "<Control>p"),
("Toggle3DView", self.toggle_3d_view, None, "<Control><Shift>v"), ("Toggle3DView", self.toggle_3d_view, None, "<Control><Shift>v"),
("ToggleLogWindow", self.toggle_log_window, None, "<Control>l"), ("ToggleLogWindow", self.toggle_log_window, None, "<Control>l"),
("ToggleProcessPoolWindow", self.toggle_process_pool_window, None, None),
("HelpIntroduction", self.show_help, "Introduction", "F1"), ("HelpIntroduction", self.show_help, "Introduction", "F1"),
("HelpSupportedFormats", self.show_help, "SupportedFormats", None), ("HelpSupportedFormats", self.show_help, "SupportedFormats", None),
("HelpModelTransformations", self.show_help, "ModelTransformations", None), ("HelpModelTransformations", self.show_help, "ModelTransformations", None),
...@@ -228,7 +230,8 @@ class ProjectGui: ...@@ -228,7 +230,8 @@ class ProjectGui:
("BugTracker", self.show_help, "http://sourceforge.net/tracker/?group_id=237831&atid=1104176", None), ("BugTracker", self.show_help, "http://sourceforge.net/tracker/?group_id=237831&atid=1104176", None),
("FeatureRequest", self.show_help, "http://sourceforge.net/tracker/?group_id=237831&atid=1104179", None)): ("FeatureRequest", self.show_help, "http://sourceforge.net/tracker/?group_id=237831&atid=1104179", None)):
item = self.gui.get_object(objname) item = self.gui.get_object(objname)
if objname in ("Toggle3DView", "ToggleLogWindow"): if objname in ("Toggle3DView", "ToggleLogWindow",
"ToggleProcessPoolWindow"):
action = "toggled" action = "toggled"
else: else:
action = "activate" action = "activate"
...@@ -269,6 +272,14 @@ class ProjectGui: ...@@ -269,6 +272,14 @@ class ProjectGui:
self.gui.get_object("LogWindowCopyToClipboard").connect("clicked", self.gui.get_object("LogWindowCopyToClipboard").connect("clicked",
self.copy_log_to_clipboard) self.copy_log_to_clipboard)
self.log_model = self.gui.get_object("LogWindowList") self.log_model = self.gui.get_object("LogWindowList")
# "process pool" window
self.process_pool_window = self.gui.get_object("ProcessPoolWindow")
self.process_pool_window.set_default_size(500, 400)
# hide (instead of destroy) the window when it is closed; the extra
# "False" argument is passed through to toggle_process_pool_window
self.process_pool_window.connect("delete-event", self.toggle_process_pool_window, False)
self.process_pool_window.connect("destroy", self.toggle_process_pool_window, False)
self.gui.get_object("ProcessPoolWindowClose").connect("clicked", self.toggle_process_pool_window, False)
# default statistics refresh interval: three seconds
self.gui.get_object("ProcessPoolRefreshInterval").set_value(3)
# list store that backs the worker statistics table
self.process_pool_model = self.gui.get_object("ProcessPoolStatisticsModel")
# set defaults # set defaults
self.model = None self.model = None
self.toolpath = pycam.Toolpath.ToolpathList() self.toolpath = pycam.Toolpath.ToolpathList()
...@@ -1440,6 +1451,45 @@ class ProjectGui: ...@@ -1440,6 +1451,45 @@ class ProjectGui:
# don't destroy the window with a "destroy" event # don't destroy the window with a "destroy" event
return True return True
@gui_activity_guard
def toggle_process_pool_window(self, widget=None, value=None, action=None):
    """Show or hide the process pool statistics window.

    The target state is taken from "action" if given, otherwise from
    "value", otherwise from the current state of the menu checkbox.
    When the window is shown, a periodic gobject timeout is started to
    refresh the statistics table.  Always returns True so that a
    "destroy" event does not actually destroy the window.
    """
    checkbox = self.gui.get_object("ToggleProcessPoolWindow")
    if value is None:
        new_state = checkbox.get_active()
    elif action is None:
        new_state = value
    else:
        new_state = action
    if not new_state:
        self.process_pool_window.hide()
    else:
        # show either the statistics table or the "disabled" notice,
        # depending on whether a worker pool is actually running
        pool_ready = pycam.Utils.threading.is_pool_available()
        self.gui.get_object("ProcessPoolDisabledText").set_visible(not pool_ready)
        self.gui.get_object("ProcessPoolStatisticsBox").set_visible(pool_ready)
        refresh_seconds = int(max(1, self.gui.get_object(
                "ProcessPoolRefreshInterval").get_value()))
        gobject.timeout_add_seconds(refresh_seconds,
                self.update_process_pool_statistics)
        self.process_pool_window.show()
    checkbox.set_active(new_state)
    # don't destroy the window with a "destroy" event
    return True
def update_process_pool_statistics(self):
    """Refresh the worker statistics table; used as a gobject timeout callback.

    Returns True (keep the timer running) while the process pool window
    checkbox is active, False (cancel the timer) once it is not.
    """
    worker_stats = pycam.Utils.threading.get_pool_statistics()
    model = self.process_pool_model
    model.clear()
    for row in worker_stats:
        model.append(row)
    workers_label = self.gui.get_object("ProcessPoolConnectedWorkersValue")
    workers_label.set_text(str(len(worker_stats)))
    # don't repeat, if the window is hidden
    return self.gui.get_object("ToggleProcessPoolWindow").get_active()
@gui_activity_guard @gui_activity_guard
def toggle_3d_view(self, widget=None, value=None): def toggle_3d_view(self, widget=None, value=None):
toggle_3d_checkbox = self.gui.get_object("Toggle3DView") toggle_3d_checkbox = self.gui.get_object("Toggle3DView")
......
...@@ -59,6 +59,13 @@ def run_in_parallel(*args, **kwargs): ...@@ -59,6 +59,13 @@ def run_in_parallel(*args, **kwargs):
else: else:
return run_in_parallel_remote(*args, **kwargs) return run_in_parallel_remote(*args, **kwargs)
def is_pool_available():
    """Return True if a multiprocessing manager (worker pool) is running."""
    # "x is not None" is the idiomatic spelling (PEP 8 / pycodestyle E714)
    # of the original "not x is None"
    return __manager is not None
def get_pool_statistics():
    """Return the per-worker statistics from the pool's statistics proxy."""
    # reading a module-level name needs no "global" declaration
    stats_proxy = __manager.statistics()
    return stats_proxy.get_worker_statistics()
def init_threading(number_of_processes=None, enable_server=False, remote=None, run_server=False, def init_threading(number_of_processes=None, enable_server=False, remote=None, run_server=False,
server_credentials=""): server_credentials=""):
global __multiprocessing, __num_of_processes, __manager, __closing, __task_source_uuid global __multiprocessing, __num_of_processes, __manager, __closing, __task_source_uuid
...@@ -214,14 +221,15 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list): ...@@ -214,14 +221,15 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list):
task_name = "task-%s" % str(task_id) task_name = "task-%s" % str(task_id)
worker = __multiprocessing.Process( worker = __multiprocessing.Process(
name=task_name, target=_handle_tasks, name=task_name, target=_handle_tasks,
args=(tasks, results, stats, cache, __closing)) args=(tasks, results, stats, cache,
__closing))
worker.start() worker.start()
workers.append(worker) workers.append(worker)
# wait until all workers are finished # wait until all workers are finished
for worker in workers: for worker in workers:
worker.join() worker.join()
else: else:
time.sleep(0.2) time.sleep(1.0)
except KeyboardInterrupt: except KeyboardInterrupt:
log.info("Spawner daemon killed by keyboard interrupt") log.info("Spawner daemon killed by keyboard interrupt")
# set the "closing" flag and just exit # set the "closing" flag and just exit
...@@ -236,9 +244,13 @@ def _handle_tasks(tasks, results, stats, cache, closing): ...@@ -236,9 +244,13 @@ def _handle_tasks(tasks, results, stats, cache, closing):
local_cache = {} local_cache = {}
timeout_limit = 60 timeout_limit = 60
timeout_counter = 0 timeout_counter = 0
last_worker_notification = 0
log.debug("Worker thread started: %s" % name) log.debug("Worker thread started: %s" % name)
try: try:
while (timeout_counter < timeout_limit) and not closing.get(): while (timeout_counter < timeout_limit) and not closing.get():
if last_worker_notification + 30 < time.time():
stats.worker_notification(name)
last_worker_notification = time.time()
try: try:
start_time = time.time() start_time = time.time()
job_id, task_id, func, args = tasks.get(timeout=1.0) job_id, task_id, func, args = tasks.get(timeout=1.0)
...@@ -294,8 +306,6 @@ def run_in_parallel_remote(func, args_list, unordered=False, ...@@ -294,8 +306,6 @@ def run_in_parallel_remote(func, args_list, unordered=False,
tasks_queue.put((job_id, index, func, result_args)) tasks_queue.put((job_id, index, func, result_args))
stats.add_queueing_time(__task_source_uuid, time.time() - start_time) stats.add_queueing_time(__task_source_uuid, time.time() - start_time)
log.debug("Added %d tasks for job %s" % (len(args_list), job_id)) log.debug("Added %d tasks for job %s" % (len(args_list), job_id))
def job_cleanup():
print stats.get_stats()
result_buffer = {} result_buffer = {}
index = 0 index = 0
while index < len(args_list): while index < len(args_list):
...@@ -347,11 +357,9 @@ def run_in_parallel_remote(func, args_list, unordered=False, ...@@ -347,11 +357,9 @@ def run_in_parallel_remote(func, args_list, unordered=False,
# don't keep more than 10 old job ids # don't keep more than 10 old job ids
while len(__finished_jobs) > 10: while len(__finished_jobs) > 10:
__finished_jobs.pop(0) __finished_jobs.pop(0)
job_cleanup()
# re-raise the GeneratorExit exception to finish destruction # re-raise the GeneratorExit exception to finish destruction
raise raise
log.debug("Parallel processing finished: %s" % job_id) log.debug("Parallel processing finished: %s" % job_id)
job_cleanup()
else: else:
for args in args_list: for args in args_list:
yield func(args) yield func(args)
...@@ -409,14 +417,23 @@ class OneProcess(object): ...@@ -409,14 +417,23 @@ class OneProcess(object):
class ProcessStatistics(object): class ProcessStatistics(object):
EXPIRY_TIMER = 120
def __init__(self): def __init__(self):
self.processes = {} self.processes = {}
self.queues = {} self.queues = {}
self.workers = {}
def __str__(self): def __str__(self):
return os.linesep.join([str(item) return os.linesep.join([str(item)
for item in self.processes.values() + self.queues.values()]) for item in self.processes.values() + self.queues.values()])
def _refresh_workers(self):
    """Drop workers whose last notification is older than EXPIRY_TIMER.

    The original implementation deleted entries while iterating the
    dictionary's live iterator ("iteritems"), which raises
    "RuntimeError: dictionary changed size during iteration" as soon as
    the first worker expires.  Collect the expired keys first, then
    delete them in a second pass.
    """
    oldest_valid = time.time() - self.EXPIRY_TIMER
    expired = [key for key, timestamp in self.workers.items()
            if timestamp < oldest_valid]
    for key in expired:
        del self.workers[key]
def get_stats(self): def get_stats(self):
return str(self) return str(self)
...@@ -438,6 +455,26 @@ class ProcessStatistics(object): ...@@ -438,6 +455,26 @@ class ProcessStatistics(object):
self.queues[name].transfer_count += 1 self.queues[name].transfer_count += 1
self.queues[name].transfer_time += amount self.queues[name].transfer_time += amount
def worker_notification(self, name):
    """Record a keep-alive timestamp for the worker with the given name."""
    self.workers[name] = time.time()
def get_worker_statistics(self):
    """Return a list of statistics tuples, one per recently active worker.

    Each tuple contains: (name, seconds_since_last_notification,
    task_count, total_process_time, average_process_time,
    average_transfer_time).  Workers without a matching entry in
    self.processes are skipped.  A connected worker that has not
    processed any task yet (process_count == 0) is reported with zero
    averages instead of raising ZeroDivisionError.
    """
    self._refresh_workers()
    now = time.time()
    result = []
    for key in self.workers:
        if key in self.processes:
            one_process = self.processes[key]
            last_notification = int(now - self.workers[key])
            num_of_tasks = one_process.process_count
            process_time = one_process.process_time
            if num_of_tasks > 0:
                avg_process_time = process_time / num_of_tasks
                avg_transfer_time = one_process.transfer_time / num_of_tasks
            else:
                # the worker announced itself, but received no task yet
                avg_process_time = 0
                avg_transfer_time = 0
            result.append((key, last_notification, num_of_tasks,
                    process_time, avg_process_time, avg_transfer_time))
    return result
# TODO: implement an expiry time for cache items # TODO: implement an expiry time for cache items
class ProcessDataCache(object): class ProcessDataCache(object):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment