Commit 0c5b264a authored by sumpfralle's avatar sumpfralle

added statistics for each worker thread


git-svn-id: https://pycam.svn.sourceforge.net/svnroot/pycam/trunk@738 bbaffbd6-741e-11dd-a85d-61de82d9cad9
parent f6e0e5f8
......@@ -25,7 +25,9 @@ import pycam.Utils.log
#import multiprocessing
#from multiprocessing.managers import SyncManager
import Queue
import uuid
import time
import os
DEFAULT_PORT = 1250
......@@ -78,6 +80,8 @@ def init_threading(number_of_processes=None, remote=None, run_server=False,
log.info("Enabled multi-threading with %d parallel processes" % __num_of_processes)
# initialize the manager
if __multiprocessing:
# initialize the uuid list for all workers
worker_uuid_list = [str(uuid.uuid1()) for index in range(__num_of_processes)]
if remote is None:
address = ('', DEFAULT_PORT)
else:
......@@ -103,11 +107,14 @@ def init_threading(number_of_processes=None, remote=None, run_server=False,
if remote is None:
tasks_queue = multiprocessing.Queue()
results_queue = multiprocessing.Queue()
statistics = ProcessStatistics()
TaskManager.register("tasks", callable=lambda: tasks_queue)
TaskManager.register("results", callable=lambda: results_queue)
TaskManager.register("statistics", callable=lambda: statistics)
else:
TaskManager.register("tasks")
TaskManager.register("results")
TaskManager.register("statistics")
__manager = TaskManager(address=address, authkey=server_credentials)
# run the local server, connect to a remote one or begin serving
if remote is None:
......@@ -119,38 +126,34 @@ def init_threading(number_of_processes=None, remote=None, run_server=False,
# create the spawning process
__closing = __manager.Value("b", False)
__spawner = __multiprocessing.Process(name="spawn", target=_spawn_daemon,
args=(__manager, __num_of_processes))
args=(__manager, __num_of_processes, worker_uuid_list))
__spawner.start()
# wait forever - in case of a server
if run_server:
log.info("Running a local server and waiting for remote connections.")
try:
# the server can be stopped via CTRL-C - it is caught later
__spawner.join()
except KeyboardInterrupt:
log.info("Quit requested")
# don't raise - this is just the normal way of quitting
pass
def cleanup():
global __manager, __spawner
global __manager, __closing
if __multiprocessing:
__spawner.terminate()
if __manager._process.is_alive():
__manager.shutdown(__manager)
__closing.set(True)
def _spawn_daemon(manager, number_of_processes):
def _spawn_daemon(manager, number_of_processes, worker_uuid_list):
""" wait for items in the 'tasks' queue to appear and then spawn workers
"""
global __multiprocessing, __closing
tasks = manager.tasks()
results = manager.results()
stats = manager.statistics()
try:
while not __closing.get():
if not tasks.empty():
workers = []
for index in range(number_of_processes):
worker = __multiprocessing.Process(name="task-%d" % index,
target=_handle_tasks, args=(tasks, results))
for task_id in worker_uuid_list:
worker = __multiprocessing.Process(
name="task-%s" % str(task_id), target=_handle_tasks,
args=(tasks, results, stats))
worker.start()
workers.append(worker)
# wait until all workers are finished
......@@ -162,14 +165,22 @@ def _spawn_daemon(manager, number_of_processes):
# set the "closing" flag and just exit
__closing.set(True)
def _handle_tasks(tasks, results):
def _handle_tasks(tasks, results, stats):
global __multiprocessing
name = __multiprocessing.current_process().name
try:
while not tasks.empty():
try:
func, args = tasks.get(timeout=0.5)
start_time = time.time()
func, args = tasks.get(timeout=1.0)
stats.add_transfer_time(name, time.time() - start_time)
start_time = time.time()
results.put(func(args))
stats.add_process_time(name, time.time() - start_time)
except Queue.Empty:
break
print stats.get_stats()
print
except KeyboardInterrupt:
pass
......@@ -221,3 +232,48 @@ def run_in_parallel_local(func, args, unordered=False, disable_multiprocessing=F
#run_in_parallel = run_in_parallel_local
run_in_parallel = run_in_parallel_remote
class OneProcess(object):
def __init__(self, name):
self.name = name
self.transfer_time = 0
self.transfer_count = 0
self.process_time = 0
self.process_count = 0
def __str__(self):
try:
return "Process %s: %s (%s/%s) - %s (%s/%s)" \
% (self.name, self.transfer_time/self.transfer_count,
self.transfer_time, self.transfer_count,
self.process_time/self.process_count,
self.process_time, self.process_count)
except ZeroDivisionError:
# race condition between adding new objects and output
return "Process %s"
class ProcessStatistics(object):
def __init__(self):
self.processes = {}
def __str__(self):
return os.linesep.join([str(process)
for process in self.processes.values()])
def get_stats(self):
return str(self)
def add_transfer_time(self, name, amount):
if not name in self.processes:
self.processes[name] = OneProcess(name)
self.processes[name].transfer_count += 1
self.processes[name].transfer_time += amount
def add_process_time(self, name, amount):
if not name in self.processes:
self.processes[name] = OneProcess()
self.processes[name].process_count += 1
self.processes[name].process_time += amount
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment