Commit 95b35bd0 authored by sumpfralle's avatar sumpfralle

added debug messages to parallel process handling

improved uuid handling of Model


git-svn-id: https://pycam.svn.sourceforge.net/svnroot/pycam/trunk@745 bbaffbd6-741e-11dd-a85d-61de82d9cad9
parent 1e430f72
...@@ -180,7 +180,7 @@ class Model(BaseModel): ...@@ -180,7 +180,7 @@ class Model(BaseModel):
self._item_groups.append(self._triangles) self._item_groups.append(self._triangles)
self._export_function = pycam.Exporters.STLExporter.STLExporter self._export_function = pycam.Exporters.STLExporter.STLExporter
# marker for state of kdtree and uuid # marker for state of kdtree and uuid
self._dirty = True self._dirty = False
# enable/disable kdtree # enable/disable kdtree
self._use_kdtree = use_kdtree self._use_kdtree = use_kdtree
self._t_kdtree = None self._t_kdtree = None
...@@ -189,9 +189,7 @@ class Model(BaseModel): ...@@ -189,9 +189,7 @@ class Model(BaseModel):
@property @property
def uuid(self): def uuid(self):
if (self.__uuid is None) or self._dirty: if (self.__uuid is None) or self._dirty:
self.__uuid = str(uuid.uuid4()) self._update_caches()
if self._dirty:
self._update_kdtree()
return self.__uuid return self.__uuid
def append(self, item): def append(self, item):
...@@ -204,11 +202,12 @@ class Model(BaseModel): ...@@ -204,11 +202,12 @@ class Model(BaseModel):
def reset_cache(self): def reset_cache(self):
super(Model, self).reset_cache() super(Model, self).reset_cache()
# the triangle kdtree needs to be reset after transforming the model # the triangle kdtree needs to be reset after transforming the model
self._update_kdtree() self._update_caches()
def _update_kdtree(self): def _update_caches(self):
if self._use_kdtree: if self._use_kdtree:
self._t_kdtree = TriangleKdtree(self.triangles()) self._t_kdtree = TriangleKdtree(self.triangles())
self.__uuid = str(uuid.uuid4())
# the kdtree is up-to-date again # the kdtree is up-to-date again
self._dirty = False self._dirty = False
...@@ -220,7 +219,7 @@ class Model(BaseModel): ...@@ -220,7 +219,7 @@ class Model(BaseModel):
if self._use_kdtree: if self._use_kdtree:
# update the kdtree, if new triangles were added meanwhile # update the kdtree, if new triangles were added meanwhile
if self._dirty: if self._dirty:
self._update_kdtree() self._update_caches()
return self._t_kdtree.Search(minx, maxx, miny, maxy) return self._t_kdtree.Search(minx, maxx, miny, maxy)
return self._triangles return self._triangles
......
...@@ -167,7 +167,14 @@ def init_threading(number_of_processes=None, enable_server=False, remote=None, r ...@@ -167,7 +167,14 @@ def init_threading(number_of_processes=None, enable_server=False, remote=None, r
def cleanup(): def cleanup():
global __manager, __closing global __manager, __closing
if __multiprocessing and __closing: if __multiprocessing and __closing:
log.debug("Shutting down process handler")
__closing.set(True) __closing.set(True)
# Only managers that were started via ".start()" implement a "shutdown".
# Managers started via ".connect" may skip this.
if hasattr(__manager, "shutdown"):
# wait for the spawner and the worker threads to go down
time.sleep(1.5)
__manager.shutdown()
def _spawn_daemon(manager, number_of_processes, worker_uuid_list): def _spawn_daemon(manager, number_of_processes, worker_uuid_list):
""" wait for items in the 'tasks' queue to appear and then spawn workers """ wait for items in the 'tasks' queue to appear and then spawn workers
...@@ -177,14 +184,18 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list): ...@@ -177,14 +184,18 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list):
results = manager.results() results = manager.results()
stats = manager.statistics() stats = manager.statistics()
cache = manager.cache() cache = manager.cache()
log.debug("Spawner daemon started with %d processes" % number_of_processes)
log.debug("Registering %d worker threads: %s" \
% (len(worker_uuid_list), worker_uuid_list))
try: try:
while not __closing.get(): while not __closing.get():
if not tasks.empty(): if not tasks.empty():
workers = [] workers = []
for task_id in worker_uuid_list: for task_id in worker_uuid_list:
task_name = "task-%s" % str(task_id)
worker = __multiprocessing.Process( worker = __multiprocessing.Process(
name="task-%s" % str(task_id), target=_handle_tasks, name=task_name, target=_handle_tasks,
args=(tasks, results, stats, cache)) args=(tasks, results, stats, cache, __closing))
worker.start() worker.start()
workers.append(worker) workers.append(worker)
# wait until all workers are finished # wait until all workers are finished
...@@ -193,16 +204,19 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list): ...@@ -193,16 +204,19 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list):
else: else:
time.sleep(0.2) time.sleep(0.2)
except KeyboardInterrupt: except KeyboardInterrupt:
log.debug("Spawner daemon killed by keyboard interrupt")
# set the "closing" flag and just exit # set the "closing" flag and just exit
__closing.set(True) __closing.set(True)
def _handle_tasks(tasks, results, stats, cache): def _handle_tasks(tasks, results, stats, cache, closing):
global __multiprocessing global __multiprocessing
name = __multiprocessing.current_process().name name = __multiprocessing.current_process().name
local_cache = {} local_cache = {}
timeout_counter = 60 timeout_limit = 60
timeout_counter = 0
log.debug("Worker thread started: %s" % name)
try: try:
while timeout_counter > 0: while (timeout_counter < timeout_limit) and not closing.get():
try: try:
start_time = time.time() start_time = time.time()
job_id, func, args = tasks.get(timeout=1.0) job_id, func, args = tasks.get(timeout=1.0)
...@@ -221,9 +235,11 @@ def _handle_tasks(tasks, results, stats, cache): ...@@ -221,9 +235,11 @@ def _handle_tasks(tasks, results, stats, cache):
stats.add_process_time(name, time.time() - start_time) stats.add_process_time(name, time.time() - start_time)
except Queue.Empty: except Queue.Empty:
time.sleep(1.0) time.sleep(1.0)
timeout_counter -= 1 timeout_counter += 1
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
log.debug("Worker thread finished after %d seconds of inactivity: %s" \
% (timeout_counter, name))
def run_in_parallel_remote(func, args_list, unordered=False, def run_in_parallel_remote(func, args_list, unordered=False,
disable_multiprocessing=False, host=None): disable_multiprocessing=False, host=None):
...@@ -232,12 +248,13 @@ def run_in_parallel_remote(func, args_list, unordered=False, ...@@ -232,12 +248,13 @@ def run_in_parallel_remote(func, args_list, unordered=False,
# threading was not configured before # threading was not configured before
init_threading() init_threading()
if __multiprocessing and not disable_multiprocessing: if __multiprocessing and not disable_multiprocessing:
job_id = str(uuid.uuid1())
log.debug("Starting parallel tasks: %s" % job_id)
tasks_queue = __manager.tasks() tasks_queue = __manager.tasks()
results_queue = __manager.results() results_queue = __manager.results()
remote_cache = __manager.cache() remote_cache = __manager.cache()
stats = __manager.statistics() stats = __manager.statistics()
local_cache = {} local_cache = {}
job_id = str(uuid.uuid1())
for args in args_list: for args in args_list:
start_time = time.time() start_time = time.time()
result_args = [] result_args = []
...@@ -247,6 +264,9 @@ def run_in_parallel_remote(func, args_list, unordered=False, ...@@ -247,6 +264,9 @@ def run_in_parallel_remote(func, args_list, unordered=False,
data_uuid = ProcessDataCacheItemID(arg.uuid) data_uuid = ProcessDataCacheItemID(arg.uuid)
if not data_uuid in local_cache.keys(): if not data_uuid in local_cache.keys():
local_cache[data_uuid] = arg local_cache[data_uuid] = arg
log.debug("Adding item to manager's local cache " \
+ "(job: %s): %s - %s" \
% (job_id, arg.uuid, arg.__class__))
if not remote_cache.contains(data_uuid): if not remote_cache.contains(data_uuid):
remote_cache.add(data_uuid, arg) remote_cache.add(data_uuid, arg)
result_args.append(data_uuid) result_args.append(data_uuid)
...@@ -254,6 +274,7 @@ def run_in_parallel_remote(func, args_list, unordered=False, ...@@ -254,6 +274,7 @@ def run_in_parallel_remote(func, args_list, unordered=False,
result_args.append(arg) result_args.append(arg)
tasks_queue.put((job_id, func, result_args)) tasks_queue.put((job_id, func, result_args))
stats.add_queueing_time(__task_source_uuid, time.time() - start_time) stats.add_queueing_time(__task_source_uuid, time.time() - start_time)
log.debug("Added %d tasks for job %s" % (len(args_list), job_id))
def job_cleanup(): def job_cleanup():
print stats.get_stats() print stats.get_stats()
for index in range(len(args_list)): for index in range(len(args_list)):
...@@ -265,26 +286,36 @@ def run_in_parallel_remote(func, args_list, unordered=False, ...@@ -265,26 +286,36 @@ def run_in_parallel_remote(func, args_list, unordered=False,
yield result yield result
elif result_job_id in __finished_jobs: elif result_job_id in __finished_jobs:
# throw away this result of an old job # throw away this result of an old job
log.debug("Throwing away a result of an old task: %s" % result_job_id)
pass pass
else: else:
log.debug("Skipping result of non-local task: %s" % result_job_id)
# put the result back to the queue for the next manager # put the result back to the queue for the next manager
results_queue.put((result_job_id, result)) results_queue.put((result_job_id, result))
# wait for 0.5 up to 1.5 seconds before trying again # wait for 0.5 up to 1.5 seconds before trying again
time.sleep(0.5 + random.random()) time.sleep(0.5 + random.random())
except GeneratorExit: except GeneratorExit:
log.debug("Parallel processing canceled: %s" % job_id)
# catch this specific (silent) exception and flush the task queue # catch this specific (silent) exception and flush the task queue
queue_len = tasks_queue.qsize() queue_len = tasks_queue.qsize()
# remove all remaining tasks with the current job id # remove all remaining tasks with the current job id
for index in queue_len: removed_job_counter = 0
for index in range(queue_len):
this_job_id, func, args = tasks_queue.get(timeout=0.1) this_job_id, func, args = tasks_queue.get(timeout=0.1)
if this_job_id != job_id: if this_job_id != job_id:
tasks_queue.put((this_job_id, func, args)) tasks_queue.put((this_job_id, func, args))
else:
removed_job_counter += 1
if removed_job_counter > 0:
log.debug("Removed %d remaining tasks for %s" % (removed_job_counter, job_id))
__finished_jobs.append(job_id) __finished_jobs.append(job_id)
# don't keep more than 10 old job ids
while len(__finished_jobs) > 10: while len(__finished_jobs) > 10:
__finished_jobs.pop(0) __finished_jobs.pop(0)
job_cleanup() job_cleanup()
# re-raise the GeneratorExit exception to finish destruction # re-raise the GeneratorExit exception to finish destruction
raise raise
log.debug("Parallel processing finished: %s" % job_id)
job_cleanup() job_cleanup()
else: else:
for args in args_list: for args in args_list:
...@@ -386,6 +417,7 @@ class ProcessDataCache(object): ...@@ -386,6 +417,7 @@ class ProcessDataCache(object):
def add(self, name, value): def add(self, name, value):
if isinstance(name, ProcessDataCacheItemID): if isinstance(name, ProcessDataCacheItemID):
name = name.value name = name.value
log.debug("Added cache item: %s - %s" % (name, type(value)))
self.cache[name] = value self.cache[name] = value
def get(self, name): def get(self, name):
...@@ -396,6 +428,7 @@ class ProcessDataCache(object): ...@@ -396,6 +428,7 @@ class ProcessDataCache(object):
def remove(self, name): def remove(self, name):
if isinstance(name, ProcessDataCacheItemID): if isinstance(name, ProcessDataCacheItemID):
name = name.value name = name.value
log.debug("Removed cache item: %s - %s" % (name, type(value)))
if name in self.cache: if name in self.cache:
del self.cache[name] del self.cache[name]
......
Markdown is supported
0% or drag & drop
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment