Source code for promises.multiprocess

# This library is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see
# <http://www.gnu.org/licenses/>.


"""
Multi-process Promises for Python

:author: Christopher O'Brien  <obriencj@gmail.com>
:license: LGPL v.3
"""


from multiprocessing.pool import Pool
from promises import promise, promise_proxy


__all__ = ( 'ProcessExecutor', 'ProxyProcessExecutor' )


def _perform_work(*args, **kwds):
    """
    This function is what the worker processes will use to collect the
    result from work (whether via return or raise)

    Returns
    -------
    value : `tuple`
      `(True, result)` if `work()` succeeds, else `(False, exc_info)`
      if an exception was raised
    """

    work, args = args

    try:
        return (True, work(*args, **kwds))
    except Exception as exc:
        # we are discarding the stack trace as it won't survive
        # pickling (which is how it will be passed back to the
        # handler from the worker process/thread)
        return (False, (type(exc), exc, None))


[docs]class ProcessExecutor(object): """ Create promises which will deliver in a separate process. """ def __init__(self, processes=None): self._processes = processes self._pool = None def __enter__(self): return self def __exit__(self, exc_type, _exc_val, _exc_tb): """ Using the managed interface forces blocking delivery at the end of the managed segment. """ self.deliver() return (exc_type is None) def _promise(self): """ override to use a different promise mechanism """ return promise(blocking=True) def _get_pool(self): """ override to provide a different pool implementation """ if not self._pool: self._pool = Pool(processes=self._processes) return self._pool
[docs] def future(self, work, *args, **kwds): """ Promise to deliver on the results of work in the future. Parameters ---------- work : `callable` This is the work which will be performed to deliver on the future. *args : `optional positional parameters` arguments to the `work` function **kwds : `optional named parameters` keyword arguments to the `work` function Returns ------- value : `promise` a promise acting as a placeholder for the result of evaluating `work(*args, **kwds)`. Note that calling `deliver` on this promise will potentially block until the underlying result is available. """ promised,setter,seterr = self._promise() def callback(value): # value is collected as the result of the _perform_work # function at the top of this module success, result = value if success: setter(result) else: seterr(*result) # queue up the work in our pool pool = self._get_pool() pool.apply_async(_perform_work, [work, args], kwds, callback) return promised
[docs] def terminate(self): """ Breaks all the remaining undelivered promises, halts execution of any parallel work being performed. Any promise which had not managed to be delivered will never be delivered after calling `terminate`. Attempting to call `deliver` on them will result in a deadlock. """ # TODO: is there a way for us to cause all undelivered # promises to raise an exception of some sort when this # happens? That would be better than deadlocking while waiting # for delivery. if self._pool is not None: self._pool.terminate() self._pool = None
[docs] def deliver(self): """ Deliver on all underlying promises. Blocks until complete. """ if self._pool is not None: self._pool.close() self._pool.join() self._pool = None
[docs]class ProxyProcessExecutor(ProcessExecutor): """ Create transparent proxy promises which will deliver in a separate process. """ def _promise(self): return promise_proxy(blocking=True) # # The end.