Commit 43aa8ce4 authored by sumpfralle's avatar sumpfralle

added an expiry timeout for cache items (process pool)


git-svn-id: https://pycam.svn.sourceforge.net/svnroot/pycam/trunk@783 bbaffbd6-741e-11dd-a85d-61de82d9cad9
parent c1f6d471
...@@ -221,10 +221,15 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list): ...@@ -221,10 +221,15 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list):
log.debug("Spawner daemon started with %d processes" % number_of_processes) log.debug("Spawner daemon started with %d processes" % number_of_processes)
log.debug("Registering %d worker threads: %s" \ log.debug("Registering %d worker threads: %s" \
% (len(worker_uuid_list), worker_uuid_list)) % (len(worker_uuid_list), worker_uuid_list))
last_cache_update = time.time()
# use only the hostname (for brevity) - no domain part # use only the hostname (for brevity) - no domain part
hostname = platform.node().split(".", 1)[0] hostname = platform.node().split(".", 1)[0]
try: try:
while not __closing.get(): while not __closing.get():
# check the expire timeout of the cache from time to time
if last_cache_update + 30 < time.time():
cache.expire_cache_items()
last_cache_update = time.time()
if not tasks.empty(): if not tasks.empty():
workers = [] workers = []
for task_id in worker_uuid_list: for task_id in worker_uuid_list:
...@@ -251,7 +256,7 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list): ...@@ -251,7 +256,7 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list):
def _handle_tasks(tasks, results, stats, cache, closing): def _handle_tasks(tasks, results, stats, cache, closing):
global __multiprocessing global __multiprocessing
name = __multiprocessing.current_process().name name = __multiprocessing.current_process().name
local_cache = {} local_cache = ProcessDataCache()
timeout_limit = 60 timeout_limit = 60
timeout_counter = 0 timeout_counter = 0
last_worker_notification = 0 last_worker_notification = 0
...@@ -269,20 +274,25 @@ def _handle_tasks(tasks, results, stats, cache, closing): ...@@ -269,20 +274,25 @@ def _handle_tasks(tasks, results, stats, cache, closing):
real_args = [] real_args = []
for arg in args: for arg in args:
if isinstance(arg, ProcessDataCacheItemID): if isinstance(arg, ProcessDataCacheItemID):
cache_id = arg.value try:
if not cache_id in local_cache.keys(): value = local_cache.get(arg)
local_cache[cache_id] = cache.get(cache_id) except KeyError:
real_args.append(local_cache[cache_id]) # TODO: we will break hard, if the item is expired
value = cache.get(arg)
local_cache.add(arg, value)
real_args.append(value)
elif isinstance(arg, list) and [True for item in arg \ elif isinstance(arg, list) and [True for item in arg \
if isinstance(item, ProcessDataCacheItemID)]: if isinstance(item, ProcessDataCacheItemID)]:
# check if any item in the list is cacheable # check if any item in the list is cacheable
args_list = [] args_list = []
for item in arg: for item in arg:
if isinstance(item, ProcessDataCacheItemID): if isinstance(item, ProcessDataCacheItemID):
cache_id = item.value try:
if not cache_id in local_cache.keys(): value = local_cache.get(item)
local_cache[cache_id] = cache.get(cache_id) except KeyError:
args_list.append(local_cache[cache_id]) value = cache.get(item)
local_cache.add(item, value)
args_list.append(value)
else: else:
args_list.append(item) args_list.append(item)
real_args.append(args_list) real_args.append(args_list)
...@@ -456,19 +466,18 @@ class OneProcess(object): ...@@ -456,19 +466,18 @@ class OneProcess(object):
class ProcessStatistics(object): class ProcessStatistics(object):
EXPIRY_TIMER = 120 def __init__(self, timeout=120):
def __init__(self):
self.processes = {} self.processes = {}
self.queues = {} self.queues = {}
self.workers = {} self.workers = {}
self.timeout = timeout
def __str__(self): def __str__(self):
return os.linesep.join([str(item) return os.linesep.join([str(item)
for item in self.processes.values() + self.queues.values()]) for item in self.processes.values() + self.queues.values()])
def _refresh_workers(self): def _refresh_workers(self):
oldest_valid = time.time() - self.EXPIRY_TIMER oldest_valid = time.time() - self.timeout
for key in self.workers.keys(): for key in self.workers.keys():
# be careful: maybe the workers dictionary changed in between # be careful: maybe the workers dictionary changed in between
try: try:
...@@ -528,32 +537,53 @@ class ProcessStatistics(object): ...@@ -528,32 +537,53 @@ class ProcessStatistics(object):
return result return result
# TODO: implement an expiry time for cache items
class ProcessDataCache(object): class ProcessDataCache(object):
def __init__(self): def __init__(self, timeout=600):
self.cache = {} self.cache = {}
self.timeout = timeout
def _update_timestamp(self, name):
if isinstance(name, ProcessDataCacheItemID):
name = name.value
now = time.time()
try:
self.cache[name][1] = now
except KeyError:
# the item was deleted meanwhile
pass
def expire_cache_items(self):
expired = time.time() - self.timeout
for key in self.cache.keys():
try:
if self.cache[key][1] < expired:
del self.cache[key]
except KeyError:
# ignore removed items
pass
def contains(self, name): def contains(self, name):
if isinstance(name, ProcessDataCacheItemID): if isinstance(name, ProcessDataCacheItemID):
name = name.value name = name.value
self._update_timestamp(name)
self.expire_cache_items()
return name in self.cache.keys() return name in self.cache.keys()
def add(self, name, value): def add(self, name, value):
now = time.time()
if isinstance(name, ProcessDataCacheItemID): if isinstance(name, ProcessDataCacheItemID):
name = name.value name = name.value
self.cache[name] = value self.expire_cache_items()
self.cache[name] = [value, now]
def get(self, name): def get(self, name):
if isinstance(name, ProcessDataCacheItemID): if isinstance(name, ProcessDataCacheItemID):
name = name.value name = name.value
return self.cache[name] self._update_timestamp(name)
self.expire_cache_items()
return self.cache[name][0]
def remove(self, name):
if isinstance(name, ProcessDataCacheItemID):
name = name.value
if name in self.cache:
del self.cache[name]
class ProcessDataCacheItemID(object): class ProcessDataCacheItemID(object):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment