Commit 6ea0ae76 authored by sumpfralle

experimental implementation of remote worker processes


git-svn-id: https://pycam.svn.sourceforge.net/svnroot/pycam/trunk@735 bbaffbd6-741e-11dd-a85d-61de82d9cad9
parent 4bcf6ea7
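
For orientation: based on the help strings and call sites added below, the new mode is driven by two init_threading() call paths in the launcher. A sketch derived from the diff itself (values such as "SECRET" and "workerhost" are placeholders, not defaults):

    import pycam.Utils.threading

    # Worker host (--start-server-only): create the task/result queues,
    # serve them to remote clients and block until terminated.
    pycam.Utils.threading.init_threading(None, remote=None, run_server=True,
            server_credentials="SECRET")

    # Client host (--remote-server workerhost:1250): connect to the server's
    # queues and push work into them; 1250 is the documented default port.
    pycam.Utils.threading.init_threading(None, remote="workerhost:1250",
            server_credentials="SECRET")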
@@ -35,6 +35,7 @@ import pycam.Importers.TestModel
 import pycam.Importers
 import pycam.Exporters.GCodeExporter
 import pycam.Toolpath.Generator
+import pycam.Utils.threading
 from pycam.Toolpath import Bounds, Toolpath
 from pycam import VERSION
 import pycam.Utils.log
@@ -183,9 +184,15 @@ def execute(opts, args, pycam):
         log.info("Psyco was disabled via the commandline")
     # initialize multiprocessing
-    if not opts.parallel_processes is None:
-        import pycam.Utils.threading
-        pycam.Utils.threading.init_threading(opts.parallel_processes)
+    if opts.start_server:
+        pycam.Utils.threading.init_threading(opts.parallel_processes,
+                remote=opts.remote_server, run_server=True,
+                server_credentials=opts.server_authkey)
+        sys.exit(0)
+    else:
+        pycam.Utils.threading.init_threading(opts.parallel_processes,
+                remote=opts.remote_server,
+                server_credentials=opts.server_authkey)
     # initialize the progress bar
     progress_styles = {"none": pycam.Gui.Console.ConsoleProgressBar.STYLE_NONE,
@@ -428,6 +435,18 @@ if __name__ == "__main__":
             help="override the default detection of multiple CPU cores. " \
                     + "Parallel processing only works with Python 2.6 or " \
                     + "later.")
+    group_general.add_option("", "--remote-server", dest="remote_server",
+            default=None, action="store", type="string", help="Connect to a " \
+                    + "remote task server to distribute the processing load. " \
+                    + "The server is given as an IP or a hostname with an " \
+                    + "optional port (default: 1250) separated by a colon.")
+    group_general.add_option("", "--start-server-only", dest="start_server",
+            default=False, action="store_true", help="Start only a local " \
+                    + "server for handling remote requests.")
+    group_general.add_option("", "--server-auth-key", dest="server_authkey",
+            default="", action="store", type="string", help="Secret used for " \
+                    + "connecting to a remote server or for granting access " \
+                    + "to remote clients.")
     group_general.add_option("-q", "--quiet", dest="quiet",
             default=False, action="store_true", help="show only warnings and " \
                     + "errors.")
@@ -579,3 +598,5 @@ if __name__ == "__main__":
     # issues. Any idea how to fix this?
     execute(opts, args, pycam)
+    pycam.Utils.threading.cleanup()
@@ -43,11 +43,12 @@ __multiprocessing = None
 __num_of_processes = None
 __manager = None
-__workers = None
+__spawner = None
 
-def init_threading(number_of_processes=None, host=None):
-    global __multiprocessing, __num_of_processes, __manager, __workers
+def init_threading(number_of_processes=None, remote=None, run_server=False,
+        server_credentials=""):
+    global __multiprocessing, __num_of_processes, __manager, __spawner
     try:
         import multiprocessing
         mp_is_available = True
@@ -59,7 +60,7 @@ def init_threading(number_of_processes=None, host=None):
     if number_of_processes is None:
         # use defaults
         # don't enable threading for a single cpu
-        if multiprocessing.cpu_count() > 1:
+        if (multiprocessing.cpu_count() > 1) or remote or run_server:
             __multiprocessing = multiprocessing
             __num_of_processes = multiprocessing.cpu_count()
         else:
@@ -76,11 +77,11 @@ def init_threading(number_of_processes=None, host=None):
         log.info("Enabled multi-threading with %d parallel processes" % __num_of_processes)
     # initialize the manager
     if __multiprocessing:
-        if host is None:
+        if remote is None:
             address = ('', DEFAULT_PORT)
         else:
-            if ":" in host:
+            if ":" in remote:
-                host, port = host.split(":", 1)
+                remote, port = remote.split(":", 1)
                 try:
                     port = int(port)
                 except ValueError:
@@ -89,24 +90,67 @@ def init_threading(number_of_processes=None, host=None):
                     port = DEFAULT_PORT
             else:
                 port = DEFAULT_PORT
-            address = (host, port)
-        tasks_queue = multiprocessing.Queue()
-        results_queue = multiprocessing.Queue()
+            address = (remote, port)
         from multiprocessing.managers import BaseManager
         class TaskManager(BaseManager):
             pass
-        TaskManager.register("tasks", callable=lambda: tasks_queue)
-        TaskManager.register("results", callable=lambda: results_queue)
-        __manager = TaskManager(address=address)
-        if not host is None:
-            __manager.connect()
-        else:
+        if remote is None:
+            tasks_queue = multiprocessing.Queue()
+            results_queue = multiprocessing.Queue()
+            TaskManager.register("tasks", callable=lambda: tasks_queue)
+            TaskManager.register("results", callable=lambda: results_queue)
+        else:
+            TaskManager.register("tasks")
+            TaskManager.register("results")
+        __manager = TaskManager(address=address, authkey=server_credentials)
+        # run the local server, connect to a remote one or begin serving
+        if remote is None:
             __manager.start()
+            log.info("Started a local server.")
+        else:
+            __manager.connect()
+            log.info("Connected to a remote task server.")
+        # create the spawning process
+        __spawner = __multiprocessing.Process(name="spawn", target=_spawn_daemon,
+                args=(__manager, __num_of_processes))
+        __spawner.start()
+        # wait forever - in case of a server
+        if run_server:
+            log.info("Running a local server and waiting for remote connections.")
+            __spawner.join()
+
+def cleanup():
+    global __manager, __spawner
+    if __multiprocessing:
+        __manager.shutdown()
+        __spawner.terminate()
+
+def _spawn_daemon(manager, number_of_processes):
+    """ wait for items in the 'tasks' queue to appear and then spawn workers
+    """
+    global __multiprocessing
+    # wait for the server to become ready
+    while manager._state.value != __multiprocessing.managers.State.STARTED:
+        time.sleep(0.1)
+    tasks = manager.tasks()
+    while True:
+        if not tasks.empty():
+            workers = []
+            for index in range(number_of_processes):
+                worker = __multiprocessing.Process(name="task-%d" % index,
+                        target=_handle_tasks,
+                        args=(manager.tasks(), manager.results()))
+                worker.start()
+                workers.append(worker)
+            # wait until all workers are finished
+            for worker in workers:
+                worker.join()
+        else:
+            time.sleep(0.5)
+
 def _handle_tasks(tasks, results):
     while not tasks.empty():
         try:
-            func, args = tasks.get(timeout=1.0)
+            func, args = tasks.get(timeout=0.5)
             results.put(func(args))
         except Queue.Empty:
             break
@@ -122,19 +166,13 @@ def run_in_parallel_remote(func, args_list, unordered=False,
         results_queue = __manager.results()
         for args in args_list:
             tasks_queue.put((func, args))
-        workers = []
-        for index in range(__num_of_processes):
-            worker = __multiprocessing.Process(name="task-%d" % index,
-                    target=_handle_tasks, args=(tasks_queue, results_queue))
-            worker.start()
-            workers.append(worker)
         for index in range(len(args_list)):
             try:
                 yield results_queue.get()
             except GeneratorExit:
-                # catch this specific (silent) exception and kill all workers
-                for w in workers:
-                    w.terminate()
+                # catch this specific (silent) exception and flush the task queue
+                while not tasks_queue.empty():
+                    tasks_queue.get(timeout=0.1)
                 # re-raise the GeneratorExit exception to finish destruction
                 raise
     else:
...
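
For context, the distribution mechanism above is the stock multiprocessing.managers pattern: the server creates two queues and registers them with a BaseManager subclass, while clients register only the names and obtain proxies after connect(). A minimal standalone sketch of that pattern (modern Python; the function names serve/connect/worker_loop are illustrative, not PyCAM's API):

    import queue
    from multiprocessing.managers import BaseManager

    class TaskManager(BaseManager):
        pass

    def serve(port=1250, authkey=b"SECRET"):
        # Server side: the queues live here; clients receive proxies to them.
        tasks, results = queue.Queue(), queue.Queue()
        TaskManager.register("tasks", callable=lambda: tasks)
        TaskManager.register("results", callable=lambda: results)
        manager = TaskManager(address=("", port), authkey=authkey)
        manager.get_server().serve_forever()  # blocks, like --start-server-only

    def connect(host, port=1250, authkey=b"SECRET"):
        # Client side: register the names without callables, then connect.
        TaskManager.register("tasks")
        TaskManager.register("results")
        manager = TaskManager(address=(host, port), authkey=authkey)
        manager.connect()
        return manager.tasks(), manager.results()

    def worker_loop(tasks, results):
        # What each spawned worker does: drain the shared task queue.
        while not tasks.empty():
            try:
                func, args = tasks.get(timeout=0.5)
            except queue.Empty:
                break
            results.put(func(*args))

A worker host would call serve() once; clients call connect() and feed (func, args) tuples into the task queue, mirroring run_in_parallel_remote() above.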