Commit 665815b7 authored by sumpfralle's avatar sumpfralle

use the hostname in the worker's id

make "get_statistics" (hopefully) even more thread-safe


git-svn-id: https://pycam.svn.sourceforge.net/svnroot/pycam/trunk@767 bbaffbd6-741e-11dd-a85d-61de82d9cad9
parent 3dff29f3
...@@ -25,6 +25,7 @@ import pycam.Utils.log ...@@ -25,6 +25,7 @@ import pycam.Utils.log
#import multiprocessing #import multiprocessing
#from multiprocessing.managers import SyncManager #from multiprocessing.managers import SyncManager
import Queue import Queue
import platform
import random import random
import uuid import uuid
import time import time
...@@ -213,12 +214,13 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list): ...@@ -213,12 +214,13 @@ def _spawn_daemon(manager, number_of_processes, worker_uuid_list):
log.debug("Spawner daemon started with %d processes" % number_of_processes) log.debug("Spawner daemon started with %d processes" % number_of_processes)
log.debug("Registering %d worker threads: %s" \ log.debug("Registering %d worker threads: %s" \
% (len(worker_uuid_list), worker_uuid_list)) % (len(worker_uuid_list), worker_uuid_list))
hostname = platform.node()
try: try:
while not __closing.get(): while not __closing.get():
if not tasks.empty(): if not tasks.empty():
workers = [] workers = []
for task_id in worker_uuid_list: for task_id in worker_uuid_list:
task_name = "task-%s" % str(task_id) task_name = "%s-%s" % (hostname, task_id)
worker = __multiprocessing.Process( worker = __multiprocessing.Process(
name=task_name, target=_handle_tasks, name=task_name, target=_handle_tasks,
args=(tasks, results, stats, cache, args=(tasks, results, stats, cache,
...@@ -430,27 +432,32 @@ class ProcessStatistics(object): ...@@ -430,27 +432,32 @@ class ProcessStatistics(object):
def _refresh_workers(self): def _refresh_workers(self):
oldest_valid = time.time() - self.EXPIRY_TIMER oldest_valid = time.time() - self.EXPIRY_TIMER
for key, timestamp in self.workers.iteritems(): for key in self.workers.keys():
if timestamp < oldest_valid: # be careful: maybe the workers dictionary changed in between
del self.workers[key] try:
timestampe = self.workers[key]
if timestamp < oldest_valid:
del self.workers[key]
except KeyError:
pass
def get_stats(self): def get_stats(self):
return str(self) return str(self)
def add_transfer_time(self, name, amount): def add_transfer_time(self, name, amount):
if not name in self.processes: if not name in self.processes.keys():
self.processes[name] = OneProcess(name) self.processes[name] = OneProcess(name)
self.processes[name].transfer_count += 1 self.processes[name].transfer_count += 1
self.processes[name].transfer_time += amount self.processes[name].transfer_time += amount
def add_process_time(self, name, amount): def add_process_time(self, name, amount):
if not name in self.processes: if not name in self.processes.keys():
self.processes[name] = OneProcess(name) self.processes[name] = OneProcess(name)
self.processes[name].process_count += 1 self.processes[name].process_count += 1
self.processes[name].process_time += amount self.processes[name].process_time += amount
def add_queueing_time(self, name, amount): def add_queueing_time(self, name, amount):
if not name in self.queues: if not name in self.queues.keys():
self.queues[name] = OneProcess(name, is_queue=True) self.queues[name] = OneProcess(name, is_queue=True)
self.queues[name].transfer_count += 1 self.queues[name].transfer_count += 1
self.queues[name].transfer_time += amount self.queues[name].transfer_time += amount
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment