Commit 470bf2ee authored by sumpfralle

use a persistent id for each manager

use job IDs to distinguish between remaining cruft from old tasks and results of the current task


git-svn-id: https://pycam.svn.sourceforge.net/svnroot/pycam/trunk@742 bbaffbd6-741e-11dd-a85d-61de82d9cad9
parent b2c5dc0b
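The idea of the change, shown below as a minimal, self-contained sketch (plain Python with a local queue instead of PyCAM's multiprocessing manager queues; fetch_results and its parameters are illustrative names, not part of PyCAM's API): every queued task and result carries the UUID of the job that submitted it, so a consumer yields only results of its own job, drops results of jobs already recorded as finished, and puts anything else back on the queue for the manager it belongs to.

import queue
import random
import time
import uuid

def fetch_results(results_queue, job_id, expected_count, finished_jobs):
    # Yield one result per submitted task of the given job. Every queue entry
    # is a (job_id, result) tuple, so leftovers of old jobs can be told apart
    # from results of the current job.
    for _ in range(expected_count):
        while True:
            result_job_id, result = results_queue.get()
            if result_job_id == job_id:
                yield result
                break
            elif result_job_id in finished_jobs:
                # stale result of a job that already finished -> drop it
                continue
            else:
                # result of another, still active job -> put it back
                results_queue.put((result_job_id, result))
                # back off briefly before looking again
                time.sleep(0.5 + random.random())

# usage: one shared results queue, one job id per submitting manager
results_queue = queue.Queue()
finished_jobs = ["old-job"]
job_id = str(uuid.uuid1())
results_queue.put(("old-job", 42))            # cruft from an aborted job
results_queue.put((job_id, "current result")) # result of the current job
print(list(fetch_results(results_queue, job_id, 1, finished_jobs)))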
@@ -25,6 +25,7 @@ import pycam.Utils.log
 #import multiprocessing
 #from multiprocessing.managers import SyncManager
 import Queue
+import random
 import uuid
 import time
 import os
@@ -34,6 +35,7 @@ DEFAULT_PORT = 1250
 log = pycam.Utils.log.get_logger()
+#TODO: create one or two classes for these functions (to get rid of the globals)
 # possible values:
 #   None: not initialized
@@ -46,6 +48,8 @@ __num_of_processes = None
 __manager = None
 __closing = None
+__task_source_uuid = None
+__finished_jobs = []
 
 def run_in_parallel(*args, **kwargs):
@@ -57,7 +61,7 @@ def run_in_parallel(*args, **kwargs):
 def init_threading(number_of_processes=None, enable_server=False, remote=None, run_server=False,
         server_credentials=""):
-    global __multiprocessing, __num_of_processes, __manager, __closing, run_in_parallel
+    global __multiprocessing, __num_of_processes, __manager, __closing, __task_source_uuid
     # only local -> no server settings allowed
     if (not enable_server) and (not run_server):
         remote = None
@@ -94,11 +98,11 @@ def init_threading(number_of_processes=None, enable_server=False, remote=None, run_server=False,
         log.info("Enabled %d parallel local processes" % __num_of_processes)
     else:
         # with multiprocessing
-        run_in_parallel = run_in_parallel_remote
         log.info("Enabled %d parallel local processes" % __num_of_processes)
         log.info("Allow remote processing")
         # initialize the uuid list for all workers
         worker_uuid_list = [str(uuid.uuid1()) for index in range(__num_of_processes)]
+        __task_source_uuid = str(uuid.uuid1())
     if remote is None:
         address = ('', DEFAULT_PORT)
     else:
@@ -200,23 +204,23 @@ def _handle_tasks(tasks, results, stats, cache):
         while not tasks.empty():
             try:
                 start_time = time.time()
-                func, args, cache_id = tasks.get(timeout=1.0)
+                job_id, func, args, cache_id = tasks.get(timeout=1.0)
                 if not cache_id in local_cache.keys():
                     local_cache[cache_id] = cache.get(cache_id)
                 stats.add_transfer_time(name, time.time() - start_time)
                 start_time = time.time()
-                results.put(func((args, local_cache[cache_id])))
+                results.put((job_id, func((args, local_cache[cache_id]))))
                 stats.add_process_time(name, time.time() - start_time)
             except Queue.Empty:
                 break
-        print stats.get_stats()
-        print
+        #print stats.get_stats()
+        #print
     except KeyboardInterrupt:
         pass
 
 def run_in_parallel_remote(func, args_list, unordered=False,
         disable_multiprocessing=False, host=None):
-    global __multiprocessing, __num_of_processes, __manager
+    global __multiprocessing, __num_of_processes, __manager, __task_source_uuid, __finished_jobs
     if __multiprocessing is None:
         # threading was not configured before
         init_threading()
@@ -225,9 +229,8 @@ def run_in_parallel_remote(func, args_list, unordered=False,
         results_queue = __manager.results()
         cache = __manager.cache()
         stats = __manager.statistics()
-        # TODO: make this queue_id persistent for one manager
-        queue_id = str(uuid.uuid1())
         previous_cache_values = {}
+        job_id = str(uuid.uuid1())
         for args in args_list:
             normal_args, cache_args = args
             start_time = time.time()
@@ -239,11 +242,24 @@ def run_in_parallel_remote(func, args_list, unordered=False,
                 cache_id = str(uuid.uuid4())
                 previous_cache_values[cache_id] = cache_args
                 cache.add(cache_id, cache_args)
-            tasks_queue.put((func, normal_args, cache_id))
-            stats.add_queueing_time(queue_id, time.time() - start_time)
+            tasks_queue.put((job_id, func, normal_args, cache_id))
+            stats.add_queueing_time(__task_source_uuid, time.time() - start_time)
         for index in range(len(args_list)):
             try:
-                yield results_queue.get()
+                result_job_id = None
+                while result_job_id != job_id:
+                    result_job_id, result = results_queue.get()
+                    if result_job_id == job_id:
+                        yield result
+                    elif result_job_id in __finished_jobs:
+                        # throw away this result of an old job
+                        print "Throwing away an old result: %s" % result_job_id
+                        pass
+                    else:
+                        # put the result back into the queue for the next manager
+                        results_queue.put((result_job_id, result))
+                        # wait for 0.5 up to 1.5 seconds before trying again
+                        time.sleep(0.5 + random.random())
             except GeneratorExit:
                 # catch this specific (silent) exception and flush the task queue
                 while not tasks_queue.empty():
@@ -251,6 +267,9 @@ def run_in_parallel_remote(func, args_list, unordered=False,
                 # remove the previously added cached items
                 for key in previous_cache_values.keys():
                     cache.remove(key)
+                __finished_jobs.append(job_id)
+                while len(__finished_jobs) > 10:
+                    __finished_jobs.pop(0)
                 # re-raise the GeneratorExit exception to finish destruction
                 raise
     else: