Commit 374a893e authored by sumpfralle

added a data cache to improve the performance of the task manager


git-svn-id: https://pycam.svn.sourceforge.net/svnroot/pycam/trunk@739 bbaffbd6-741e-11dd-a85d-61de82d9cad9
parent 0c5b264a
Version 0.3.1 - UNRELEASED
* use multiple threads for some toolpath strategies
* use multiple processes for some toolpath strategies
* this requires at least Python 2.6 or the "python-multiprocessing" package
* added an improved contour toolpath strategy (ContourFollow)
* see http://fab.senselab.org/node/43
* added options for conventional/climb milling
* added automatic support grid positioning for contour models
* allow reversing the direction of a 2D contour model
* beautification of the process name (via python-setproctitle)
* added an experimental remote-processing handler
* Usability:
* added optional "orthogonal" view (instead of perspective)
* added a "recent files" item to the file menu
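The core change below: each task's arguments are split into a small per-task part and a large shared part (model, cutter, physics, ...). The shared part is identical for every task of a run, so it is stored once in a ProcessDataCache on the task manager and referenced by a uuid, instead of being pickled into every queue entry. A minimal Python 2 sketch of the scheme; the names DataCache and run_task are illustrative, not from the commit:

import uuid

class DataCache(object):
    # hypothetical stand-in for the ProcessDataCache introduced below
    def __init__(self):
        self.cache = {}
    def add(self, key, value):
        self.cache[key] = value
    def get(self, key):
        return self.cache[key]

def run_task((item,), (model, cutter)):
    # per-task args first, cached (shared) args second - the new convention
    return "%s+%s: %s" % (model, cutter, item)

cache = DataCache()
shared = ("model", "cutter")
cache_id = str(uuid.uuid4())
cache.add(cache_id, shared)
# the queue now carries only the tiny per-task tuple plus the cache id
queue = [(("line-%d" % i,), cache_id) for i in range(3)]
for per_task, key in queue:
    print run_task(per_task, cache.get(key))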
@@ -45,7 +45,7 @@ log = pycam.Utils.log.get_logger()
 # We need to use a global function here - otherwise it does not work with
 # the multiprocessing Pool.
-def _process_one_triangle((obj, triangle, z)):
+def _process_one_triangle((triangle,), (obj, z)):
     result = []
     if id(triangle) in obj._processed_triangles:
         # skip triangles that are known to cause no collision
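The "global function" comment above is about picklability: Python 2's multiprocessing can only ship module-level functions to worker processes; bound methods and closures raise a PicklingError. A standalone illustration (not PyCAM code):

import multiprocessing

def square(x):
    # module-level: picklable, safe to pass to a Pool
    return x * x

class Squarer(object):
    def square(self, x):
        # bound method: not picklable under Python 2
        return x * x

if __name__ == "__main__":
    pool = multiprocessing.Pool(2)
    print pool.map(square, [1, 2, 3])
    # pool.map(Squarer().square, [1, 2, 3]) would raise a PicklingError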
@@ -287,7 +287,7 @@ class ContourFollow:
         triangles = self.model.triangles(minx=minx, miny=miny, maxx=maxx,
                 maxy=maxy)
         results_iter = run_in_parallel(_process_one_triangle,
-                [(self, t, z) for t in triangles], unordered=True)
+                [((t,), (self, z)) for t in triangles], unordered=True)
         for result in results_iter:
             for edge, shifted_edge in result:
                 waterline_triangles.add(edge, shifted_edge)
@@ -33,7 +33,7 @@ log = pycam.Utils.log.get_logger()
 # We need to use a global function here - otherwise it does not work with
 # the multiprocessing Pool.
-def _process_one_grid_line((positions, minz, maxz, dim_attrs, model, cutter, physics, safety_height)):
+def _process_one_grid_line((positions,), (minz, maxz, dim_attrs, model, cutter, physics, safety_height)):
     # for now only used for triangular collision detection
     last_position = None
     points = []
@@ -129,8 +129,8 @@ class DropCutter:
         args = []
         for one_grid_line in grid:
-            args.append((one_grid_line, minz, maxz, dim_attrs, self.model,
-                    self.cutter, self.physics, self.model.maxz))
+            args.append(((one_grid_line,), (minz, maxz, dim_attrs, self.model,
+                    self.cutter, self.physics, self.model.maxz)))
         # ODE does not work with multi-threading
         disable_multiprocessing = not self.physics is None
         for points, height_exceeded in run_in_parallel(_process_one_grid_line,
@@ -31,7 +31,7 @@ import math
 # We need to use a global function here - otherwise it does not work with
 # the multiprocessing Pool.
-def _process_one_line((p1, p2, depth, model, cutter, physics)):
+def _process_one_line((p1, p2), (depth, model, cutter, physics)):
     if physics:
         points = get_free_paths_ode(physics, p1, p2, depth=depth)
     else:
@@ -135,7 +135,7 @@ class PushCutter:
                 p1, p2 = Point(x, miny, z), Point(x, maxy, z)
             else:
                 p1, p2 = Point(minx, y, z), Point(maxx, y, z)
-            args.append((p1, p2, depth, self.model, self.cutter, self.physics))
+            args.append(((p1, p2), (depth, self.model, self.cutter, self.physics)))
         # ODE does not work with multi-threading
         disable_multiprocessing = not self.physics is None
@@ -108,13 +108,16 @@ def init_threading(number_of_processes=None, remote=None, run_server=False,
         tasks_queue = multiprocessing.Queue()
         results_queue = multiprocessing.Queue()
         statistics = ProcessStatistics()
+        cache = ProcessDataCache()
         TaskManager.register("tasks", callable=lambda: tasks_queue)
         TaskManager.register("results", callable=lambda: results_queue)
         TaskManager.register("statistics", callable=lambda: statistics)
+        TaskManager.register("cache", callable=lambda: cache)
     else:
         TaskManager.register("tasks")
         TaskManager.register("results")
         TaskManager.register("statistics")
+        TaskManager.register("cache")
     __manager = TaskManager(address=address, authkey=server_credentials)
     # run the local server, connect to a remote one or begin serving
     if remote is None:
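The cache is exported exactly like the existing queues: the server process registers it with a callable returning the singleton, while clients register the name only and receive a proxy after connecting. A self-contained sketch of this multiprocessing.managers pattern, assuming POSIX fork semantics and illustrative names (DataCache, "secret"):

from multiprocessing.managers import BaseManager

class DataCache(object):
    def __init__(self):
        self.cache = {}
    def add(self, key, value):
        self.cache[key] = value
    def get(self, key):
        return self.cache[key]

class TaskManager(BaseManager):
    pass

if __name__ == "__main__":
    cache = DataCache()
    # server side: expose the single real instance via a callable
    TaskManager.register("cache", callable=lambda: cache)
    manager = TaskManager(authkey="secret")
    manager.start()
    # every call on the returned proxy runs in the manager's server process
    proxy = manager.cache()
    proxy.add("id-1", ("model", "cutter"))
    print proxy.get("id-1")
    manager.shutdown()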
@@ -146,6 +149,7 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list):
     tasks = manager.tasks()
     results = manager.results()
     stats = manager.statistics()
+    cache = manager.cache()
     try:
         while not __closing.get():
             if not tasks.empty():
@@ -153,7 +157,7 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list):
                 for task_id in worker_uuid_list:
                     worker = __multiprocessing.Process(
                             name="task-%s" % str(task_id), target=_handle_tasks,
-                            args=(tasks, results, stats))
+                            args=(tasks, results, stats, cache))
                     worker.start()
                     workers.append(worker)
                 # wait until all workers are finished
@@ -165,17 +169,20 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list):
         # set the "closing" flag and just exit
         __closing.set(True)

-def _handle_tasks(tasks, results, stats):
+def _handle_tasks(tasks, results, stats, cache):
     global __multiprocessing
     name = __multiprocessing.current_process().name
+    local_cache = {}
     try:
         while not tasks.empty():
             try:
                 start_time = time.time()
-                func, args = tasks.get(timeout=1.0)
+                func, args, cache_id = tasks.get(timeout=1.0)
+                if not cache_id in local_cache.keys():
+                    local_cache[cache_id] = cache.get(cache_id)
                 stats.add_transfer_time(name, time.time() - start_time)
                 start_time = time.time()
-                results.put(func(args))
+                results.put(func(args, local_cache[cache_id]))
                 stats.add_process_time(name, time.time() - start_time)
             except Queue.Empty:
                 break
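Worker processes now memoize cache entries in a local dict, so the potentially large shared tuple travels over the manager connection once per worker and cache_id instead of once per task. The loop above boils down to this simplified sketch (not the verbatim function; remote_cache stands for the manager proxy):

import Queue

def handle_tasks(tasks, results, remote_cache):
    # after the first lookup for a given cache_id, the shared value is
    # served from the worker-local dict instead of the remote cache
    local_cache = {}
    while not tasks.empty():
        try:
            func, args, cache_id = tasks.get(timeout=1.0)
        except Queue.Empty:
            break
        if cache_id not in local_cache:
            local_cache[cache_id] = remote_cache.get(cache_id)
        results.put(func(args, local_cache[cache_id]))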
@@ -193,8 +200,24 @@ def run_in_parallel_remote(func, args_list, unordered=False,
     if __multiprocessing and not disable_multiprocessing:
         tasks_queue = __manager.tasks()
         results_queue = __manager.results()
+        cache = __manager.cache()
+        stats = __manager.statistics()
+        # TODO: make this queue_id persistent for one manager
+        queue_id = str(uuid.uuid1())
+        previous_cache_values = {}
         for args in args_list:
-            tasks_queue.put((func, args))
+            normal_args, cache_args = args
+            start_time = time.time()
+            for key, value in previous_cache_values.iteritems():
+                if cache_args == value:
+                    cache_id = key
+                    break
+            else:
+                cache_id = str(uuid.uuid4())
+                previous_cache_values[cache_id] = cache_args
+                cache.add(cache_id, cache_args)
+            tasks_queue.put((func, normal_args, cache_id))
+            stats.add_queueing_time(queue_id, time.time() - start_time)
         for index in range(len(args_list)):
             try:
                 yield results_queue.get()
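Before queueing, equal shared tuples are deduplicated with Python's for/else idiom: an existing cache_id is reused when an equal value was already uploaded, and the else branch, which runs only when the loop finishes without a break, uploads a fresh one. The same logic as a standalone helper; get_or_add and seen are illustrative names:

import uuid

def get_or_add(cache, seen, value):
    for key, old_value in seen.iteritems():
        if old_value == value:
            cache_id = key          # equal value already uploaded: reuse it
            break
    else:
        # no break happened: this value is new, upload it once
        cache_id = str(uuid.uuid4())
        seen[cache_id] = value
        cache.add(cache_id, value)
    return cache_id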
@@ -202,6 +225,9 @@
             # catch this specific (silent) exception and flush the task queue
             while not tasks_queue.empty():
                 tasks_queue.get(timeout=0.1)
+            # remove the previously added cached items
+            for key in previous_cache_values.keys():
+                cache.remove(key)
             # re-raise the GeneratorExit exception to finish destruction
             raise
     else:
@@ -234,7 +260,8 @@ run_in_parallel = run_in_parallel_remote

 class OneProcess(object):

-    def __init__(self, name):
+    def __init__(self, name, is_queue=False):
+        self.is_queue = is_queue
         self.name = name
         self.transfer_time = 0
         self.transfer_count = 0
@@ -243,24 +270,33 @@ class OneProcess(object):

     def __str__(self):
         try:
-            return "Process %s: %s (%s/%s) - %s (%s/%s)" \
-                    % (self.name, self.transfer_time/self.transfer_count,
-                            self.transfer_time, self.transfer_count,
-                            self.process_time/self.process_count,
-                            self.process_time, self.process_count)
+            if self.is_queue:
+                return "Queue %s: %s (%s/%s)" \
+                        % (self.name, self.transfer_time/self.transfer_count,
+                                self.transfer_time, self.transfer_count)
+            else:
+                return "Process %s: %s (%s/%s) - %s (%s/%s)" \
+                        % (self.name, self.transfer_time/self.transfer_count,
+                                self.transfer_time, self.transfer_count,
+                                self.process_time/self.process_count,
+                                self.process_time, self.process_count)
         except ZeroDivisionError:
             # race condition between adding new objects and output
-            return "Process %s"
+            if self.is_queue:
+                return "Queue %s: not ready" % str(self.name)
+            else:
+                return "Process %s: not ready" % str(self.name)


 class ProcessStatistics(object):

     def __init__(self):
         self.processes = {}
+        self.queues = {}

     def __str__(self):
-        return os.linesep.join([str(process)
-                for process in self.processes.values()])
+        return os.linesep.join([str(item)
+                for item in self.processes.values() + self.queues.values()])

     def get_stats(self):
         return str(self)
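A minor portability note: self.processes.values() + self.queues.values() only works because dict.values() returns a list on Python 2, which this codebase targets; a Python 3 port would have to concatenate explicitly:

# Python 3 spelling of the same concatenation (dict views cannot be added)
items = list(self.processes.values()) + list(self.queues.values())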
@@ -273,7 +309,30 @@ class ProcessStatistics(object):

     def add_process_time(self, name, amount):
         if not name in self.processes:
-            self.processes[name] = OneProcess()
+            self.processes[name] = OneProcess(name)
         self.processes[name].process_count += 1
         self.processes[name].process_time += amount

+    def add_queueing_time(self, name, amount):
+        if not name in self.queues:
+            self.queues[name] = OneProcess(name, is_queue=True)
+        self.queues[name].transfer_count += 1
+        self.queues[name].transfer_time += amount
+
+
+class ProcessDataCache(object):
+
+    def __init__(self):
+        self.cache = {}
+
+    def add(self, name, value):
+        print "New cache item: %s" % str(name)
+        self.cache[name] = value
+
+    def get(self, name):
+        return self.cache[name]
+
+    def remove(self, name):
+        if name in self.cache:
+            del self.cache[name]
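Taken on its own, the new class is a plain keyed store; in production every call goes through a manager proxy. A hypothetical direct use, showing that remove() tolerates missing keys, presumably so that the cleanup performed after an aborted run cannot fail:

cache = ProcessDataCache()
cache.add("job-1", ("model", "cutter", 0.5))
assert cache.get("job-1") == ("model", "cutter", 0.5)
cache.remove("job-1")
cache.remove("job-1")   # second removal is a harmless no-op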