Commit c0ec5abd authored by sumpfralle

improved stability of distributed processing:

* a worker thread puts every new task into a "pending" queue immediately after accepting it
* the "pending" queue is checked regularly for old items -> stale tasks are reinjected into the task queue
* the risk of losing a task is now close to zero (see the sketch below)


git-svn-id: https://pycam.svn.sourceforge.net/svnroot/pycam/trunk@949 bbaffbd6-741e-11dd-a85d-61de82d9cad9
parent f81bc9a5
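
The sketch below is illustrative only and not part of the commit: it shows, under simplified assumptions, how the accept/register/reinject cycle described in the commit message fits together. PendingTasks mirrors the class added in the diff, while handle_one_task and reinject_stale_tasks are hypothetical stand-ins for the worker loop in _handle_tasks and the polling loop in run_in_parallel_remote; locking, caching and error handling are omitted.

import random
import time


class PendingTasks(object):
    # same idea as the PendingTasks class added in this commit
    # (locking is omitted here for brevity)

    def __init__(self, stale_timeout=300):
        self._jobs = {}
        self._stale_timeout = stale_timeout

    def add(self, job_id, task_id, info):
        # register the task immediately after it was taken from the queue
        self._jobs[(job_id, task_id)] = (time.time(), info)

    def remove(self, job_id, task_id):
        self._jobs.pop((job_id, task_id), None)

    def get_stale_task(self):
        # return a random task that has been pending for too long (or None)
        limit = time.time() - self._stale_timeout
        stale = [(job_id, task_id, info)
                for (job_id, task_id), (start, info) in self._jobs.items()
                if start < limit]
        return random.choice(stale) if stale else None


def handle_one_task(tasks_queue, results_queue, pending_tasks):
    # hypothetical worker step: accept, mark as pending, process, unmark
    job_id, task_id, func, args = tasks_queue.get(timeout=0.2)
    pending_tasks.add(job_id, task_id, (func, args))
    results_queue.put((job_id, task_id, func(args)))
    pending_tasks.remove(job_id, task_id)


def reinject_stale_tasks(tasks_queue, pending_tasks, current_job_id):
    # hypothetical task-source step: run regularly while waiting for results;
    # stale tasks of other or already finished jobs would be skipped or
    # discarded instead of being requeued
    stale = pending_tasks.get_stale_task()
    if stale and stale[0] == current_job_id:
        job_id, task_id, (func, args) = stale
        tasks_queue.put((job_id, task_id, func, args))
        pending_tasks.remove(job_id, task_id)

Because a worker registers each task in the pending registry before processing it, a crashed or disconnected node leaves a timestamped entry behind that the task source can detect and requeue.
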
@@ -20,6 +20,7 @@ Version 0.4.1 - UNRELEASED
 * optional visualization of toolpath direction
 * visibility of 3D view items is now configurable in the 3D window
 * via a button and via the context menu
+* improved stability of remote processing: disconnected nodes should not cause problems anymore
 Version 0.4.0.1 - 2010-10-24
 * disabled parallel processing for Windows standalone executable
...
@@ -118,6 +118,16 @@ def get_pool_statistics():
     else:
         return __manager.statistics().get_worker_statistics()

+def get_task_statistics():
+    global __manager
+    result = {}
+    if not __manager is None:
+        result["tasks"] = __manager.tasks().qsize()
+        result["results"] = __manager.results().qsize()
+        result["pending"] = __manager.pending_tasks().length()
+        result["cache"] = __manager.cache().length()
+    return result
+
 def init_threading(number_of_processes=None, enable_server=False, remote=None,
         run_server=False, server_credentials="", local_port=DEFAULT_PORT):
     global __multiprocessing, __num_of_processes, __manager, __closing, __task_source_uuid
@@ -231,15 +241,18 @@ def init_threading(number_of_processes=None, enable_server=False, remote=None,
         results_queue = multiprocessing.Queue()
         statistics = ProcessStatistics()
         cache = ProcessDataCache()
+        pending_tasks = PendingTasks()
         TaskManager.register("tasks", callable=lambda: tasks_queue)
         TaskManager.register("results", callable=lambda: results_queue)
         TaskManager.register("statistics", callable=lambda: statistics)
         TaskManager.register("cache", callable=lambda: cache)
+        TaskManager.register("pending_tasks", callable=lambda: pending_tasks)
     else:
         TaskManager.register("tasks")
         TaskManager.register("results")
         TaskManager.register("statistics")
         TaskManager.register("cache")
+        TaskManager.register("pending_tasks")
     __manager = TaskManager(address=address, authkey=server_credentials)
     # run the local server, connect to a remote one or begin serving
     try:
@@ -301,6 +314,7 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list):
     results = manager.results()
     stats = manager.statistics()
     cache = manager.cache()
+    pending_tasks = manager.pending_tasks()
     log.debug("Spawner daemon started with %d processes" % number_of_processes)
     log.debug("Registering %d worker threads: %s" \
             % (len(worker_uuid_list), worker_uuid_list))
@@ -320,7 +334,7 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list):
                 worker = __multiprocessing.Process(
                         name=task_name, target=_handle_tasks,
                         args=(tasks, results, stats, cache,
-                            __closing))
+                            pending_tasks, __closing))
                 worker.start()
                 workers.append(worker)
             # wait until all workers are finished
@@ -336,7 +350,7 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list):
             # the connection was closed
             log.info("Spawner daemon lost connection to server")

-def _handle_tasks(tasks, results, stats, cache, closing):
+def _handle_tasks(tasks, results, stats, cache, pending_tasks, closing):
     global __multiprocessing
     name = __multiprocessing.current_process().name
     local_cache = ProcessDataCache()
@@ -349,52 +363,59 @@ def _handle_tasks(tasks, results, stats, cache, closing):
             if last_worker_notification + 30 < time.time():
                 stats.worker_notification(name)
                 last_worker_notification = time.time()
-            try:
-                start_time = time.time()
-                job_id, task_id, func, args = tasks.get(timeout=1.0)
-                # reset the timeout counter, if we found another item in the queue
-                timeout_counter = 0
-                real_args = []
-                for arg in args:
-                    if isinstance(arg, ProcessDataCacheItemID):
-                        try:
-                            value = local_cache.get(arg)
-                        except KeyError:
-                            # TODO: we will break hard, if the item is expired
-                            value = cache.get(arg)
-                            local_cache.add(arg, value)
-                        real_args.append(value)
-                    elif isinstance(arg, list) and [True for item in arg \
-                            if isinstance(item, ProcessDataCacheItemID)]:
-                        # check if any item in the list is cacheable
-                        args_list = []
-                        for item in arg:
-                            if isinstance(item, ProcessDataCacheItemID):
-                                try:
-                                    value = local_cache.get(item)
-                                except KeyError:
-                                    value = cache.get(item)
-                                    local_cache.add(item, value)
-                                args_list.append(value)
-                            else:
-                                args_list.append(item)
-                        real_args.append(args_list)
-                    else:
-                        real_args.append(arg)
-                stats.add_transfer_time(name, time.time() - start_time)
-                start_time = time.time()
-                results.put((job_id, task_id, func(real_args)))
-                stats.add_process_time(name, time.time() - start_time)
-            except Queue.Empty:
-                time.sleep(1.0)
-                timeout_counter += 1
+            start_time = time.time()
+            try:
+                job_id, task_id, func, args = tasks.get(timeout=0.2)
+            except Queue.Empty:
+                time.sleep(1.8)
+                timeout_counter += 1
+                continue
+            # TODO: if the client aborts/disconnects between "tasks.get" and
+            # "pending_tasks.add", the task is lost. We should better use some
+            # backup.
+            pending_tasks.add(job_id, task_id, (func, args))
+            log.debug("Worker %s processes %s / %s" % (name, job_id, task_id))
+            # reset the timeout counter, if we found another item in the queue
+            timeout_counter = 0
+            real_args = []
+            for arg in args:
+                if isinstance(arg, ProcessDataCacheItemID):
+                    try:
+                        value = local_cache.get(arg)
+                    except KeyError:
+                        # TODO: we will break hard, if the item is expired
+                        value = cache.get(arg)
+                        local_cache.add(arg, value)
+                    real_args.append(value)
+                elif isinstance(arg, list) and [True for item in arg \
+                        if isinstance(item, ProcessDataCacheItemID)]:
+                    # check if any item in the list is cacheable
+                    args_list = []
+                    for item in arg:
+                        if isinstance(item, ProcessDataCacheItemID):
+                            try:
+                                value = local_cache.get(item)
+                            except KeyError:
+                                value = cache.get(item)
+                                local_cache.add(item, value)
+                            args_list.append(value)
+                        else:
+                            args_list.append(item)
+                    real_args.append(args_list)
+                else:
+                    real_args.append(arg)
+            stats.add_transfer_time(name, time.time() - start_time)
+            start_time = time.time()
+            results.put((job_id, task_id, func(real_args)))
+            pending_tasks.remove(job_id, task_id)
+            stats.add_process_time(name, time.time() - start_time)
     except KeyboardInterrupt:
         pass
     log.debug("Worker thread finished after %d seconds of inactivity: %s" \
             % (timeout_counter, name))

 def run_in_parallel_remote(func, args_list, unordered=False,
-        disable_multiprocessing=False):
+        disable_multiprocessing=False, callback=None):
     global __multiprocessing, __num_of_processes, __manager, __task_source_uuid, __finished_jobs
     if __multiprocessing is None:
         # threading was not configured before
@@ -406,7 +427,11 @@ def run_in_parallel_remote(func, args_list, unordered=False,
         results_queue = __manager.results()
         remote_cache = __manager.cache()
         stats = __manager.statistics()
+        pending_tasks = __manager.pending_tasks()
+        # add all tasks of this job to the queue
         for index, args in enumerate(args_list):
+            if callback:
+                callback()
             start_time = time.time()
             result_args = []
             for arg in args:
@@ -417,21 +442,22 @@ def run_in_parallel_remote(func, args_list, unordered=False,
                         log.debug("Adding cache item for job %s: %s - %s" % (job_id, arg.uuid, arg.__class__))
                         remote_cache.add(data_uuid, arg)
                     result_args.append(data_uuid)
-                elif isinstance(arg, (list, set, tuple)) \
-                        and ([True for item in arg if hasattr(item, "uuid")]):
-                    # a list with at least one cacheable item
+                elif isinstance(arg, (list, set, tuple)):
+                    # a list with - maybe containing cacheable items
                     new_arg_list = []
                     for item in arg:
-                        if hasattr(item, "uuid"):
-                            data_uuid = ProcessDataCacheItemID(item.uuid)
-                            if not remote_cache.contains(data_uuid):
-                                log.debug("Adding cache item from list for " \
-                                        + "job %s: %s - %s" \
-                                        % (job_id, item.uuid, item.__class__))
-                                remote_cache.add(data_uuid, item)
-                            new_arg_list.append(data_uuid)
-                        else:
+                        try:
+                            data_uuid = ProcessDataCacheItemID(item.uuid)
+                        except AttributeError:
+                            # non-cacheable item
                             new_arg_list.append(item)
+                            continue
+                        if not remote_cache.contains(data_uuid):
+                            log.debug("Adding cache item from list for " \
+                                    + "job %s: %s - %s" \
+                                    % (job_id, item.uuid, item.__class__))
+                            remote_cache.add(data_uuid, item)
+                        new_arg_list.append(data_uuid)
                     result_args.append(new_arg_list)
                 else:
                     result_args.append(arg)
@@ -440,63 +466,112 @@ def run_in_parallel_remote(func, args_list, unordered=False,
         log.debug("Added %d tasks for job %s" % (len(args_list), job_id))
         result_buffer = {}
         index = 0
-        while index < len(args_list):
-            try:
-                result_job_id = None
-                while result_job_id != job_id:
-                    result_job_id, task_id, result = results_queue.get()
-                    if result_job_id == job_id:
-                        if unordered:
-                            # just return the values in any order
-                            yield result
-                            index += 1
-                        else:
-                            # return the results in order (based on task_id)
-                            if task_id == index:
-                                yield result
-                                index += 1
-                                while index in result_buffer.keys():
-                                    yield result_buffer[index]
-                                    del result_buffer[index]
-                                    index += 1
-                            else:
-                                result_buffer[task_id] = result
-                    elif result_job_id in __finished_jobs:
-                        # throw away this result of an old job
-                        log.debug("Throwing away a result of an old task: %s" % result_job_id)
-                        pass
-                    else:
-                        log.debug("Skipping result of non-local task: %s" % result_job_id)
-                        # put the result back to the queue for the next manager
-                        results_queue.put((result_job_id, task_id, result))
-                        # wait for up to 0.2 seconds before trying again
-                        time.sleep(random.random() / 5)
-            except GeneratorExit:
-                log.debug("Parallel processing canceled: %s" % job_id)
-                # catch this specific (silent) exception and flush the task queue
-                queue_len = tasks_queue.qsize()
-                # remove all remaining tasks with the current job id
-                removed_job_counter = 0
-                for index in range(queue_len):
-                    this_job_id, task_id, func, args = tasks_queue.get(timeout=0.1)
-                    if this_job_id != job_id:
-                        tasks_queue.put((this_job_id, task_id, func, args))
-                    else:
-                        removed_job_counter += 1
-                if removed_job_counter > 0:
-                    log.debug("Removed %d remaining tasks for %s" % (removed_job_counter, job_id))
-                __finished_jobs.append(job_id)
-                # don't keep more than 10 old job ids
-                while len(__finished_jobs) > 10:
-                    __finished_jobs.pop(0)
-                # re-raise the GeneratorExit exception to finish destruction
-                raise
-        log.debug("Parallel processing finished: %s" % job_id)
+        cancelled = False
+        # wait for all results of this job
+        while (index < len(args_list)) and not cancelled:
+            if callback and callback():
+                # cancel requested
+                cancelled = True
+                break
+            # re-inject stale tasks if necessary
+            stale_task = pending_tasks.get_stale_task()
+            if stale_task:
+                stale_job_id, stale_task_id = stale_task[:2]
+                if stale_job_id in __finished_jobs:
+                    log.debug("Throwing away stale task of an old " + \
+                            "job: %s" % stale_job_id)
+                    pending_tasks.remove(stale_job_id, stale_task_id)
+                elif stale_job_id == job_id:
+                    log.debug("Reinjecting stale task: %s / %s" % \
+                            (job_id, stale_task_id))
+                    stale_func, stale_args = stale_task[2]
+                    tasks_queue.put((job_id, stale_task_id, stale_func,
+                            stale_args))
+                    pending_tasks.remove(job_id, stale_task_id)
+                else:
+                    # non-local task
+                    log.debug("Ignoring stale non-local task: %s / %s" \
+                            % (stale_job_id, stale_task_id))
+            try:
+                result_job_id, task_id, result = results_queue.get(
+                        timeout=1.0)
+            except Queue.Empty:
+                time.sleep(1.0)
+                continue
+            if result_job_id == job_id:
+                log.debug("Received the result of a task: %s / %s" % \
+                        (job_id, task_id))
+                try:
+                    if unordered:
+                        # just return the values in any order
+                        yield result
+                        index += 1
+                    else:
+                        # return the results in order (based on task_id)
+                        if task_id == index:
+                            yield result
+                            index += 1
+                            while index in result_buffer.keys():
+                                yield result_buffer[index]
+                                del result_buffer[index]
+                                index += 1
+                        else:
+                            result_buffer[task_id] = result
+                except GeneratorExit:
+                    # This exception is triggered when the caller stops
+                    # requesting more items from the generator.
+                    log.debug("Parallel processing cancelled: %s" % job_id)
+                    _cleanup_job(job_id, tasks_queue, pending_tasks,
+                            __finished_jobs)
+                    # re-raise the GeneratorExit exception to finish destruction
+                    raise
+            elif result_job_id in __finished_jobs:
+                # throw away this result of an old job
+                log.debug("Throwing away one result of an old job: %s" % \
+                        result_job_id)
+            else:
+                log.debug("Skipping result of non-local job: %s" % \
+                        result_job_id)
+                # put the result back to the queue for the next manager
+                results_queue.put((result_job_id, task_id, result))
+                # wait a little bit to get some idle CPU cycles
+                time.sleep(0.2)
+        _cleanup_job(job_id, tasks_queue, pending_tasks, __finished_jobs)
+        if cancelled:
+            log.debug("Parallel processing cancelled: %s" % job_id)
+        else:
+            log.debug("Parallel processing finished: %s" % job_id)
     else:
         for args in args_list:
             yield func(args)

-def run_in_parallel_local(func, args, unordered=False, disable_multiprocessing=False):
+def _cleanup_job(job_id, tasks_queue, pending_tasks, finished_jobs):
+    # flush the task queue
+    queue_len = tasks_queue.qsize()
+    # remove all remaining tasks with the current job id
+    removed_job_counter = 0
+    for index in range(queue_len):
+        try:
+            this_job_id, task_id, func, args = tasks_queue.get(timeout=0.1)
+        except Queue.Empty:
+            break
+        if this_job_id != job_id:
+            tasks_queue.put((this_job_id, task_id, func, args))
+        else:
+            removed_job_counter += 1
+    if removed_job_counter > 0:
+        log.debug("Removed %d remaining tasks for %s" % (removed_job_counter,
+                job_id))
+    # remove all stale tasks
+    pending_tasks.remove(job_id)
+    # limit the number of stored finished jobs
+    finished_jobs.append(job_id)
+    while len(finished_jobs) > 30:
+        finished_jobs.pop(0)
+
+def run_in_parallel_local(func, args, unordered=False,
+        disable_multiprocessing=False, callback=None):
     global __multiprocessing, __num_of_processes
     if __multiprocessing is None:
         # threading was not configured before
@@ -512,9 +587,15 @@ def run_in_parallel_local(func, args, unordered=False, disable_multiprocessing=F
         # directly. It would somehow loose the focus and just hang infinitely.
         # Thus we wrap our own generator around it.
         for result in imap_func(func, args):
+            if callback and callback():
+                # cancel requested
+                break
             yield result
     else:
         for arg in args:
+            if callback and callback():
+                # cancel requested
+                break
             yield func(arg)
@@ -620,6 +701,57 @@ class ProcessStatistics(object):
         return result


+class PendingTasks(object):
+
+    def __init__(self, stale_timeout=300):
+        # we assume that multiprocessing was imported before
+        import multiprocessing
+        self._lock = multiprocessing.Lock()
+        self._jobs = {}
+        self._stale_timeout = stale_timeout
+        # necessary in case of a lost connection
+        self._lock_timeout = 3
+
+    def add(self, job_id, task_id, info):
+        # no acquire and release: be as quick as possible (avoid lost tasks)
+        self._jobs[(job_id, task_id)] = (time.time(), info)
+
+    def remove(self, job_id, task_id=None):
+        self._lock.acquire(block=True, timeout=self._lock_timeout)
+        if task_id is None:
+            # remove all tasks of this job
+            remove_keys = []
+            for key in self._jobs:
+                if key[0] == job_id:
+                    remove_keys.append(key)
+            for key in remove_keys:
+                del self._jobs[key]
+        else:
+            # remove only a specific task
+            if (job_id, task_id) in self._jobs:
+                del self._jobs[(job_id, task_id)]
+        self._lock.release()
+
+    def get_stale_task(self):
+        self._lock.acquire(block=True, timeout=self._lock_timeout)
+        stale_start_time = time.time() - self._stale_timeout
+        stale_tasks = []
+        for (job_id, task_id), (start_time, info) in self._jobs.iteritems():
+            if start_time < stale_start_time:
+                stale_tasks.append((job_id, task_id, info))
+        if stale_tasks:
+            # pick a random task - otherwise some old tasks stop everything
+            result_index = random.randrange(0, len(stale_tasks))
+            result = stale_tasks[result_index]
+        else:
+            result = None
+        self._lock.release()
+        return result
+
+    def length(self):
+        return len(self._jobs)
+
+
 class ProcessDataCache(object):

     def __init__(self, timeout=600):
@@ -667,6 +799,9 @@ class ProcessDataCache(object):
             self.expire_cache_items()
             return self.cache[name][0]

+    def length(self):
+        return len(self.cache)
+

 class ProcessDataCacheItemID(object):
...