Commit 9c04b15c authored by sumpfralle

fixed cleanup of jobs


git-svn-id: https://pycam.svn.sourceforge.net/svnroot/pycam/trunk@743 bbaffbd6-741e-11dd-a85d-61de82d9cad9
parent 470bf2ee
@@ -93,7 +93,7 @@ def init_threading(number_of_processes=None, enable_server=False, remote=None, r
     if not __multiprocessing:
         __manager == None
         log.info("Disabled parallel processing")
-    elif not enable_server:
+    elif not enable_server and not run_server:
         __manager == None
         log.info("Enabled %d parallel local processes" % __num_of_processes)
     else:
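The tightened condition means the plain local-process branch is only taken when neither a network server (enable_server) nor dedicated server mode (run_server) was requested. One thing to note in the unchanged context lines: __manager == None is an equality comparison whose result is discarded, not an assignment. A minimal runnable sketch of the intended branch logic follows, with assumed module state; this is a reading aid, not the committed code:

    # A reading aid, not the committed code: note "=" for assignment,
    # where the diff's context lines show "==" (a no-op comparison).
    import logging
    log = logging.getLogger("pycam")

    __multiprocessing = False      # assumed module state for illustration
    enable_server = False
    run_server = False
    __num_of_processes = 2
    __manager = None

    if not __multiprocessing:
        __manager = None           # parallel processing disabled entirely
        log.info("Disabled parallel processing")
    elif not enable_server and not run_server:
        __manager = None           # local worker pool only, no network manager
        log.info("Enabled %d parallel local processes" % __num_of_processes)
    else:
        pass                       # server mode: a manager gets created below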
@@ -166,7 +166,7 @@ def init_threading(number_of_processes=None, enable_server=False, remote=None, r
 def cleanup():
     global __manager, __closing
-    if __multiprocessing:
+    if __multiprocessing and __closing:
         __closing.set(True)
 
 
 def _spawn_daemon(manager, number_of_processes, worker_uuid_list):
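The added "and __closing" presumably guards against cleanup() running when init_threading() never created the shared closing flag, in which case __closing is still None and calling set(True) on it would raise AttributeError. A self-contained sketch of the guard, with the module state reduced to plain variables:

    __multiprocessing = True   # assumed module state for illustration
    __manager = None
    __closing = None           # init_threading() would replace this with a shared flag

    def cleanup():
        global __manager, __closing
        # Only touch the shared flag if it was actually created; before this
        # commit the call ran whenever multiprocessing was enabled.
        if __multiprocessing and __closing:
            __closing.set(True)

    cleanup()   # safely a no-op here, since __closing is still None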
@@ -213,8 +213,6 @@ def _handle_tasks(tasks, results, stats, cache):
                 stats.add_process_time(name, time.time() - start_time)
             except Queue.Empty:
                 break
-        #print stats.get_stats()
-        #print
     except KeyboardInterrupt:
         pass
 
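For context, _handle_tasks is the worker-side loop: it pulls (job_id, func, args, cache_id) tuples off the shared queue until the queue runs dry, records per-worker processing time, and treats Ctrl-C as a clean shutdown; the commit just drops two commented-out debug prints here. A simplified sketch of that loop, with the stats object reduced to a callback and the cache lookup omitted:

    import time
    import Queue  # Python 2 module name, matching the source

    def handle_tasks(tasks, add_process_time, name):
        # Drain the shared task queue; stop on Ctrl-C or when it runs dry.
        try:
            while True:
                try:
                    start_time = time.time()
                    job_id, func, args, cache_id = tasks.get(timeout=1.0)
                    func(args)
                    add_process_time(name, time.time() - start_time)
                except Queue.Empty:
                    break
        except KeyboardInterrupt:
            pass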
@@ -244,6 +242,11 @@ def run_in_parallel_remote(func, args_list, unordered=False,
                 cache.add(cache_id, cache_args)
             tasks_queue.put((job_id, func, normal_args, cache_id))
         stats.add_queueing_time(__task_source_uuid, time.time() - start_time)
+        def job_cleanup():
+            # remove the previously added cached items
+            for key in previous_cache_values.keys():
+                cache.remove(key)
+            print stats.get_stats()
         for index in range(len(args_list)):
             try:
                 result_job_id = None
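job_cleanup is defined as a closure over previous_cache_values and cache, so the same eviction code can run from both exits of the generator: the GeneratorExit path further down and normal completion. (The print stats.get_stats() line looks like a debugging aid that moved into the helper along the way.) A simplified, self-contained illustration of the pattern:

    def produce_results(items, cache):
        # job_cleanup() is a closure over the keys this job added, so the
        # same eviction runs on early abandonment and on normal completion.
        added_keys = []

        def job_cleanup():
            for key in added_keys:
                cache.pop(key, None)

        try:
            for index, item in enumerate(items):
                cache[index] = item
                added_keys.append(index)
                yield item
        except GeneratorExit:
            job_cleanup()      # consumer closed the generator early
            raise
        job_cleanup()          # all items were consumed normally

    # Closing the generator early still evicts the cached items:
    shared = {}
    gen = produce_results(["a", "b", "c"], shared)
    next(gen)
    gen.close()        # raises GeneratorExit inside produce_results
    assert shared == {}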
@@ -253,7 +256,6 @@ def run_in_parallel_remote(func, args_list, unordered=False,
                     yield result
                 elif result_job_id in __finished_jobs:
                     # throw this result of an job away
-                    print "Throwing away an old result: %s" % result_job_id
                     pass
                 else:
                     # put the result back to the queue for the next manager
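Each entry read back from the results queue falls into one of three cases: it belongs to the current job (yield it), to a job already marked finished (drop it; this commit silences the debug print for that case), or to another still-active job (put it back for that job's consumer). A sketch of the dispatch, with names following the diff:

    import Queue  # Python 2 module name

    def dispatch_results(results_queue, job_id, finished_jobs):
        # Yield this job's results, drop stale ones, requeue foreign ones.
        while True:
            try:
                result_job_id, result = results_queue.get(timeout=1.0)
            except Queue.Empty:
                break
            if result_job_id == job_id:
                yield result                  # our own result
            elif result_job_id in finished_jobs:
                pass                          # stale result of a finished job
            else:
                # belongs to another live job: hand it back to its consumer
                results_queue.put((result_job_id, result))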
@@ -262,16 +264,19 @@ def run_in_parallel_remote(func, args_list, unordered=False,
                     time.sleep(0.5 + random.random())
             except GeneratorExit:
                 # catch this specific (silent) exception and flush the task queue
-                while not tasks_queue.empty():
-                    tasks_queue.get(timeout=0.1)
-                # remove the previously added cached items
-                for key in previous_cache_values.keys():
-                    cache.remove(key)
+                queue_len = tasks.qsize()
+                # remove all remaining tasks with the current job id
+                for index in queue_len:
+                    this_job_id, func, normal_args, cache_id = tasks_queue.get(timeout=0.1)
+                    if job_id != this_job_id:
+                        tasks.put((this_job_id, func, normal_args, cache_id))
                 __finished_jobs.append(job_id)
                 while len(__finished_jobs) > 10:
                     __finished_jobs.pop(0)
+                job_cleanup()
                 # re-raise the GeneratorExit exception to finish destruction
                 raise
+        job_cleanup()
         else:
             for args in args_list:
                 yield func(args)
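This hunk is the core of the fix: instead of draining the shared task queue completely on GeneratorExit, which also discarded queued tasks belonging to other jobs, the handler now walks the queue once and puts back every task whose job id differs from the one being cancelled. Two quirks of the committed code are worth flagging: "for index in queue_len:" iterates over an integer and would raise TypeError (range(queue_len) was presumably intended), and the code alternates between the names tasks and tasks_queue for what appears to be the same queue. A sketch of the intended selective flush, with both quirks normalized; this is my reading, not the committed code:

    import Queue  # Python 2 module name

    def flush_own_tasks(tasks_queue, job_id):
        # Remove only this job's pending tasks; re-queue everything that
        # belongs to other, still-running jobs.
        queue_len = tasks_queue.qsize()       # a snapshot; approximate by nature
        for _ in range(queue_len):
            try:
                entry = tasks_queue.get(timeout=0.1)
            except Queue.Empty:
                break
            this_job_id = entry[0]            # entry: (job_id, func, args, cache_id)
            if this_job_id != job_id:
                tasks_queue.put(entry)        # not ours: put it back

Note that qsize() is only advisory for shared queues, so the snapshot bounds the walk rather than guaranteeing an exact pass; tasks enqueued concurrently may or may not be seen.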
@@ -365,7 +370,6 @@ class ProcessDataCache(object):
         self.cache = {}
 
     def add(self, name, value):
-        print "New cache item: %s" % str(name)
         self.cache[name] = value
 
     def get(self, name):
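Finally, the commit drops a debug print from ProcessDataCache.add. As far as the hunks show, the class is a thin dict wrapper; a minimal sketch consistent with the add/get/remove calls made elsewhere in this commit (the remove signature is inferred, not shown in the diff):

    class ProcessDataCache(object):
        # Thin wrapper around a dict; interface inferred from this commit's
        # call sites (add/get/remove).
        def __init__(self):
            self.cache = {}

        def add(self, name, value):
            self.cache[name] = value

        def get(self, name):
            return self.cache[name]

        def remove(self, name):
            # inferred helper: evict a cached item if present
            if name in self.cache:
                del self.cache[name]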