Commit 088f08ab authored by sumpfralle's avatar sumpfralle

fixed keyboard interrupt handling


git-svn-id: https://pycam.svn.sourceforge.net/svnroot/pycam/trunk@736 bbaffbd6-741e-11dd-a85d-61de82d9cad9
parent 6ea0ae76
...@@ -188,6 +188,7 @@ def execute(opts, args, pycam): ...@@ -188,6 +188,7 @@ def execute(opts, args, pycam):
pycam.Utils.threading.init_threading(opts.parallel_processes, pycam.Utils.threading.init_threading(opts.parallel_processes,
remote=opts.remote_server, run_server=True, remote=opts.remote_server, run_server=True,
server_credentials=opts.server_authkey) server_credentials=opts.server_authkey)
pycam.Utils.threading.cleanup()
sys.exit(0) sys.exit(0)
else: else:
pycam.Utils.threading.init_threading(opts.parallel_processes, pycam.Utils.threading.init_threading(opts.parallel_processes,
...@@ -597,6 +598,5 @@ if __name__ == "__main__": ...@@ -597,6 +598,5 @@ if __name__ == "__main__":
# We need to add the parameter "pycam" to avoid weeeeird namespace # We need to add the parameter "pycam" to avoid weeeeird namespace
# issues. Any idea how to fix this? # issues. Any idea how to fix this?
execute(opts, args, pycam) execute(opts, args, pycam)
pycam.Utils.threading.cleanup() pycam.Utils.threading.cleanup()
...@@ -44,11 +44,12 @@ __num_of_processes = None ...@@ -44,11 +44,12 @@ __num_of_processes = None
__manager = None __manager = None
__spawner = None __spawner = None
__closing = None
def init_threading(number_of_processes=None, remote=None, run_server=False, def init_threading(number_of_processes=None, remote=None, run_server=False,
server_credentials=""): server_credentials=""):
global __multiprocessing, __num_of_processes, __manager, __spawner global __multiprocessing, __num_of_processes, __manager, __spawner, __closing
try: try:
import multiprocessing import multiprocessing
mp_is_available = True mp_is_available = True
...@@ -91,9 +92,14 @@ def init_threading(number_of_processes=None, remote=None, run_server=False, ...@@ -91,9 +92,14 @@ def init_threading(number_of_processes=None, remote=None, run_server=False,
else: else:
port = DEFAULT_PORT port = DEFAULT_PORT
address = (remote, port) address = (remote, port)
from multiprocessing.managers import BaseManager from multiprocessing.managers import SyncManager
class TaskManager(BaseManager): class TaskManager(SyncManager):
pass @classmethod
def _run_server(cls, *args):
# make sure that the server ignores SIGINT (KeyboardInterrupt)
import signal
signal.signal(signal.SIGINT, signal.SIG_IGN)
SyncManager._run_server(*args)
if remote is None: if remote is None:
tasks_queue = multiprocessing.Queue() tasks_queue = multiprocessing.Queue()
results_queue = multiprocessing.Queue() results_queue = multiprocessing.Queue()
...@@ -111,49 +117,61 @@ def init_threading(number_of_processes=None, remote=None, run_server=False, ...@@ -111,49 +117,61 @@ def init_threading(number_of_processes=None, remote=None, run_server=False,
__manager.connect() __manager.connect()
log.info("Connected to a remote task server.") log.info("Connected to a remote task server.")
# create the spawning process # create the spawning process
__closing = __manager.Value("b", False)
__spawner = __multiprocessing.Process(name="spawn", target=_spawn_daemon, __spawner = __multiprocessing.Process(name="spawn", target=_spawn_daemon,
args=(__manager, __num_of_processes)) args=(__manager, __num_of_processes))
__spawner.start() __spawner.start()
# wait forever - in case of a server # wait forever - in case of a server
if run_server: if run_server:
log.info("Running a local server and waiting for remote connections.") log.info("Running a local server and waiting for remote connections.")
spawner.join() try:
__spawner.join()
except KeyboardInterrupt:
log.info("Quit requested")
# don't raise - this is just the normal way of quitting
pass
def cleanup(): def cleanup():
global __manager, __spawner global __manager, __spawner
if __multiprocessing: if __multiprocessing:
__manager.shutdown()
__spawner.terminate() __spawner.terminate()
if __manager._process.is_alive():
__manager.shutdown(__manager)
def _spawn_daemon(manager, number_of_processes): def _spawn_daemon(manager, number_of_processes):
""" wait for items in the 'tasks' queue to appear and then spawn workers """ wait for items in the 'tasks' queue to appear and then spawn workers
""" """
global __multiprocessing global __multiprocessing, __closing
# wait for the server to become ready
while manager._state.value != __multiprocessing.managers.State.STARTED:
time.sleep(0.1)
tasks = manager.tasks() tasks = manager.tasks()
while True: results = manager.results()
if not tasks.empty(): try:
workers = [] while not __closing.get():
for index in range(number_of_processes): if not tasks.empty():
worker = __multiprocessing.Process(name="task-%d" % index, workers = []
target=_handle_tasks, args=(manager.tasks(), manager.results())) for index in range(number_of_processes):
worker.start() worker = __multiprocessing.Process(name="task-%d" % index,
workers.append(worker) target=_handle_tasks, args=(tasks, results))
# wait until all workers are finished worker.start()
for worker in workers: workers.append(worker)
worker.join() # wait until all workers are finished
else: for worker in workers:
time.sleep(0.5) worker.join()
else:
time.sleep(0.2)
except KeyboardInterrupt:
# set the "closing" flag and just exit
__closing.set(True)
def _handle_tasks(tasks, results): def _handle_tasks(tasks, results):
while not tasks.empty(): try:
try: while not tasks.empty():
func, args = tasks.get(timeout=0.5) try:
results.put(func(args)) func, args = tasks.get(timeout=0.5)
except Queue.Empty: results.put(func(args))
break except Queue.Empty:
break
except KeyboardInterrupt:
pass
def run_in_parallel_remote(func, args_list, unordered=False, def run_in_parallel_remote(func, args_list, unordered=False,
disable_multiprocessing=False, host=None): disable_multiprocessing=False, host=None):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment