Commit 4bcf6ea7 authored by sumpfralle's avatar sumpfralle

simple implementation of a multiprocessing manager


git-svn-id: https://pycam.svn.sourceforge.net/svnroot/pycam/trunk@734 bbaffbd6-741e-11dd-a85d-61de82d9cad9
parent b293a66a
......@@ -23,6 +23,11 @@ along with PyCAM. If not, see <http://www.gnu.org/licenses/>.
import pycam.Utils.log
# multiprocessing is imported later
#import multiprocessing
#from multiprocessing.managers import SyncManager
import Queue
import time
DEFAULT_PORT = 1250
log = pycam.Utils.log.get_logger()
......@@ -37,9 +42,12 @@ __multiprocessing = None
# needs to be initialized, if multiprocessing is enabled
__num_of_processes = None
__manager = None
__workers = None
def init_threading(number_of_processes=None):
global __multiprocessing, __num_of_processes
def init_threading(number_of_processes=None, host=None):
global __multiprocessing, __num_of_processes, __manager, __workers
try:
import multiprocessing
mp_is_available = True
......@@ -66,9 +74,74 @@ def init_threading(number_of_processes=None):
log.info("Disabled multi-threading")
else:
log.info("Enabled multi-threading with %d parallel processes" % __num_of_processes)
# initialize the manager
if __multiprocessing:
if host is None:
address = ('', DEFAULT_PORT)
else:
if ":" in host:
host, port = host.split(":", 1)
try:
port = int(port)
except ValueError:
log.warning(("Invalid port specified: '%s' - using default " \
+ "port (%d) instead") % (port, DEFAULT_PORT))
port = DEFAULT_PORT
else:
port = DEFAULT_PORT
address = (host, port)
tasks_queue = multiprocessing.Queue()
results_queue = multiprocessing.Queue()
from multiprocessing.managers import BaseManager
class TaskManager(BaseManager):
pass
TaskManager.register("tasks", callable=lambda: tasks_queue)
TaskManager.register("results", callable=lambda: results_queue)
__manager = TaskManager(address=address)
if not host is None:
__manager.connect()
else:
__manager.start()
def _handle_tasks(tasks, results):
while not tasks.empty():
try:
func, args = tasks.get(timeout=1.0)
results.put(func(args))
except Queue.Empty:
break
def run_in_parallel_remote(func, args_list, unordered=False,
disable_multiprocessing=False, host=None):
global __multiprocessing, __num_of_processes, __manager
if __multiprocessing is None:
# threading was not configured before
init_threading()
if __multiprocessing and not disable_multiprocessing:
tasks_queue = __manager.tasks()
results_queue = __manager.results()
for args in args_list:
tasks_queue.put((func, args))
workers = []
for index in range(__num_of_processes):
worker = __multiprocessing.Process(name="task-%d" % index,
target=_handle_tasks, args=(tasks_queue, results_queue))
worker.start()
workers.append(worker)
for index in range(len(args_list)):
try:
yield results_queue.get()
except GeneratorExit:
# catch this specific (silent) exception and kill all workers
for w in workers:
w.terminate()
# re-raise the GeneratorExit exception to finish destruction
raise
else:
for args in args_list:
yield func(args)
def run_in_parallel(func, args, unordered=False, disable_multiprocessing=False):
def run_in_parallel_local(func, args, unordered=False, disable_multiprocessing=False):
global __multiprocessing, __num_of_processes
if __multiprocessing is None:
# threading was not configured before
......@@ -89,3 +162,6 @@ def run_in_parallel(func, args, unordered=False, disable_multiprocessing=False):
for arg in args:
yield func(arg)
#run_in_parallel = run_in_parallel_local
run_in_parallel = run_in_parallel_remote
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment