Commit b2c5dc0b authored by sumpfralle

it is now possible to switch between local and server-based task handling

fixed a minor bug reported by "abrom" - thanks!


git-svn-id: https://pycam.svn.sourceforge.net/svnroot/pycam/trunk@741 bbaffbd6-741e-11dd-a85d-61de82d9cad9
parent 9bde21bb
......@@ -187,7 +187,7 @@ def execute(opts, args, pycam):
sys.exit(0)
else:
pycam.Utils.threading.init_threading(opts.parallel_processes,
remote=opts.remote_server,
enable_server=opts.enable_server, remote=opts.remote_server,
server_credentials=opts.server_authkey)
# initialize the progress bar
......@@ -431,6 +431,9 @@ if __name__ == "__main__":
help="override the default detection of multiple CPU cores. " \
+ "Parallel processing only works with Python 2.6 or " \
+ "later.")
group_general.add_option("", "--enable-server", dest="enable_server",
default=False, action="store_true", help="enable a local server " \
+ "and (optionally) remote worker servers.")
group_general.add_option("", "--remote-server", dest="remote_server",
default=None, action="store", type="string", help="Connect to a " \
+ "remote task server to distribute the processing load. " \
......
......@@ -45,7 +45,7 @@ log = pycam.Utils.log.get_logger()
# We need to use a global function here - otherwise it does not work with
# the multiprocessing Pool.
def _process_one_triangle((triangle,), (obj, z)):
def _process_one_triangle(((triangle,), (obj, z))):
result = []
if id(triangle) in obj._processed_triangles:
# skip triangles that are known to cause no collision
......
......@@ -33,7 +33,8 @@ log = pycam.Utils.log.get_logger()
# We need to use a global function here - otherwise it does not work with
# the multiprocessing Pool.
def _process_one_grid_line((positions,), (minz, maxz, dim_attrs, model, cutter, physics, safety_height)):
def _process_one_grid_line(((positions,), (minz, maxz, dim_attrs, model, cutter,
physics, safety_height))):
# for now only used for triangular collision detection
last_position = None
points = []
......
......@@ -31,7 +31,7 @@ import math
# We need to use a global function here - otherwise it does not work with
# the multiprocessing Pool.
def _process_one_line((p1, p2), (depth, model, cutter, physics)):
def _process_one_line(((p1, p2), (depth, model, cutter, physics))):
if physics:
points = get_free_paths_ode(physics, p1, p2, depth=depth)
else:
......
......@@ -45,13 +45,24 @@ __multiprocessing = None
__num_of_processes = None
__manager = None
__spawner = None
__closing = None
def init_threading(number_of_processes=None, remote=None, run_server=False,
def run_in_parallel(*args, **kwargs):
    """Dispatch a parallel run to the active backend.

    When no task manager has been initialized (``__manager`` is ``None``),
    the work is executed with the purely local implementation; otherwise it
    is handed to the manager-based remote implementation.  All positional
    and keyword arguments are forwarded unchanged.
    """
    global __manager
    backend = run_in_parallel_local if __manager is None \
            else run_in_parallel_remote
    return backend(*args, **kwargs)
def init_threading(number_of_processes=None, enable_server=False, remote=None, run_server=False,
server_credentials=""):
global __multiprocessing, __num_of_processes, __manager, __spawner, __closing
global __multiprocessing, __num_of_processes, __manager, __closing, run_in_parallel
# only local -> no server settings allowed
if (not enable_server) and (not run_server):
remote = None
run_server = None
server_credentials = ""
try:
import multiprocessing
mp_is_available = True
......@@ -68,18 +79,24 @@ def init_threading(number_of_processes=None, remote=None, run_server=False,
__num_of_processes = multiprocessing.cpu_count()
else:
__multiprocessing = False
elif number_of_processes < 1:
elif (number_of_processes < 1) and (remote is None):
# zero processes are allowed if we use a remote server
__multiprocessing = False
else:
__multiprocessing = multiprocessing
__num_of_processes = number_of_processes
# send the configured state to the logger
if __multiprocessing is False:
log.info("Disabled multi-threading")
else:
log.info("Enabled multi-threading with %d parallel processes" % __num_of_processes)
# initialize the manager
if __multiprocessing:
if not __multiprocessing:
__manager == None
log.info("Disabled parallel processing")
elif not enable_server:
__manager == None
log.info("Enabled %d parallel local processes" % __num_of_processes)
else:
# with multiprocessing
run_in_parallel = run_in_parallel_remote
log.info("Enabled %d parallel local processes" % __num_of_processes)
log.info("Allow remote processing")
# initialize the uuid list for all workers
worker_uuid_list = [str(uuid.uuid1()) for index in range(__num_of_processes)]
if remote is None:
......@@ -94,8 +111,9 @@ def init_threading(number_of_processes=None, remote=None, run_server=False,
+ "port (%d) instead") % (port, DEFAULT_PORT))
port = DEFAULT_PORT
else:
host = remote
port = DEFAULT_PORT
address = (remote, port)
address = (host, port)
from multiprocessing.managers import SyncManager
class TaskManager(SyncManager):
@classmethod
......@@ -128,14 +146,19 @@ def init_threading(number_of_processes=None, remote=None, run_server=False,
log.info("Connected to a remote task server.")
# create the spawning process
__closing = __manager.Value("b", False)
__spawner = __multiprocessing.Process(name="spawn", target=_spawn_daemon,
if __num_of_processes > 0:
# only start the spawner, if we want to use local workers
spawner = __multiprocessing.Process(name="spawn", target=_spawn_daemon,
args=(__manager, __num_of_processes, worker_uuid_list))
__spawner.start()
spawner.start()
else:
spawner = None
# wait forever - in case of a server
if run_server:
log.info("Running a local server and waiting for remote connections.")
# the server can be stopped via CTRL-C - it is caught later
__spawner.join()
if not spawner is None:
spawner.join()
def cleanup():
global __manager, __closing
......@@ -182,7 +205,7 @@ def _handle_tasks(tasks, results, stats, cache):
local_cache[cache_id] = cache.get(cache_id)
stats.add_transfer_time(name, time.time() - start_time)
start_time = time.time()
results.put(func(args, local_cache[cache_id]))
results.put(func((args, local_cache[cache_id])))
stats.add_process_time(name, time.time() - start_time)
except Queue.Empty:
break
......@@ -255,9 +278,6 @@ def run_in_parallel_local(func, args, unordered=False, disable_multiprocessing=F
for arg in args:
yield func(arg)
#run_in_parallel = run_in_parallel_local
run_in_parallel = run_in_parallel_remote
class OneProcess(object):
def __init__(self, name, is_queue=False):
......@@ -314,7 +334,7 @@ class ProcessStatistics(object):
self.processes[name].process_time += amount
def add_queueing_time(self, name, amount):
    """Record one queue transfer of *amount* seconds under *name*.

    Creates the per-queue ``OneProcess`` statistics record on first use.
    Bug fix: the previous code tested membership against ``self.processes``
    while storing into ``self.queues``, so a queue whose name also appeared
    in ``self.processes`` never got its record created (KeyError on the
    update below).  The membership test must use ``self.queues``.
    """
    if name not in self.queues:
        self.queues[name] = OneProcess(name, is_queue=True)
    # accumulate both the event count and the total time spent transferring
    self.queues[name].transfer_count += 1
    self.queues[name].transfer_time += amount
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment