Commit 17e0e4b6 authored by sumpfralle

cleaner handling of connection loss

implemented ordered/unordered task result delivery


git-svn-id: https://pycam.svn.sourceforge.net/svnroot/pycam/trunk@748 bbaffbd6-741e-11dd-a85d-61de82d9cad9
parent 6ed95bf7
@@ -83,8 +83,8 @@ def init_threading(number_of_processes=None, enable_server=False, remote=None, r
             __num_of_processes = multiprocessing.cpu_count()
         else:
             __multiprocessing = False
-    elif (number_of_processes < 1) and (remote is None):
-        # zero processes are allowed if we use a remote server
+    elif (number_of_processes < 1) and (remote is None) and (enable_server is None):
+        # zero processes are allowed if we use a remote server or offer a server
         __multiprocessing = False
     else:
         __multiprocessing = multiprocessing
@@ -168,7 +168,10 @@ def cleanup():
     global __manager, __closing
     if __multiprocessing and __closing:
         log.debug("Shutting down process handler")
-        __closing.set(True)
+        try:
+            __closing.set(True)
+        except EOFError:
+            log.debug("Connection to manager lost during cleanup")
     # Only managers that were started via ".start()" implement a "shutdown".
     # Managers started via ".connect" may skip this.
     if hasattr(__manager, "shutdown"):
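
A standalone sketch (assumed names, not PyCAM code) of the failure mode this hunk guards against: once the manager process or its connection is gone, any call through one of its proxies raises EOFError, so cleanup must not let the remote "closing" flag abort the rest of the shutdown.

import logging
from multiprocessing.managers import BaseManager

log = logging.getLogger(__name__)

class Flag(object):
    def __init__(self):
        self.value = False
    def set(self, value):
        self.value = value
    def get(self):
        return self.value

class FlagManager(BaseManager):
    pass

FlagManager.register("closing", Flag)

def cleanup(manager, closing):
    try:
        # remote call through the manager proxy - may fail if the
        # manager side of the connection has already disappeared
        closing.set(True)
    except EOFError:
        log.debug("Connection to manager lost during cleanup")
    # only managers started via ".start()" implement "shutdown"
    if hasattr(manager, "shutdown"):
        manager.shutdown()
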
@@ -204,9 +207,12 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list):
             else:
                 time.sleep(0.2)
     except KeyboardInterrupt:
-        log.debug("Spawner daemon killed by keyboard interrupt")
+        log.info("Spawner daemon killed by keyboard interrupt")
         # set the "closing" flag and just exit
         __closing.set(True)
+    except EOFError:
+        # the connection was closed
+        log.info("Spawner daemon lost connection to server")

 def _handle_tasks(tasks, results, stats, cache, closing):
     global __multiprocessing
@@ -219,7 +225,7 @@ def _handle_tasks(tasks, results, stats, cache, closing):
         while (timeout_counter < timeout_limit) and not closing.get():
             try:
                 start_time = time.time()
-                job_id, func, args = tasks.get(timeout=1.0)
+                job_id, task_id, func, args = tasks.get(timeout=1.0)
                 real_args = []
                 for arg in args:
                     if isinstance(arg, ProcessDataCacheItemID):
@@ -231,7 +237,7 @@ def _handle_tasks(tasks, results, stats, cache, closing):
                         real_args.append(arg)
                 stats.add_transfer_time(name, time.time() - start_time)
                 start_time = time.time()
-                results.put((job_id, func(*real_args)))
+                results.put((job_id, task_id, func(*real_args)))
                 stats.add_process_time(name, time.time() - start_time)
             except Queue.Empty:
                 time.sleep(1.0)
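
A minimal sketch of the widened task tuple (hypothetical queue objects, not the real PyCAM worker): the worker treats task_id as opaque and copies it from the task into the result unchanged, which is what later lets the consumer restore submission order.

import queue  # named "Queue" in the Python 2 code above

def worker_loop(tasks, results, closing):
    while not closing.get():
        try:
            job_id, task_id, func, args = tasks.get(timeout=1.0)
        except queue.Empty:
            continue
        # task_id is never interpreted here - it is passed through untouched
        results.put((job_id, task_id, func(*args)))
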
@@ -254,7 +260,7 @@ def run_in_parallel_remote(func, args_list, unordered=False,
         results_queue = __manager.results()
         remote_cache = __manager.cache()
         stats = __manager.statistics()
-        for args in args_list:
+        for index, args in enumerate(args_list):
             start_time = time.time()
             result_args = []
             for arg in args:
@@ -267,18 +273,34 @@ def run_in_parallel_remote(func, args_list, unordered=False,
                     result_args.append(data_uuid)
                 else:
                     result_args.append(arg)
-            tasks_queue.put((job_id, func, result_args))
+            tasks_queue.put((job_id, index, func, result_args))
             stats.add_queueing_time(__task_source_uuid, time.time() - start_time)
         log.debug("Added %d tasks for job %s" % (len(args_list), job_id))
         def job_cleanup():
             print stats.get_stats()
-        for index in range(len(args_list)):
+        result_buffer = {}
+        index = 0
+        while index < len(args_list):
             try:
                 result_job_id = None
                 while result_job_id != job_id:
-                    result_job_id, result = results_queue.get()
+                    result_job_id, task_id, result = results_queue.get()
                     if result_job_id == job_id:
-                        yield result
+                        if unordered:
+                            # just return the values in any order
+                            yield result
+                            index += 1
+                        else:
+                            # return the results in order (based on task_id)
+                            if task_id == index:
+                                yield result
+                                index += 1
+                                while index in result_buffer.keys():
+                                    yield result_buffer[index]
+                                    del result_buffer[index]
+                                    index += 1
+                            else:
+                                result_buffer[task_id] = result
                     elif result_job_id in __finished_jobs:
                         # throw away this result of an old job
                         log.debug("Throwing away a result of an old task: %s" % result_job_id)
@@ -296,9 +318,9 @@ def run_in_parallel_remote(func, args_list, unordered=False,
                 # remove all remaining tasks with the current job id
                 removed_job_counter = 0
                 for index in range(queue_len):
-                    this_job_id, func, args = tasks_queue.get(timeout=0.1)
+                    this_job_id, task_id, func, args = tasks_queue.get(timeout=0.1)
                     if this_job_id != job_id:
-                        tasks_queue.put((this_job_id, func, args))
+                        tasks_queue.put((this_job_id, task_id, func, args))
                     else:
                         removed_job_counter += 1
                 if removed_job_counter > 0:
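
A sketch of the flush idea in that last hunk (hypothetical queue object, not the PyCAM original): a shared queue cannot be filtered in place, so every pending task is popped once and only tasks belonging to other jobs are put back.

import queue

def flush_job(tasks_queue, job_id):
    removed_job_counter = 0
    # qsize() is only an estimate on a live queue, hence the timeout below
    for _ in range(tasks_queue.qsize()):
        try:
            this_job_id, task_id, func, args = tasks_queue.get(timeout=0.1)
        except queue.Empty:
            break
        if this_job_id != job_id:
            # not ours - re-queue it for its own consumer
            tasks_queue.put((this_job_id, task_id, func, args))
        else:
            removed_job_counter += 1
    return removed_job_counter
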