Commit 1e430f72 authored by sumpfralle

improved caching of remote process data


git-svn-id: https://pycam.svn.sourceforge.net/svnroot/pycam/trunk@744 bbaffbd6-741e-11dd-a85d-61de82d9cad9
parent 9c04b15c
@@ -27,6 +27,7 @@ from pycam.Geometry.Point import Point
 from pycam.Geometry.utils import number, INFINITE, epsilon
 from pycam.Geometry.intersection import intersect_circle_point, \
         intersect_cylinder_point, intersect_cylinder_line
+import uuid


 class BaseCutter(object):
@@ -49,6 +50,8 @@ class BaseCutter(object):
         self.distance_radiussq = self.distance_radius ** 2
         self.shape = {}
         self.moveto(location)
+        self.uuid = None
+        self.update_uuid()

     def get_minx(self, start=None):
         if start is None:
@@ -70,6 +73,9 @@ class BaseCutter(object):
             start = self.location
         return self.location.y + self.distance_radius

+    def update_uuid(self):
+        self.uuid = uuid.uuid4()
+
     def __repr__(self):
         return "BaseCutter"
@@ -90,6 +96,7 @@ class BaseCutter(object):
         self.required_distance = number(value)
         self.distance_radius = self.radius + self.get_required_distance()
         self.distance_radiussq = self.distance_radius * self.distance_radius
+        self.update_uuid()

     def get_required_distance(self):
         return self.required_distance
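The BaseCutter hunks above establish the pattern that the rest of this commit builds on: every cutter carries a uuid that is regenerated eagerly whenever a cache-relevant property changes (here: the required distance), so remote workers can use the uuid as a cache key instead of re-transferring the object. A minimal, self-contained sketch of that eager-invalidation pattern; the Tool class and set_radius() are hypothetical stand-ins, not PyCAM code:

import uuid

class Tool(object):

    def __init__(self, radius):
        self.uuid = None
        self._radius = radius
        self.update_uuid()

    def update_uuid(self):
        # any remotely cached copy of this object is stale from now on
        self.uuid = uuid.uuid4()

    def set_radius(self, radius):
        self._radius = radius
        self.update_uuid()

tool = Tool(3.0)
first = tool.uuid
tool.set_radius(4.0)
assert tool.uuid != first   # consumers see a new cache key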
@@ -34,6 +34,7 @@ from pycam.Geometry.utils import INFINITE
 from pycam.Geometry import TransformableContainer
 from pycam.Utils import ProgressCounter
 import pycam.Utils.log
+import uuid

 log = pycam.Utils.log.get_logger()
@@ -178,18 +179,27 @@ class Model(BaseModel):
         self._triangles = []
         self._item_groups.append(self._triangles)
         self._export_function = pycam.Exporters.STLExporter.STLExporter
-        # marker for state of kdtree
-        self._kdtree_dirty = True
+        # marker for state of kdtree and uuid
+        self._dirty = True
         # enable/disable kdtree
         self._use_kdtree = use_kdtree
         self._t_kdtree = None
+        self.__uuid = None
+
+    @property
+    def uuid(self):
+        if (self.__uuid is None) or self._dirty:
+            self.__uuid = str(uuid.uuid4())
+            if self._dirty:
+                self._update_kdtree()
+        return self.__uuid

     def append(self, item):
         super(Model, self).append(item)
         if isinstance(item, Triangle):
             self._triangles.append(item)
         # we assume, that the kdtree needs to be rebuilt again
-        self._kdtree_dirty = True
+        self._dirty = True

     def reset_cache(self):
         super(Model, self).reset_cache()
@@ -200,7 +210,7 @@ class Model(BaseModel):
         if self._use_kdtree:
             self._t_kdtree = TriangleKdtree(self.triangles())
         # the kdtree is up-to-date again
-        self._kdtree_dirty = False
+        self._dirty = False

     def triangles(self, minx=-INFINITE, miny=-INFINITE, minz=-INFINITE,
             maxx=+INFINITE, maxy=+INFINITE, maxz=+INFINITE):
@@ -209,7 +219,7 @@ class Model(BaseModel):
             return self._triangles
         if self._use_kdtree:
             # update the kdtree, if new triangles were added meanwhile
-            if self._kdtree_dirty:
+            if self._dirty:
                 self._update_kdtree()
             return self._t_kdtree.Search(minx, maxx, miny, maxy)
         return self._triangles
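Model takes the opposite, lazy approach: mutations only raise the shared _dirty flag, and the uuid (together with the kdtree) is refreshed on the next read of the property. A rough sketch of this read-triggered variant, with a hypothetical Mesh class standing in for Model:

import uuid

class Mesh(object):

    def __init__(self):
        self._items = []
        self._dirty = True
        self.__uuid = None

    @property
    def uuid(self):
        if (self.__uuid is None) or self._dirty:
            self.__uuid = str(uuid.uuid4())
            # Model also rebuilds its kdtree at this point
            self._dirty = False
        return self.__uuid

    def append(self, item):
        self._items.append(item)
        self._dirty = True

mesh = Mesh()
first = mesh.uuid
assert mesh.uuid == first   # an unchanged object keeps its uuid
mesh.append("triangle")
assert mesh.uuid != first   # next read after a mutation yields a new key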
@@ -45,7 +45,7 @@ log = pycam.Utils.log.get_logger()

 # We need to use a global function here - otherwise it does not work with
 # the multiprocessing Pool.
-def _process_one_triangle(((triangle,), (obj, z))):
+def _process_one_triangle(obj, triangle, z):
     result = []
     if id(triangle) in obj._processed_triangles:
         # skip triangles that are known to cause no collision
@@ -287,7 +287,7 @@ class ContourFollow:
         triangles = self.model.triangles(minx=minx, miny=miny, maxx=maxx,
                 maxy=maxy)
         results_iter = run_in_parallel(_process_one_triangle,
-                [((t,), (self, z)) for t in triangles], unordered=True)
+                [(self, t, z) for t in triangles], unordered=True)
         for result in results_iter:
             for edge, shifted_edge in result:
                 waterline_triangles.add(edge, shifted_edge)
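The comment above ("We need to use a global function here") refers to a pickling constraint: multiprocessing can only ship module-level callables to pool workers, and the commit additionally flattens the nested argument tuples so the dispatcher can inspect each argument individually before the call. A self-contained illustration of both points; all names here are made up, none of this is PyCAM code:

import multiprocessing

def process_one(obj, item, z):
    # module-level, therefore picklable by the Pool
    return (item * obj, z)

def star_process_one(args):
    # Pool.map() delivers one object per task; unpack it like func(*real_args)
    return process_one(*args)

if __name__ == "__main__":
    pool = multiprocessing.Pool(2)
    args_list = [(10, t, 0.5) for t in range(4)]
    print(pool.map(star_process_one, args_list))
    pool.close()
    pool.join()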
@@ -33,8 +33,8 @@ log = pycam.Utils.log.get_logger()

 # We need to use a global function here - otherwise it does not work with
 # the multiprocessing Pool.
-def _process_one_grid_line(((positions,), (minz, maxz, dim_attrs, model, cutter,
-        physics, safety_height))):
+def _process_one_grid_line(positions, minz, maxz, dim_attrs, model, cutter,
+        physics, safety_height):
     # for now only used for triangular collision detection
     last_position = None
     points = []
@@ -130,8 +130,8 @@ class DropCutter:
         args = []
         for one_grid_line in grid:
-            args.append(((one_grid_line,), (minz, maxz, dim_attrs, self.model,
-                    self.cutter, self.physics, self.model.maxz)))
+            args.append((one_grid_line, minz, maxz, dim_attrs, self.model,
+                    self.cutter, self.physics, self.model.maxz))
         # ODE does not work with multi-threading
         disable_multiprocessing = not self.physics is None
         for points, height_exceeded in run_in_parallel(_process_one_grid_line,
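The DropCutter change is the same conversion: each task used to be a ((positions,), (shared_args)) pair, now it is one flat tuple that the worker can apply directly. A micro-example of the calling convention only (illustrative names):

def _process(positions, minz, maxz):
    return [(p, minz, maxz) for p in positions]

task = ([1, 2, 3], 0.0, 10.0)     # one flat tuple per task
print(_process(*task))            # what the worker does with real_args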
@@ -31,7 +31,7 @@ import math

 # We need to use a global function here - otherwise it does not work with
 # the multiprocessing Pool.
-def _process_one_line(((p1, p2), (depth, model, cutter, physics))):
+def _process_one_line(p1, p2, depth, model, cutter, physics):
     if physics:
         points = get_free_paths_ode(physics, p1, p2, depth=depth)
     else:
@@ -135,7 +135,7 @@ class PushCutter:
                 p1, p2 = Point(x, miny, z), Point(x, maxy, z)
             else:
                 p1, p2 = Point(minx, y, z), Point(maxx, y, z)
-            args.append(((p1, p2), (depth, self.model, self.cutter, self.physics)))
+            args.append((p1, p2, depth, self.model, self.cutter, self.physics))
         # ODE does not work with multi-threading
         disable_multiprocessing = not self.physics is None
@@ -22,6 +22,7 @@ along with PyCAM. If not, see <http://www.gnu.org/licenses/>.
 from pycam.Geometry.Triangle import Triangle
 from pycam.Geometry.utils import number
+import uuid

 try:
     import ode
@@ -105,7 +106,7 @@ def get_parallelepiped_geom(low_points, high_points, space=None):
     return geom


-class PhysicalWorld:
+class PhysicalWorld(object):

     def __init__(self):
         self._world = ode.World()
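The change from "class PhysicalWorld:" to "class PhysicalWorld(object):" is not cosmetic: in Python 2, descriptors such as the @property added below are only fully honored on new-style classes. On a classic class the getter happens to work, but an accidental assignment silently shadows the property instead of raising an error. A small demonstration (Python 2, not PyCAM code):

class Old:                       # classic class
    @property
    def uuid(self):
        return "fresh"

o = Old()
o.uuid = "stale"                 # silently shadows the property
print(o.uuid)                    # -> "stale"

class New(object):               # new-style class
    @property
    def uuid(self):
        return "fresh"

n = New()
try:
    n.uuid = "stale"
except AttributeError:
    print("read-only property stays protected")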
@@ -115,6 +116,15 @@ class PhysicalWorld:
         self._drill = None
         self._drill_offset = None
         self._collision_detected = False
+        self._dirty = True
+        self.__uuid = None
+
+    @property
+    def uuid(self):
+        if (self.__uuid is None) or self._dirty:
+            self.__uuid = str(uuid.uuid4())
+            self._dirty = False
+        return self.__uuid

     def reset(self):
         self._world = ode.World()
@@ -124,6 +134,7 @@ class PhysicalWorld:
         self._drill = None
         self._drill_offset = None
         self._collision_detected = False
+        self._dirty = True

     def _add_geom(self, geom, position, append=True):
         body = ode.Body(self._world)
@@ -132,6 +143,7 @@ class PhysicalWorld:
         geom.setBody(body)
         if append:
             self._obstacles.append(geom)
+        self._dirty = True

     def add_mesh(self, triangles, position=None):
         if position is None:
@@ -141,6 +153,7 @@ class PhysicalWorld:
         mesh.build(vertices, faces)
         geom = ode.GeomTriMesh(mesh, self._space)
         self._add_geom(geom, position)
+        self._dirty = True

     def set_drill(self, shape, position):
         #geom = ode.GeomTransform(self._space)
@@ -155,6 +168,7 @@ class PhysicalWorld:
         self._drill_offset = [number(value) for value in position]
         self._drill = shape
         self.reset_drill()
+        self._dirty = True

     def extend_drill(self, diff_x, diff_y, diff_z):
         try:
@@ -162,6 +176,7 @@ class PhysicalWorld:
         except ValueError:
             return
         func(diff_x, diff_y, diff_z)
+        self._dirty = True

     def reset_drill(self):
         try:
@@ -169,6 +184,7 @@ class PhysicalWorld:
         except ValueError:
             return
         func()
+        self._dirty = True

     def set_drill_position(self, position):
         if self._drill:
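After this file, cutters, models, and the physics world all expose a uuid attribute; the dispatcher in the threading module decides per argument, by plain duck typing on that attribute, whether a value is worth caching or small enough to travel with the task. A hypothetical sketch of that decision (the tuple format is invented for illustration):

def wrap_for_transfer(arg, known_objects):
    if hasattr(arg, "uuid"):
        # heavy object: remember it once, ship only a small reference
        known_objects.setdefault(arg.uuid, arg)
        return ("cache-ref", arg.uuid)
    # plain values (numbers, points, ...) travel with the task itself
    return arg

known_objects = {}
print(wrap_for_transfer(42, known_objects))       # -> 42

class World(object):
    uuid = "0fdb6f2a"

print(wrap_for_transfer(World(), known_objects))  # -> ('cache-ref', '0fdb6f2a')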
@@ -200,19 +200,28 @@ def _handle_tasks(tasks, results, stats, cache):
     global __multiprocessing
     name = __multiprocessing.current_process().name
     local_cache = {}
+    timeout_counter = 60
     try:
-        while not tasks.empty():
+        while timeout_counter > 0:
             try:
                 start_time = time.time()
-                job_id, func, args, cache_id = tasks.get(timeout=1.0)
-                if not cache_id in local_cache.keys():
-                    local_cache[cache_id] = cache.get(cache_id)
+                job_id, func, args = tasks.get(timeout=1.0)
+                real_args = []
+                for arg in args:
+                    if isinstance(arg, ProcessDataCacheItemID):
+                        cache_id = arg.value
+                        if not cache_id in local_cache.keys():
+                            local_cache[cache_id] = cache.get(cache_id)
+                        real_args.append(local_cache[cache_id])
+                    else:
+                        real_args.append(arg)
                 stats.add_transfer_time(name, time.time() - start_time)
                 start_time = time.time()
-                results.put((job_id, func((args, local_cache[cache_id]))))
+                results.put((job_id, func(*real_args)))
                 stats.add_process_time(name, time.time() - start_time)
             except Queue.Empty:
-                break
+                time.sleep(1.0)
+                timeout_counter -= 1
     except KeyboardInterrupt:
         pass
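The reworked worker loop in short: cache references are resolved through a per-process local_cache (the shared manager cache is only hit on first use of each uuid), and an idle worker now retires after 60 empty polls instead of racing against tasks.empty(). A simplified, runnable sketch in which a plain dict stands in for the manager's cache proxy:

import Queue    # "queue" on Python 3

def handle_tasks(tasks, results, shared_cache, CacheItemID):
    local_cache = {}
    timeout_counter = 60
    while timeout_counter > 0:
        try:
            job_id, func, args = tasks.get(timeout=1.0)
        except Queue.Empty:
            timeout_counter -= 1
            continue
        real_args = []
        for arg in args:
            if isinstance(arg, CacheItemID):
                if arg.value not in local_cache:
                    # first use in this process: fetch once, reuse afterwards
                    local_cache[arg.value] = shared_cache[arg.value]
                real_args.append(local_cache[arg.value])
            else:
                real_args.append(arg)
        results.put((job_id, func(*real_args)))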
@@ -225,27 +234,27 @@ def run_in_parallel_remote(func, args_list, unordered=False,
     if __multiprocessing and not disable_multiprocessing:
         tasks_queue = __manager.tasks()
         results_queue = __manager.results()
-        cache = __manager.cache()
+        remote_cache = __manager.cache()
         stats = __manager.statistics()
-        previous_cache_values = {}
+        local_cache = {}
         job_id = str(uuid.uuid1())
         for args in args_list:
-            normal_args, cache_args = args
             start_time = time.time()
-            for key, value in previous_cache_values.iteritems():
-                if cache_args == value:
-                    cache_id = key
-                    break
-            else:
-                cache_id = str(uuid.uuid4())
-                previous_cache_values[cache_id] = cache_args
-                cache.add(cache_id, cache_args)
-            tasks_queue.put((job_id, func, normal_args, cache_id))
+            result_args = []
+            for arg in args:
+                # add the argument to the cache if possible
+                if hasattr(arg, "uuid"):
+                    data_uuid = ProcessDataCacheItemID(arg.uuid)
+                    if not data_uuid in local_cache.keys():
+                        local_cache[data_uuid] = arg
+                        if not remote_cache.contains(data_uuid):
+                            remote_cache.add(data_uuid, arg)
+                    result_args.append(data_uuid)
+                else:
+                    result_args.append(arg)
+            tasks_queue.put((job_id, func, result_args))
             stats.add_queueing_time(__task_source_uuid, time.time() - start_time)
         def job_cleanup():
-            # remove the previously added cached items
-            for key in previous_cache_values.keys():
-                cache.remove(key)
+            print stats.get_stats()
         for index in range(len(args_list)):
             try:
@@ -255,7 +264,7 @@ def run_in_parallel_remote(func, args_list, unordered=False,
                 if result_job_id == job_id:
                     yield result
                 elif result_job_id in __finished_jobs:
-                    # throw this result of an job away
+                    # throw away this result of an old job
                     pass
                 else:
                     # put the result back to the queue for the next manager
@@ -264,12 +273,12 @@ def run_in_parallel_remote(func, args_list, unordered=False,
                     time.sleep(0.5 + random.random())
         except GeneratorExit:
             # catch this specific (silent) exception and flush the task queue
-            queue_len = tasks.qsize()
+            queue_len = tasks_queue.qsize()
             # remove all remaining tasks with the current job id
             for index in range(queue_len):
-                this_job_id, func, normal_args, cache_id = tasks_queue.get(timeout=0.1)
-                if job_id != this_job_id:
-                    tasks.put((this_job_id, func, normal_args, cache_id))
+                this_job_id, func, args = tasks_queue.get(timeout=0.1)
+                if this_job_id != job_id:
+                    tasks_queue.put((this_job_id, func, args))
             __finished_jobs.append(job_id)
             while len(__finished_jobs) > 10:
                 __finished_jobs.pop(0)
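The GeneratorExit handler drains the queue exactly once and re-inserts every task that belongs to a different job id, so aborting one toolpath run no longer throws away other producers' pending work. The same technique in isolation (standalone names, not PyCAM's):

import Queue    # "queue" on Python 3

def flush_job(tasks_queue, job_id):
    for _ in range(tasks_queue.qsize()):
        try:
            entry = tasks_queue.get(timeout=0.1)
        except Queue.Empty:
            break
        if entry[0] != job_id:        # entry = (job_id, func, args)
            tasks_queue.put(entry)    # keep foreign tasks in the queue

q = Queue.Queue()
q.put(("job-a", None, ()))
q.put(("job-b", None, ()))
flush_job(q, "job-a")
print(q.qsize())    # 1 -- only job-b's task survives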
@@ -369,13 +378,29 @@ class ProcessDataCache(object):

     def __init__(self):
         self.cache = {}

+    def contains(self, name):
+        if isinstance(name, ProcessDataCacheItemID):
+            name = name.value
+        return name in self.cache.keys()
+
     def add(self, name, value):
+        if isinstance(name, ProcessDataCacheItemID):
+            name = name.value
         self.cache[name] = value

     def get(self, name):
+        if isinstance(name, ProcessDataCacheItemID):
+            name = name.value
         return self.cache[name]

     def remove(self, name):
+        if isinstance(name, ProcessDataCacheItemID):
+            name = name.value
         if name in self.cache:
             del self.cache[name]
+
+
+class ProcessDataCacheItemID(object):
+
+    def __init__(self, value):
+        self.value = value
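How the two classes above cooperate: the producer stores each heavy object under its uuid and ships only the lightweight ProcessDataCacheItemID; the worker hands that ID straight back to the cache, whose isinstance() checks normalize it to the underlying key. A short usage example against the classes as defined above (the uuid string is made up):

cache = ProcessDataCache()
heavy_model = object()                    # stand-in for a Model instance
key = ProcessDataCacheItemID("a1b2c3")    # normally built from model.uuid

cache.add(key, heavy_model)
assert cache.contains(key)
assert cache.get(key) is heavy_model      # worker-side lookup
cache.remove(key)
assert not cache.contains(key)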