
Property changes on: . ___________________________________________________________________ Name: svnmerge-integrated - /python/trunk:1-61437,61439-61441,61443-61453,61455-61474,61476-61477,61479-61485,61487-61488,61490,61493-61528,61530-61563,61565-61569,61571-61576,61578-61584,61586,61588-61589,61591-61778,61780-61809,61811-61866,61868-61870,61872-61874,61876-61877,61883-61903,61905-61935,61938-61939,61941-62004,62007-62014,62016-62018,62021-62022,62024,62027,62029,62031-62066,62068-62074,62076-62094,62096,62098,62100-62102,62104-62122,62124-62125,62127-62137,62139-62142,62144-62194,62196-62222,62224-62308,62311-62397,62399-62430,62432-62510,62512-62516,62519-62535,62537-62598,62600-62665,62667-62683,62685-62719,62721,62723-62792,62794-62796,62798-62860,62862-62864,62866-62878,62880-62883,62885-62887,62889-62899,62901-62905,62914-62916,62918-62919,62921-62922,62924-62942,62944-62949,62954-62959,62961,62963-62967,62969-62970,62972-62973,62975-62976,62978-62982,62984,62987-62996,62998-63003,63005-63006,63009-63012,63014-63017,63019-63020,63022-63024,63026-63029,63031-63045,63047-63054,63056-63062,63066-63079,63081-63085,63087-63097,63099,63101-63104,63119-63128,63130-63133,63135-63144,63146-63148,63151-63152,63155-63165,63167-63176,63181-63186,63188-63189,63208-63209,63211-63212,63214-63217,63219-63224,63226-63227,63229-63232,63234-63235,63237-63239,63241,63243-63248,63250-63254,63256-63259,63261,63263-63264,63266-63267,63269-63270,63272-63273,63275-63276,63278,63280-63281,63283-63284,63286-63287,63289-63290,63292-63293,63295-63296,63298-63299,63301-63302,63304-63305,63307,63309-63314,63316-63322,63324-63325,63327-63335,63337-63342,63344-63346,63348,63357,63361-63373,63375,63377-63380,63395-63402,63404-63407,63411-63412,63416-63420,63423-63424,63427,63429,63431-63433,63435-63439,63441-63457,63459-63468,63470-63480,63482-63484,63492,63496-63497,63499-63512,63514-63527,63529-63536,63538-63544,63546,63548,63550-63567,63569-63585,63590-63596,63602,63608-63610,63612-63616,63619-63629,63631-63648,63650-63659,63661-63664,63666-63671,63673-63674,63676,63678,63680-63684,63687-63690,63692-63703,63705-63713,63715-63717,63720,63722-63724,63726-63735,63737,63739-63741,63743-63744,63746-63766,63768-63775,63777-63798,63801-63813,63815-63827,63829-63845,63847,63850-63860,63862,63864-63872,63874-63898,63900-63909,63911-63913,63915-63931,63933-63941,63944,63946-63954,63956-63960,63964,63966,63968-63981,63983-63996,63998-64015,64017,64020-64027,64029-64030,64032-64039,64041-64043,64046-64047,64049-64056,64059-64061,64063-64067,64070-64071 + 
/python/trunk:1-61437,61439-61441,61443-61453,61455-61474,61476-61477,61479-61485,61487-61488,61490,61493-61528,61530-61563,61565-61569,61571-61576,61578-61584,61586,61588-61589,61591-61778,61780-61809,61811-61866,61868-61870,61872-61874,61876-61877,61883-61903,61905-61935,61938-61939,61941-62004,62007-62014,62016-62018,62021-62022,62024,62027,62029,62031-62066,62068-62074,62076-62094,62096,62098,62100-62102,62104-62122,62124-62125,62127-62137,62139-62142,62144-62194,62196-62222,62224-62308,62311-62397,62399-62430,62432-62510,62512-62516,62519-62535,62537-62598,62600-62665,62667-62683,62685-62719,62721,62723-62792,62794-62796,62798-62860,62862-62864,62866-62878,62880-62883,62885-62887,62889-62899,62901-62905,62914-62916,62918-62919,62921-62922,62924-62942,62944-62949,62954-62959,62961,62963-62967,62969-62970,62972-62973,62975-62976,62978-62982,62984,62987-62996,62998-63003,63005-63006,63009-63012,63014-63017,63019-63020,63022-63024,63026-63029,63031-63045,63047-63054,63056-63062,63066-63079,63081-63085,63087-63097,63099,63101-63104,63119-63128,63130-63133,63135-63144,63146-63148,63151-63152,63155-63165,63167-63176,63181-63186,63188-63189,63208-63209,63211-63212,63214-63217,63219-63224,63226-63227,63229-63232,63234-63235,63237-63239,63241,63243-63248,63250-63254,63256-63259,63261,63263-63264,63266-63267,63269-63270,63272-63273,63275-63276,63278,63280-63281,63283-63284,63286-63287,63289-63290,63292-63293,63295-63296,63298-63299,63301-63302,63304-63305,63307,63309-63314,63316-63322,63324-63325,63327-63335,63337-63342,63344-63346,63348,63357,63361-63373,63375,63377-63380,63395-63402,63404-63407,63411-63412,63416-63420,63423-63424,63427,63429,63431-63433,63435-63439,63441-63457,63459-63468,63470-63480,63482-63484,63492,63496-63497,63499-63512,63514-63527,63529-63536,63538-63544,63546,63548,63550-63567,63569-63585,63590-63596,63602,63608-63610,63612-63616,63619-63629,63631-63648,63650-63659,63661-63664,63666-63671,63673-63674,63676,63678,63680-63684,63687-63690,63692-63703,63705-63713,63715-63717,63720,63722-63724,63726-63735,63737,63739-63741,63743-63744,63746-63766,63768-63775,63777-63798,63801-63813,63815-63827,63829-63845,63847,63850-63860,63862,63864-63872,63874-63898,63900-63909,63911-63913,63915-63931,63933-63941,63944,63946-63954,63956-63960,63964,63966,63968-63981,63983-63996,63998-64015,64017,64020-64027,64029-64030,64032-64039,64041-64043,64046-64047,64049-64056,64059-64061,64063-64067,64070-64071,64104,64106-64112,64117 Index: setup.py =================================================================== --- setup.py (revision 64120) +++ setup.py (working copy) @@ -1110,8 +1110,66 @@ # _fileio -- supposedly cross platform exts.append(Extension('_fileio', ['_fileio.c'])) + # Richard Oudkerk's multiprocessing module + if platform == 'win32': # Windows + macros = dict() + libraries = ['ws2_32'] + elif platform == 'darwin': # Mac OSX + macros = dict( + HAVE_SEM_OPEN=1, + HAVE_SEM_TIMEDWAIT=0, + HAVE_FD_TRANSFER=1, + HAVE_BROKEN_SEM_GETVALUE=1 + ) + libraries = [] + + elif platform == 'cygwin': # Cygwin + macros = dict( + HAVE_SEM_OPEN=1, + HAVE_SEM_TIMEDWAIT=1, + HAVE_FD_TRANSFER=0, + HAVE_BROKEN_SEM_UNLINK=1 + ) + libraries = [] + else: # Linux and other unices + macros = dict( + HAVE_SEM_OPEN=1, + HAVE_SEM_TIMEDWAIT=1, + HAVE_FD_TRANSFER=1 + ) + libraries = ['rt'] + + if platform == 'win32': + multiprocessing_srcs = [ '_multiprocessing/multiprocessing.c', + '_multiprocessing/semaphore.c', + '_multiprocessing/pipe_connection.c', + '_multiprocessing/socket_connection.c', + 
'_multiprocessing/win32_functions.c' + ] + + else: + multiprocessing_srcs = [ '_multiprocessing/multiprocessing.c', + '_multiprocessing/socket_connection.c' + ] + + if macros.get('HAVE_SEM_OPEN', False): + multiprocessing_srcs.append('_multiprocessing/semaphore.c') + + exts.append ( Extension('_multiprocessing', multiprocessing_srcs, + define_macros=list(macros.items()), + include_dirs=["Modules/_multiprocessing"])) + # End multiprocessing + + # Platform-specific libraries + if platform == 'linux2': + # Linux-specific modules + exts.append( Extension('linuxaudiodev', ['linuxaudiodev.c']) ) + else: + missing.append('linuxaudiodev') + + # Platform-specific libraries if platform in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6', 'freebsd7', 'freebsd8'): exts.append( Extension('ossaudiodev', ['ossaudiodev.c']) ) Index: Doc/library/someos.rst =================================================================== --- Doc/library/someos.rst (revision 64120) +++ Doc/library/someos.rst (working copy) @@ -18,6 +18,7 @@ dummy_threading.rst _thread.rst _dummy_thread.rst + multiprocessing.rst mmap.rst readline.rst rlcompleter.rst Index: Lib/multiprocessing/pool.py =================================================================== --- Lib/multiprocessing/pool.py (revision 64104) +++ Lib/multiprocessing/pool.py (working copy) @@ -1,596 +1,596 @@ -# -# Module providing the `Pool` class for managing a process pool -# -# multiprocessing/pool.py -# -# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt -# - -__all__ = ['Pool'] - -# -# Imports -# - -import threading -import Queue -import itertools -import collections -import time - -from multiprocessing import Process, cpu_count, TimeoutError -from multiprocessing.util import Finalize, debug - -# -# Constants representing the state of a pool -# - -RUN = 0 -CLOSE = 1 -TERMINATE = 2 - -# -# Miscellaneous -# - -job_counter = itertools.count() - -def mapstar(args): - return map(*args) - -# -# Code run by worker processes -# - -def worker(inqueue, outqueue, initializer=None, initargs=()): - put = outqueue.put - get = inqueue.get - if hasattr(inqueue, '_writer'): - inqueue._writer.close() - outqueue._reader.close() - - if initializer is not None: - initializer(*initargs) - - while 1: - try: - task = get() - except (EOFError, IOError): - debug('worker got EOFError or IOError -- exiting') - break - - if task is None: - debug('worker got sentinel -- exiting') - break - - job, i, func, args, kwds = task - try: - result = (True, func(*args, **kwds)) - except Exception, e: - result = (False, e) - put((job, i, result)) - -# -# Class representing a process pool -# - -class Pool(object): - ''' - Class which supports an async version of the `apply()` builtin - ''' - Process = Process - - def __init__(self, processes=None, initializer=None, initargs=()): - self._setup_queues() - self._taskqueue = Queue.Queue() - self._cache = {} - self._state = RUN - - if processes is None: - try: - processes = cpu_count() - except NotImplementedError: - processes = 1 - - self._pool = [] - for i in range(processes): - w = self.Process( - target=worker, - args=(self._inqueue, self._outqueue, initializer, initargs) - ) - self._pool.append(w) - w.set_name(w.get_name().replace('Process', 'PoolWorker')) - w.set_daemon(True) - w.start() - - self._task_handler = threading.Thread( - target=Pool._handle_tasks, - args=(self._taskqueue, self._quick_put, self._outqueue, self._pool) - ) - self._task_handler.setDaemon(True) - self._task_handler._state = RUN - self._task_handler.start() - - 
self._result_handler = threading.Thread( - target=Pool._handle_results, - args=(self._outqueue, self._quick_get, self._cache) - ) - self._result_handler.setDaemon(True) - self._result_handler._state = RUN - self._result_handler.start() - - self._terminate = Finalize( - self, self._terminate_pool, - args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, - self._task_handler, self._result_handler, self._cache), - exitpriority=15 - ) - - def _setup_queues(self): - from .queues import SimpleQueue - self._inqueue = SimpleQueue() - self._outqueue = SimpleQueue() - self._quick_put = self._inqueue._writer.send - self._quick_get = self._outqueue._reader.recv - - def apply(self, func, args=(), kwds={}): - ''' - Equivalent of `apply()` builtin - ''' - assert self._state == RUN - return self.apply_async(func, args, kwds).get() - - def map(self, func, iterable, chunksize=None): - ''' - Equivalent of `map()` builtin - ''' - assert self._state == RUN - return self.map_async(func, iterable, chunksize).get() - - def imap(self, func, iterable, chunksize=1): - ''' - Equivalent of `itertool.imap()` -- can be MUCH slower than `Pool.map()` - ''' - assert self._state == RUN - if chunksize == 1: - result = IMapIterator(self._cache) - self._taskqueue.put((((result._job, i, func, (x,), {}) - for i, x in enumerate(iterable)), result._set_length)) - return result - else: - assert chunksize > 1 - task_batches = Pool._get_tasks(func, iterable, chunksize) - result = IMapIterator(self._cache) - self._taskqueue.put((((result._job, i, mapstar, (x,), {}) - for i, x in enumerate(task_batches)), result._set_length)) - return (item for chunk in result for item in chunk) - - def imap_unordered(self, func, iterable, chunksize=1): - ''' - Like `imap()` method but ordering of results is arbitrary - ''' - assert self._state == RUN - if chunksize == 1: - result = IMapUnorderedIterator(self._cache) - self._taskqueue.put((((result._job, i, func, (x,), {}) - for i, x in enumerate(iterable)), result._set_length)) - return result - else: - assert chunksize > 1 - task_batches = Pool._get_tasks(func, iterable, chunksize) - result = IMapUnorderedIterator(self._cache) - self._taskqueue.put((((result._job, i, mapstar, (x,), {}) - for i, x in enumerate(task_batches)), result._set_length)) - return (item for chunk in result for item in chunk) - - def apply_async(self, func, args=(), kwds={}, callback=None): - ''' - Asynchronous equivalent of `apply()` builtin - ''' - assert self._state == RUN - result = ApplyResult(self._cache, callback) - self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) - return result - - def map_async(self, func, iterable, chunksize=None, callback=None): - ''' - Asynchronous equivalent of `map()` builtin - ''' - assert self._state == RUN - if not hasattr(iterable, '__len__'): - iterable = list(iterable) - - if chunksize is None: - chunksize, extra = divmod(len(iterable), len(self._pool) * 4) - if extra: - chunksize += 1 - - task_batches = Pool._get_tasks(func, iterable, chunksize) - result = MapResult(self._cache, chunksize, len(iterable), callback) - self._taskqueue.put((((result._job, i, mapstar, (x,), {}) - for i, x in enumerate(task_batches)), None)) - return result - - @staticmethod - def _handle_tasks(taskqueue, put, outqueue, pool): - thread = threading.currentThread() - - for taskseq, set_length in iter(taskqueue.get, None): - i = -1 - for i, task in enumerate(taskseq): - if thread._state: - debug('task handler found thread._state != RUN') - break - try: - put(task) - except IOError: - 
debug('could not put task on queue') - break - else: - if set_length: - debug('doing set_length()') - set_length(i+1) - continue - break - else: - debug('task handler got sentinel') - - - try: - # tell result handler to finish when cache is empty - debug('task handler sending sentinel to result handler') - outqueue.put(None) - - # tell workers there is no more work - debug('task handler sending sentinel to workers') - for p in pool: - put(None) - except IOError: - debug('task handler got IOError when sending sentinels') - - debug('task handler exiting') - - @staticmethod - def _handle_results(outqueue, get, cache): - thread = threading.currentThread() - - while 1: - try: - task = get() - except (IOError, EOFError): - debug('result handler got EOFError/IOError -- exiting') - return - - if thread._state: - assert thread._state == TERMINATE - debug('result handler found thread._state=TERMINATE') - break - - if task is None: - debug('result handler got sentinel') - break - - job, i, obj = task - try: - cache[job]._set(i, obj) - except KeyError: - pass - - while cache and thread._state != TERMINATE: - try: - task = get() - except (IOError, EOFError): - debug('result handler got EOFError/IOError -- exiting') - return - - if task is None: - debug('result handler ignoring extra sentinel') - continue - job, i, obj = task - try: - cache[job]._set(i, obj) - except KeyError: - pass - - if hasattr(outqueue, '_reader'): - debug('ensuring that outqueue is not full') - # If we don't make room available in outqueue then - # attempts to add the sentinel (None) to outqueue may - # block. There is guaranteed to be no more than 2 sentinels. - try: - for i in range(10): - if not outqueue._reader.poll(): - break - get() - except (IOError, EOFError): - pass - - debug('result handler exiting: len(cache)=%s, thread._state=%s', - len(cache), thread._state) - - @staticmethod - def _get_tasks(func, it, size): - it = iter(it) - while 1: - x = tuple(itertools.islice(it, size)) - if not x: - return - yield (func, x) - - def __reduce__(self): - raise NotImplementedError( - 'pool objects cannot be passed between processes or pickled' - ) - - def close(self): - debug('closing pool') - if self._state == RUN: - self._state = CLOSE - self._taskqueue.put(None) - - def terminate(self): - debug('terminating pool') - self._state = TERMINATE - self._terminate() - - def join(self): - debug('joining pool') - assert self._state in (CLOSE, TERMINATE) - self._task_handler.join() - self._result_handler.join() - for p in self._pool: - p.join() - - @staticmethod - def _help_stuff_finish(inqueue, task_handler, size): - # task_handler may be blocked trying to put items on inqueue - debug('removing tasks from inqueue until task handler finished') - inqueue._rlock.acquire() - while task_handler.isAlive() and inqueue._reader.poll(): - inqueue._reader.recv() - time.sleep(0) - - @classmethod - def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, - task_handler, result_handler, cache): - # this is guaranteed to only be called once - debug('finalizing pool') - - task_handler._state = TERMINATE - taskqueue.put(None) # sentinel - - debug('helping task handler/workers to finish') - cls._help_stuff_finish(inqueue, task_handler, len(pool)) - - assert result_handler.isAlive() or len(cache) == 0 - - result_handler._state = TERMINATE - outqueue.put(None) # sentinel - - if pool and hasattr(pool[0], 'terminate'): - debug('terminating workers') - for p in pool: - p.terminate() - - debug('joining task handler') - task_handler.join(1e100) - - 
debug('joining result handler') - result_handler.join(1e100) - - if pool and hasattr(pool[0], 'terminate'): - debug('joining pool workers') - for p in pool: - p.join() - -# -# Class whose instances are returned by `Pool.apply_async()` -# - -class ApplyResult(object): - - def __init__(self, cache, callback): - self._cond = threading.Condition(threading.Lock()) - self._job = job_counter.next() - self._cache = cache - self._ready = False - self._callback = callback - cache[self._job] = self - - def ready(self): - return self._ready - - def successful(self): - assert self._ready - return self._success - - def wait(self, timeout=None): - self._cond.acquire() - try: - if not self._ready: - self._cond.wait(timeout) - finally: - self._cond.release() - - def get(self, timeout=None): - self.wait(timeout) - if not self._ready: - raise TimeoutError - if self._success: - return self._value - else: - raise self._value - - def _set(self, i, obj): - self._success, self._value = obj - if self._callback and self._success: - self._callback(self._value) - self._cond.acquire() - try: - self._ready = True - self._cond.notify() - finally: - self._cond.release() - del self._cache[self._job] - -# -# Class whose instances are returned by `Pool.map_async()` -# - -class MapResult(ApplyResult): - - def __init__(self, cache, chunksize, length, callback): - ApplyResult.__init__(self, cache, callback) - self._success = True - self._value = [None] * length - self._chunksize = chunksize - if chunksize <= 0: - self._number_left = 0 - self._ready = True - else: - self._number_left = length//chunksize + bool(length % chunksize) - - def _set(self, i, success_result): - success, result = success_result - if success: - self._value[i*self._chunksize:(i+1)*self._chunksize] = result - self._number_left -= 1 - if self._number_left == 0: - if self._callback: - self._callback(self._value) - del self._cache[self._job] - self._cond.acquire() - try: - self._ready = True - self._cond.notify() - finally: - self._cond.release() - - else: - self._success = False - self._value = result - del self._cache[self._job] - self._cond.acquire() - try: - self._ready = True - self._cond.notify() - finally: - self._cond.release() - -# -# Class whose instances are returned by `Pool.imap()` -# - -class IMapIterator(object): - - def __init__(self, cache): - self._cond = threading.Condition(threading.Lock()) - self._job = job_counter.next() - self._cache = cache - self._items = collections.deque() - self._index = 0 - self._length = None - self._unsorted = {} - cache[self._job] = self - - def __iter__(self): - return self - - def next(self, timeout=None): - self._cond.acquire() - try: - try: - item = self._items.popleft() - except IndexError: - if self._index == self._length: - raise StopIteration - self._cond.wait(timeout) - try: - item = self._items.popleft() - except IndexError: - if self._index == self._length: - raise StopIteration - raise TimeoutError - finally: - self._cond.release() - - success, value = item - if success: - return value - raise value - - __next__ = next # XXX - - def _set(self, i, obj): - self._cond.acquire() - try: - if self._index == i: - self._items.append(obj) - self._index += 1 - while self._index in self._unsorted: - obj = self._unsorted.pop(self._index) - self._items.append(obj) - self._index += 1 - self._cond.notify() - else: - self._unsorted[i] = obj - - if self._index == self._length: - del self._cache[self._job] - finally: - self._cond.release() - - def _set_length(self, length): - self._cond.acquire() - try: - 
self._length = length - if self._index == self._length: - self._cond.notify() - del self._cache[self._job] - finally: - self._cond.release() - -# -# Class whose instances are returned by `Pool.imap_unordered()` -# - -class IMapUnorderedIterator(IMapIterator): - - def _set(self, i, obj): - self._cond.acquire() - try: - self._items.append(obj) - self._index += 1 - self._cond.notify() - if self._index == self._length: - del self._cache[self._job] - finally: - self._cond.release() - -# -# -# - -class ThreadPool(Pool): - - from .dummy import Process - - def __init__(self, processes=None, initializer=None, initargs=()): - Pool.__init__(self, processes, initializer, initargs) - - def _setup_queues(self): - self._inqueue = Queue.Queue() - self._outqueue = Queue.Queue() - self._quick_put = self._inqueue.put - self._quick_get = self._outqueue.get - - @staticmethod - def _help_stuff_finish(inqueue, task_handler, size): - # put sentinels at head of inqueue to make workers finish - inqueue.not_empty.acquire() - try: - inqueue.queue.clear() - inqueue.queue.extend([None] * size) - inqueue.not_empty.notifyAll() - finally: - inqueue.not_empty.release() +# +# Module providing the `Pool` class for managing a process pool +# +# multiprocessing/pool.py +# +# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt +# + +__all__ = ['Pool'] + +# +# Imports +# + +import threading +import queue +import itertools +import collections +import time + +from multiprocessing import Process, cpu_count, TimeoutError +from multiprocessing.util import Finalize, debug + +# +# Constants representing the state of a pool +# + +RUN = 0 +CLOSE = 1 +TERMINATE = 2 + +# +# Miscellaneous +# + +job_counter = itertools.count() + +def mapstar(args): + return list(map(*args)) + +# +# Code run by worker processes +# + +def worker(inqueue, outqueue, initializer=None, initargs=()): + put = outqueue.put + get = inqueue.get + if hasattr(inqueue, '_writer'): + inqueue._writer.close() + outqueue._reader.close() + + if initializer is not None: + initializer(*initargs) + + while 1: + try: + task = get() + except (EOFError, IOError): + debug('worker got EOFError or IOError -- exiting') + break + + if task is None: + debug('worker got sentinel -- exiting') + break + + job, i, func, args, kwds = task + try: + result = (True, func(*args, **kwds)) + except Exception as e: + result = (False, e) + put((job, i, result)) + +# +# Class representing a process pool +# + +class Pool(object): + ''' + Class which supports an async version of the `apply()` builtin + ''' + Process = Process + + def __init__(self, processes=None, initializer=None, initargs=()): + self._setup_queues() + self._taskqueue = queue.Queue() + self._cache = {} + self._state = RUN + + if processes is None: + try: + processes = cpu_count() + except NotImplementedError: + processes = 1 + + self._pool = [] + for i in range(processes): + w = self.Process( + target=worker, + args=(self._inqueue, self._outqueue, initializer, initargs) + ) + self._pool.append(w) + w.set_name(w.get_name().replace('Process', 'PoolWorker')) + w.set_daemon(True) + w.start() + + self._task_handler = threading.Thread( + target=Pool._handle_tasks, + args=(self._taskqueue, self._quick_put, self._outqueue, self._pool) + ) + self._task_handler.setDaemon(True) + self._task_handler._state = RUN + self._task_handler.start() + + self._result_handler = threading.Thread( + target=Pool._handle_results, + args=(self._outqueue, self._quick_get, self._cache) + ) + self._result_handler.setDaemon(True) + self._result_handler._state = RUN 
+ self._result_handler.start() + + self._terminate = Finalize( + self, self._terminate_pool, + args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, + self._task_handler, self._result_handler, self._cache), + exitpriority=15 + ) + + def _setup_queues(self): + from .queues import SimpleQueue + self._inqueue = SimpleQueue() + self._outqueue = SimpleQueue() + self._quick_put = self._inqueue._writer.send + self._quick_get = self._outqueue._reader.recv + + def apply(self, func, args=(), kwds={}): + ''' + Equivalent of `apply()` builtin + ''' + assert self._state == RUN + return self.apply_async(func, args, kwds).get() + + def map(self, func, iterable, chunksize=None): + ''' + Equivalent of `map()` builtin + ''' + assert self._state == RUN + return self.map_async(func, iterable, chunksize).get() + + def imap(self, func, iterable, chunksize=1): + ''' + Equivalent of `itertool.imap()` -- can be MUCH slower than `Pool.map()` + ''' + assert self._state == RUN + if chunksize == 1: + result = IMapIterator(self._cache) + self._taskqueue.put((((result._job, i, func, (x,), {}) + for i, x in enumerate(iterable)), result._set_length)) + return result + else: + assert chunksize > 1 + task_batches = Pool._get_tasks(func, iterable, chunksize) + result = IMapIterator(self._cache) + self._taskqueue.put((((result._job, i, mapstar, (x,), {}) + for i, x in enumerate(task_batches)), result._set_length)) + return (item for chunk in result for item in chunk) + + def imap_unordered(self, func, iterable, chunksize=1): + ''' + Like `imap()` method but ordering of results is arbitrary + ''' + assert self._state == RUN + if chunksize == 1: + result = IMapUnorderedIterator(self._cache) + self._taskqueue.put((((result._job, i, func, (x,), {}) + for i, x in enumerate(iterable)), result._set_length)) + return result + else: + assert chunksize > 1 + task_batches = Pool._get_tasks(func, iterable, chunksize) + result = IMapUnorderedIterator(self._cache) + self._taskqueue.put((((result._job, i, mapstar, (x,), {}) + for i, x in enumerate(task_batches)), result._set_length)) + return (item for chunk in result for item in chunk) + + def apply_async(self, func, args=(), kwds={}, callback=None): + ''' + Asynchronous equivalent of `apply()` builtin + ''' + assert self._state == RUN + result = ApplyResult(self._cache, callback) + self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) + return result + + def map_async(self, func, iterable, chunksize=None, callback=None): + ''' + Asynchronous equivalent of `map()` builtin + ''' + assert self._state == RUN + if not hasattr(iterable, '__len__'): + iterable = list(iterable) + + if chunksize is None: + chunksize, extra = divmod(len(iterable), len(self._pool) * 4) + if extra: + chunksize += 1 + + task_batches = Pool._get_tasks(func, iterable, chunksize) + result = MapResult(self._cache, chunksize, len(iterable), callback) + self._taskqueue.put((((result._job, i, mapstar, (x,), {}) + for i, x in enumerate(task_batches)), None)) + return result + + @staticmethod + def _handle_tasks(taskqueue, put, outqueue, pool): + thread = threading.currentThread() + + for taskseq, set_length in iter(taskqueue.get, None): + i = -1 + for i, task in enumerate(taskseq): + if thread._state: + debug('task handler found thread._state != RUN') + break + try: + put(task) + except IOError: + debug('could not put task on queue') + break + else: + if set_length: + debug('doing set_length()') + set_length(i+1) + continue + break + else: + debug('task handler got sentinel') + + + try: + # tell 
result handler to finish when cache is empty + debug('task handler sending sentinel to result handler') + outqueue.put(None) + + # tell workers there is no more work + debug('task handler sending sentinel to workers') + for p in pool: + put(None) + except IOError: + debug('task handler got IOError when sending sentinels') + + debug('task handler exiting') + + @staticmethod + def _handle_results(outqueue, get, cache): + thread = threading.currentThread() + + while 1: + try: + task = get() + except (IOError, EOFError): + debug('result handler got EOFError/IOError -- exiting') + return + + if thread._state: + assert thread._state == TERMINATE + debug('result handler found thread._state=TERMINATE') + break + + if task is None: + debug('result handler got sentinel') + break + + job, i, obj = task + try: + cache[job]._set(i, obj) + except KeyError: + pass + + while cache and thread._state != TERMINATE: + try: + task = get() + except (IOError, EOFError): + debug('result handler got EOFError/IOError -- exiting') + return + + if task is None: + debug('result handler ignoring extra sentinel') + continue + job, i, obj = task + try: + cache[job]._set(i, obj) + except KeyError: + pass + + if hasattr(outqueue, '_reader'): + debug('ensuring that outqueue is not full') + # If we don't make room available in outqueue then + # attempts to add the sentinel (None) to outqueue may + # block. There is guaranteed to be no more than 2 sentinels. + try: + for i in range(10): + if not outqueue._reader.poll(): + break + get() + except (IOError, EOFError): + pass + + debug('result handler exiting: len(cache)=%s, thread._state=%s', + len(cache), thread._state) + + @staticmethod + def _get_tasks(func, it, size): + it = iter(it) + while 1: + x = tuple(itertools.islice(it, size)) + if not x: + return + yield (func, x) + + def __reduce__(self): + raise NotImplementedError( + 'pool objects cannot be passed between processes or pickled' + ) + + def close(self): + debug('closing pool') + if self._state == RUN: + self._state = CLOSE + self._taskqueue.put(None) + + def terminate(self): + debug('terminating pool') + self._state = TERMINATE + self._terminate() + + def join(self): + debug('joining pool') + assert self._state in (CLOSE, TERMINATE) + self._task_handler.join() + self._result_handler.join() + for p in self._pool: + p.join() + + @staticmethod + def _help_stuff_finish(inqueue, task_handler, size): + # task_handler may be blocked trying to put items on inqueue + debug('removing tasks from inqueue until task handler finished') + inqueue._rlock.acquire() + while task_handler.isAlive() and inqueue._reader.poll(): + inqueue._reader.recv() + time.sleep(0) + + @classmethod + def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, + task_handler, result_handler, cache): + # this is guaranteed to only be called once + debug('finalizing pool') + + task_handler._state = TERMINATE + taskqueue.put(None) # sentinel + + debug('helping task handler/workers to finish') + cls._help_stuff_finish(inqueue, task_handler, len(pool)) + + assert result_handler.isAlive() or len(cache) == 0 + + result_handler._state = TERMINATE + outqueue.put(None) # sentinel + + if pool and hasattr(pool[0], 'terminate'): + debug('terminating workers') + for p in pool: + p.terminate() + + debug('joining task handler') + task_handler.join(1e100) + + debug('joining result handler') + result_handler.join(1e100) + + if pool and hasattr(pool[0], 'terminate'): + debug('joining pool workers') + for p in pool: + p.join() + +# +# Class whose instances are returned 
by `Pool.apply_async()` +# + +class ApplyResult(object): + + def __init__(self, cache, callback): + self._cond = threading.Condition(threading.Lock()) + self._job = next(job_counter) + self._cache = cache + self._ready = False + self._callback = callback + cache[self._job] = self + + def ready(self): + return self._ready + + def successful(self): + assert self._ready + return self._success + + def wait(self, timeout=None): + self._cond.acquire() + try: + if not self._ready: + self._cond.wait(timeout) + finally: + self._cond.release() + + def get(self, timeout=None): + self.wait(timeout) + if not self._ready: + raise TimeoutError + if self._success: + return self._value + else: + raise self._value + + def _set(self, i, obj): + self._success, self._value = obj + if self._callback and self._success: + self._callback(self._value) + self._cond.acquire() + try: + self._ready = True + self._cond.notify() + finally: + self._cond.release() + del self._cache[self._job] + +# +# Class whose instances are returned by `Pool.map_async()` +# + +class MapResult(ApplyResult): + + def __init__(self, cache, chunksize, length, callback): + ApplyResult.__init__(self, cache, callback) + self._success = True + self._value = [None] * length + self._chunksize = chunksize + if chunksize <= 0: + self._number_left = 0 + self._ready = True + else: + self._number_left = length//chunksize + bool(length % chunksize) + + def _set(self, i, success_result): + success, result = success_result + if success: + self._value[i*self._chunksize:(i+1)*self._chunksize] = result + self._number_left -= 1 + if self._number_left == 0: + if self._callback: + self._callback(self._value) + del self._cache[self._job] + self._cond.acquire() + try: + self._ready = True + self._cond.notify() + finally: + self._cond.release() + + else: + self._success = False + self._value = result + del self._cache[self._job] + self._cond.acquire() + try: + self._ready = True + self._cond.notify() + finally: + self._cond.release() + +# +# Class whose instances are returned by `Pool.imap()` +# + +class IMapIterator(object): + + def __init__(self, cache): + self._cond = threading.Condition(threading.Lock()) + self._job = next(job_counter) + self._cache = cache + self._items = collections.deque() + self._index = 0 + self._length = None + self._unsorted = {} + cache[self._job] = self + + def __iter__(self): + return self + + def next(self, timeout=None): + self._cond.acquire() + try: + try: + item = self._items.popleft() + except IndexError: + if self._index == self._length: + raise StopIteration + self._cond.wait(timeout) + try: + item = self._items.popleft() + except IndexError: + if self._index == self._length: + raise StopIteration + raise TimeoutError + finally: + self._cond.release() + + success, value = item + if success: + return value + raise value + + __next__ = next # XXX + + def _set(self, i, obj): + self._cond.acquire() + try: + if self._index == i: + self._items.append(obj) + self._index += 1 + while self._index in self._unsorted: + obj = self._unsorted.pop(self._index) + self._items.append(obj) + self._index += 1 + self._cond.notify() + else: + self._unsorted[i] = obj + + if self._index == self._length: + del self._cache[self._job] + finally: + self._cond.release() + + def _set_length(self, length): + self._cond.acquire() + try: + self._length = length + if self._index == self._length: + self._cond.notify() + del self._cache[self._job] + finally: + self._cond.release() + +# +# Class whose instances are returned by `Pool.imap_unordered()` +# + +class 
IMapUnorderedIterator(IMapIterator): + + def _set(self, i, obj): + self._cond.acquire() + try: + self._items.append(obj) + self._index += 1 + self._cond.notify() + if self._index == self._length: + del self._cache[self._job] + finally: + self._cond.release() + +# +# +# + +class ThreadPool(Pool): + + from .dummy import Process + + def __init__(self, processes=None, initializer=None, initargs=()): + Pool.__init__(self, processes, initializer, initargs) + + def _setup_queues(self): + self._inqueue = queue.Queue() + self._outqueue = queue.Queue() + self._quick_put = self._inqueue.put + self._quick_get = self._outqueue.get + + @staticmethod + def _help_stuff_finish(inqueue, task_handler, size): + # put sentinels at head of inqueue to make workers finish + inqueue.not_empty.acquire() + try: + inqueue.queue.clear() + inqueue.queue.extend([None] * size) + inqueue.not_empty.notifyAll() + finally: + inqueue.not_empty.release() Index: Lib/multiprocessing/synchronize.py =================================================================== --- Lib/multiprocessing/synchronize.py (revision 64104) +++ Lib/multiprocessing/synchronize.py (working copy) @@ -1,294 +1,294 @@ -# -# Module implementing synchronization primitives -# -# multiprocessing/synchronize.py -# -# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt -# - -__all__ = [ - 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event' - ] - -import threading -import os -import sys - -from time import time as _time, sleep as _sleep - -import _multiprocessing -from multiprocessing.process import current_process -from multiprocessing.util import Finalize, register_after_fork, debug -from multiprocessing.forking import assert_spawning, Popen - -# -# Constants -# - -RECURSIVE_MUTEX, SEMAPHORE = range(2) -SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX - -# -# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock` -# - -class SemLock(object): - - def __init__(self, kind, value, maxvalue): - sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue) - debug('created semlock with handle %s' % sl.handle) - self._make_methods() - - if sys.platform != 'win32': - def _after_fork(obj): - obj._semlock._after_fork() - register_after_fork(self, _after_fork) - - def _make_methods(self): - self.acquire = self._semlock.acquire - self.release = self._semlock.release - self.__enter__ = self._semlock.__enter__ - self.__exit__ = self._semlock.__exit__ - - def __getstate__(self): - assert_spawning(self) - sl = self._semlock - return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue) - - def __setstate__(self, state): - self._semlock = _multiprocessing.SemLock._rebuild(*state) - debug('recreated blocker with handle %r' % state[0]) - self._make_methods() - -# -# Semaphore -# - -class Semaphore(SemLock): - - def __init__(self, value=1): - SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX) - - def get_value(self): - return self._semlock._get_value() - - def __repr__(self): - try: - value = self._semlock._get_value() - except Exception: - value = 'unknown' - return '<semaphore(value=%s)>' % value - -# -# Bounded semaphore -# - -class BoundedSemaphore(Semaphore): - - def __init__(self, value=1): - SemLock.__init__(self, SEMAPHORE, value, value) - - def __repr__(self): - try: - value = self._semlock._get_value() - except Exception: - value = 'unknown' - return '<boundedsemaphore(value=%s, maxvalue="%s)">' % \ - (value, self._semlock.maxvalue) - -# -# Non-recursive lock -# - -class Lock(SemLock): - - def 
__init__(self): - SemLock.__init__(self, SEMAPHORE, 1, 1) - - def __repr__(self): - try: - if self._semlock._is_mine(): - name = current_process().get_name() - if threading.currentThread().getName() != 'MainThread': - name += '|' + threading.currentThread().getName() - elif self._semlock._get_value() == 1: - name = 'None' - elif self._semlock._count() > 0: - name = 'SomeOtherThread' - else: - name = 'SomeOtherProcess' - except Exception: - name = 'unknown' - return '<lock(owner=%s)>' % name - -# -# Recursive lock -# - -class RLock(SemLock): - - def __init__(self): - SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1) - - def __repr__(self): - try: - if self._semlock._is_mine(): - name = current_process().get_name() - if threading.currentThread().getName() != 'MainThread': - name += '|' + threading.currentThread().getName() - count = self._semlock._count() - elif self._semlock._get_value() == 1: - name, count = 'None', 0 - elif self._semlock._count() > 0: - name, count = 'SomeOtherThread', 'nonzero' - else: - name, count = 'SomeOtherProcess', 'nonzero' - except Exception: - name, count = 'unknown', 'unknown' - return '<rlock(%s, %s)="">' % (name, count) - -# -# Condition variable -# - -class Condition(object): - - def __init__(self, lock=None): - self._lock = lock or RLock() - self._sleeping_count = Semaphore(0) - self._woken_count = Semaphore(0) - self._wait_semaphore = Semaphore(0) - self._make_methods() - - def __getstate__(self): - assert_spawning(self) - return (self._lock, self._sleeping_count, - self._woken_count, self._wait_semaphore) - - def __setstate__(self, state): - (self._lock, self._sleeping_count, - self._woken_count, self._wait_semaphore) = state - self._make_methods() - - def _make_methods(self): - self.acquire = self._lock.acquire - self.release = self._lock.release - self.__enter__ = self._lock.__enter__ - self.__exit__ = self._lock.__exit__ - - def __repr__(self): - try: - num_waiters = (self._sleeping_count._semlock._get_value() - - self._woken_count._semlock._get_value()) - except Exception: - num_waiters = 'unkown' - return '<condition(%s, %s)="">' % (self._lock, num_waiters) - - def wait(self, timeout=None): - assert self._lock._semlock._is_mine(), \ - 'must acquire() condition before using wait()' - - # indicate that this thread is going to sleep - self._sleeping_count.release() - - # release lock - count = self._lock._semlock._count() - for i in xrange(count): - self._lock.release() - - try: - # wait for notification or timeout - self._wait_semaphore.acquire(True, timeout) - finally: - # indicate that this thread has woken - self._woken_count.release() - - # reacquire lock - for i in xrange(count): - self._lock.acquire() - - def notify(self): - assert self._lock._semlock._is_mine(), 'lock is not owned' - assert not self._wait_semaphore.acquire(False) - - # to take account of timeouts since last notify() we subtract - # woken_count from sleeping_count and rezero woken_count - while self._woken_count.acquire(False): - res = self._sleeping_count.acquire(False) - assert res - - if self._sleeping_count.acquire(False): # try grabbing a sleeper - self._wait_semaphore.release() # wake up one sleeper - self._woken_count.acquire() # wait for the sleeper to wake - - # rezero _wait_semaphore in case a timeout just happened - self._wait_semaphore.acquire(False) - - def notify_all(self): - assert self._lock._semlock._is_mine(), 'lock is not owned' - assert not self._wait_semaphore.acquire(False) - - # to take account of timeouts since last notify*() we subtract - # woken_count from 
sleeping_count and rezero woken_count - while self._woken_count.acquire(False): - res = self._sleeping_count.acquire(False) - assert res - - sleepers = 0 - while self._sleeping_count.acquire(False): - self._wait_semaphore.release() # wake up one sleeper - sleepers += 1 - - if sleepers: - for i in xrange(sleepers): - self._woken_count.acquire() # wait for a sleeper to wake - - # rezero wait_semaphore in case some timeouts just happened - while self._wait_semaphore.acquire(False): - pass - -# -# Event -# - -class Event(object): - - def __init__(self): - self._cond = Condition(Lock()) - self._flag = Semaphore(0) - - def is_set(self): - self._cond.acquire() - try: - if self._flag.acquire(False): - self._flag.release() - return True - return False - finally: - self._cond.release() - - def set(self): - self._cond.acquire() - try: - self._flag.acquire(False) - self._flag.release() - self._cond.notify_all() - finally: - self._cond.release() - - def clear(self): - self._cond.acquire() - try: - self._flag.acquire(False) - finally: - self._cond.release() - - def wait(self, timeout=None): - self._cond.acquire() - try: - if self._flag.acquire(False): - self._flag.release() - else: - self._cond.wait(timeout) - finally: - self._cond.release() +# +# Module implementing synchronization primitives +# +# multiprocessing/synchronize.py +# +# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt +# + +__all__ = [ + 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event' + ] + +import threading +import os +import sys + +from time import time as _time, sleep as _sleep + +import _multiprocessing +from multiprocessing.process import current_process +from multiprocessing.util import Finalize, register_after_fork, debug +from multiprocessing.forking import assert_spawning, Popen + +# +# Constants +# + +RECURSIVE_MUTEX, SEMAPHORE = list(range(2)) +SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX + +# +# Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock` +# + +class SemLock(object): + + def __init__(self, kind, value, maxvalue): + sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue) + debug('created semlock with handle %s' % sl.handle) + self._make_methods() + + if sys.platform != 'win32': + def _after_fork(obj): + obj._semlock._after_fork() + register_after_fork(self, _after_fork) + + def _make_methods(self): + self.acquire = self._semlock.acquire + self.release = self._semlock.release + self.__enter__ = self._semlock.__enter__ + self.__exit__ = self._semlock.__exit__ + + def __getstate__(self): + assert_spawning(self) + sl = self._semlock + return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue) + + def __setstate__(self, state): + self._semlock = _multiprocessing.SemLock._rebuild(*state) + debug('recreated blocker with handle %r' % state[0]) + self._make_methods() + +# +# Semaphore +# + +class Semaphore(SemLock): + + def __init__(self, value=1): + SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX) + + def get_value(self): + return self._semlock._get_value() + + def __repr__(self): + try: + value = self._semlock._get_value() + except Exception: + value = 'unknown' + return '<semaphore(value=%s)>' % value + +# +# Bounded semaphore +# + +class BoundedSemaphore(Semaphore): + + def __init__(self, value=1): + SemLock.__init__(self, SEMAPHORE, value, value) + + def __repr__(self): + try: + value = self._semlock._get_value() + except Exception: + value = 'unknown' + return '<boundedsemaphore(value=%s, maxvalue="%s)">' % \ + (value, 
self._semlock.maxvalue) + +# +# Non-recursive lock +# + +class Lock(SemLock): + + def __init__(self): + SemLock.__init__(self, SEMAPHORE, 1, 1) + + def __repr__(self): + try: + if self._semlock._is_mine(): + name = current_process().get_name() + if threading.currentThread().getName() != 'MainThread': + name += '|' + threading.currentThread().getName() + elif self._semlock._get_value() == 1: + name = 'None' + elif self._semlock._count() > 0: + name = 'SomeOtherThread' + else: + name = 'SomeOtherProcess' + except Exception: + name = 'unknown' + return '<lock(owner=%s)>' % name + +# +# Recursive lock +# + +class RLock(SemLock): + + def __init__(self): + SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1) + + def __repr__(self): + try: + if self._semlock._is_mine(): + name = current_process().get_name() + if threading.currentThread().getName() != 'MainThread': + name += '|' + threading.currentThread().getName() + count = self._semlock._count() + elif self._semlock._get_value() == 1: + name, count = 'None', 0 + elif self._semlock._count() > 0: + name, count = 'SomeOtherThread', 'nonzero' + else: + name, count = 'SomeOtherProcess', 'nonzero' + except Exception: + name, count = 'unknown', 'unknown' + return '<rlock(%s, %s)="">' % (name, count) + +# +# Condition variable +# + +class Condition(object): + + def __init__(self, lock=None): + self._lock = lock or RLock() + self._sleeping_count = Semaphore(0) + self._woken_count = Semaphore(0) + self._wait_semaphore = Semaphore(0) + self._make_methods() + + def __getstate__(self): + assert_spawning(self) + return (self._lock, self._sleeping_count, + self._woken_count, self._wait_semaphore) + + def __setstate__(self, state): + (self._lock, self._sleeping_count, + self._woken_count, self._wait_semaphore) = state + self._make_methods() + + def _make_methods(self): + self.acquire = self._lock.acquire + self.release = self._lock.release + self.__enter__ = self._lock.__enter__ + self.__exit__ = self._lock.__exit__ + + def __repr__(self): + try: + num_waiters = (self._sleeping_count._semlock._get_value() - + self._woken_count._semlock._get_value()) + except Exception: + num_waiters = 'unkown' + return '<condition(%s, %s)="">' % (self._lock, num_waiters) + + def wait(self, timeout=None): + assert self._lock._semlock._is_mine(), \ + 'must acquire() condition before using wait()' + + # indicate that this thread is going to sleep + self._sleeping_count.release() + + # release lock + count = self._lock._semlock._count() + for i in range(count): + self._lock.release() + + try: + # wait for notification or timeout + self._wait_semaphore.acquire(True, timeout) + finally: + # indicate that this thread has woken + self._woken_count.release() + + # reacquire lock + for i in range(count): + self._lock.acquire() + + def notify(self): + assert self._lock._semlock._is_mine(), 'lock is not owned' + assert not self._wait_semaphore.acquire(False) + + # to take account of timeouts since last notify() we subtract + # woken_count from sleeping_count and rezero woken_count + while self._woken_count.acquire(False): + res = self._sleeping_count.acquire(False) + assert res + + if self._sleeping_count.acquire(False): # try grabbing a sleeper + self._wait_semaphore.release() # wake up one sleeper + self._woken_count.acquire() # wait for the sleeper to wake + + # rezero _wait_semaphore in case a timeout just happened + self._wait_semaphore.acquire(False) + + def notify_all(self): + assert self._lock._semlock._is_mine(), 'lock is not owned' + assert not self._wait_semaphore.acquire(False) + + # 
to take account of timeouts since last notify*() we subtract + # woken_count from sleeping_count and rezero woken_count + while self._woken_count.acquire(False): + res = self._sleeping_count.acquire(False) + assert res + + sleepers = 0 + while self._sleeping_count.acquire(False): + self._wait_semaphore.release() # wake up one sleeper + sleepers += 1 + + if sleepers: + for i in range(sleepers): + self._woken_count.acquire() # wait for a sleeper to wake + + # rezero wait_semaphore in case some timeouts just happened + while self._wait_semaphore.acquire(False): + pass + +# +# Event +# + +class Event(object): + + def __init__(self): + self._cond = Condition(Lock()) + self._flag = Semaphore(0) + + def is_set(self): + self._cond.acquire() + try: + if self._flag.acquire(False): + self._flag.release() + return True + return False + finally: + self._cond.release() + + def set(self): + self._cond.acquire() + try: + self._flag.acquire(False) + self._flag.release() + self._cond.notify_all() + finally: + self._cond.release() + + def clear(self): + self._cond.acquire() + try: + self._flag.acquire(False) + finally: + self._cond.release() + + def wait(self, timeout=None): + self._cond.acquire() + try: + if self._flag.acquire(False): + self._flag.release() + else: + self._cond.wait(timeout) + finally: + self._cond.release() Index: Lib/multiprocessing/reduction.py =================================================================== --- Lib/multiprocessing/reduction.py (revision 64104) +++ Lib/multiprocessing/reduction.py (working copy) @@ -1,190 +1,190 @@ -# -# Module to allow connection and socket objects to be transferred -# between processes -# -# multiprocessing/reduction.py -# -# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt -# - -__all__ = [] - -import os -import sys -import socket -import threading -import copy_reg - -import _multiprocessing -from multiprocessing import current_process -from multiprocessing.forking import Popen, duplicate, close -from multiprocessing.util import register_after_fork, debug, sub_debug -from multiprocessing.connection import Client, Listener - - -# -# -# - -if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')): - raise ImportError('pickling of connections not supported') - -# -# Platform specific definitions -# - -if sys.platform == 'win32': - import _subprocess - from ._multiprocessing import win32 - - def send_handle(conn, handle, destination_pid): - process_handle = win32.OpenProcess( - win32.PROCESS_ALL_ACCESS, False, destination_pid - ) - try: - new_handle = duplicate(handle, process_handle) - conn.send(new_handle) - finally: - close(process_handle) - - def recv_handle(conn): - return conn.recv() - -else: - def send_handle(conn, handle, destination_pid): - _multiprocessing.sendfd(conn.fileno(), handle) - - def recv_handle(conn): - return _multiprocessing.recvfd(conn.fileno()) - -# -# Support for a per-process server thread which caches pickled handles -# - -_cache = set() - -def _reset(obj): - global _lock, _listener, _cache - for h in _cache: - close(h) - _cache.clear() - _lock = threading.Lock() - _listener = None - -_reset(None) -register_after_fork(_reset, _reset) - -def _get_listener(): - global _listener - - if _listener is None: - _lock.acquire() - try: - if _listener is None: - debug('starting listener and thread for sending handles') - _listener = Listener(authkey=current_process().get_authkey()) - t = threading.Thread(target=_serve) - t.setDaemon(True) - t.start() - finally: - _lock.release() - - return _listener - -def _serve(): 
- from .util import is_exiting, sub_warning - - while 1: - try: - conn = _listener.accept() - handle_wanted, destination_pid = conn.recv() - _cache.remove(handle_wanted) - send_handle(conn, handle_wanted, destination_pid) - close(handle_wanted) - conn.close() - except: - if not is_exiting(): - import traceback - sub_warning( - 'thread for sharing handles raised exception :\n' + - '-'*79 + '\n' + traceback.format_exc() + '-'*79 - ) - -# -# Functions to be used for pickling/unpickling objects with handles -# - -def reduce_handle(handle): - if Popen.thread_is_spawning(): - return (None, Popen.duplicate_for_child(handle), True) - dup_handle = duplicate(handle) - _cache.add(dup_handle) - sub_debug('reducing handle %d', handle) - return (_get_listener().address, dup_handle, False) - -def rebuild_handle(pickled_data): - address, handle, inherited = pickled_data - if inherited: - return handle - sub_debug('rebuilding handle %d', handle) - conn = Client(address, authkey=current_process().get_authkey()) - conn.send((handle, os.getpid())) - new_handle = recv_handle(conn) - conn.close() - return new_handle - -# -# Register `_multiprocessing.Connection` with `copy_reg` -# - -def reduce_connection(conn): - rh = reduce_handle(conn.fileno()) - return rebuild_connection, (rh, conn.readable, conn.writable) - -def rebuild_connection(reduced_handle, readable, writable): - handle = rebuild_handle(reduced_handle) - return _multiprocessing.Connection( - handle, readable=readable, writable=writable - ) - -copy_reg.pickle(_multiprocessing.Connection, reduce_connection) - -# -# Register `socket.socket` with `copy_reg` -# - -def fromfd(fd, family, type_, proto=0): - s = socket.fromfd(fd, family, type_, proto) - if s.__class__ is not socket.socket: - s = socket.socket(_sock=s) - return s - -def reduce_socket(s): - reduced_handle = reduce_handle(s.fileno()) - return rebuild_socket, (reduced_handle, s.family, s.type, s.proto) - -def rebuild_socket(reduced_handle, family, type_, proto): - fd = rebuild_handle(reduced_handle) - _sock = fromfd(fd, family, type_, proto) - close(fd) - return _sock - -copy_reg.pickle(socket.socket, reduce_socket) - -# -# Register `_multiprocessing.PipeConnection` with `copy_reg` -# - -if sys.platform == 'win32': - - def reduce_pipe_connection(conn): - rh = reduce_handle(conn.fileno()) - return rebuild_pipe_connection, (rh, conn.readable, conn.writable) - - def rebuild_pipe_connection(reduced_handle, readable, writable): - handle = rebuild_handle(reduced_handle) - return _multiprocessing.PipeConnection( - handle, readable=readable, writable=writable - ) - - copy_reg.pickle(_multiprocessing.PipeConnection, reduce_pipe_connection) +# +# Module to allow connection and socket objects to be transferred +# between processes +# +# multiprocessing/reduction.py +# +# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt +# + +__all__ = [] + +import os +import sys +import socket +import threading +import copyreg + +import _multiprocessing +from multiprocessing import current_process +from multiprocessing.forking import Popen, duplicate, close +from multiprocessing.util import register_after_fork, debug, sub_debug +from multiprocessing.connection import Client, Listener + + +# +# +# + +if not(sys.platform == 'win32' or hasattr(_multiprocessing, 'recvfd')): + raise ImportError('pickling of connections not supported') + +# +# Platform specific definitions +# + +if sys.platform == 'win32': + import _subprocess + from ._multiprocessing import win32 + + def send_handle(conn, handle, destination_pid): + 
process_handle = win32.OpenProcess( + win32.PROCESS_ALL_ACCESS, False, destination_pid + ) + try: + new_handle = duplicate(handle, process_handle) + conn.send(new_handle) + finally: + close(process_handle) + + def recv_handle(conn): + return conn.recv() + +else: + def send_handle(conn, handle, destination_pid): + _multiprocessing.sendfd(conn.fileno(), handle) + + def recv_handle(conn): + return _multiprocessing.recvfd(conn.fileno()) + +# +# Support for a per-process server thread which caches pickled handles +# + +_cache = set() + +def _reset(obj): + global _lock, _listener, _cache + for h in _cache: + close(h) + _cache.clear() + _lock = threading.Lock() + _listener = None + +_reset(None) +register_after_fork(_reset, _reset) + +def _get_listener(): + global _listener + + if _listener is None: + _lock.acquire() + try: + if _listener is None: + debug('starting listener and thread for sending handles') + _listener = Listener(authkey=current_process().get_authkey()) + t = threading.Thread(target=_serve) + t.setDaemon(True) + t.start() + finally: + _lock.release() + + return _listener + +def _serve(): + from .util import is_exiting, sub_warning + + while 1: + try: + conn = _listener.accept() + handle_wanted, destination_pid = conn.recv() + _cache.remove(handle_wanted) + send_handle(conn, handle_wanted, destination_pid) + close(handle_wanted) + conn.close() + except: + if not is_exiting(): + import traceback + sub_warning( + 'thread for sharing handles raised exception :\n' + + '-'*79 + '\n' + traceback.format_exc() + '-'*79 + ) + +# +# Functions to be used for pickling/unpickling objects with handles +# + +def reduce_handle(handle): + if Popen.thread_is_spawning(): + return (None, Popen.duplicate_for_child(handle), True) + dup_handle = duplicate(handle) + _cache.add(dup_handle) + sub_debug('reducing handle %d', handle) + return (_get_listener().address, dup_handle, False) + +def rebuild_handle(pickled_data): + address, handle, inherited = pickled_data + if inherited: + return handle + sub_debug('rebuilding handle %d', handle) + conn = Client(address, authkey=current_process().get_authkey()) + conn.send((handle, os.getpid())) + new_handle = recv_handle(conn) + conn.close() + return new_handle + +# +# Register `_multiprocessing.Connection` with `copy_reg` +# + +def reduce_connection(conn): + rh = reduce_handle(conn.fileno()) + return rebuild_connection, (rh, conn.readable, conn.writable) + +def rebuild_connection(reduced_handle, readable, writable): + handle = rebuild_handle(reduced_handle) + return _multiprocessing.Connection( + handle, readable=readable, writable=writable + ) + +copyreg.pickle(_multiprocessing.Connection, reduce_connection) + +# +# Register `socket.socket` with `copy_reg` +# + +def fromfd(fd, family, type_, proto=0): + s = socket.fromfd(fd, family, type_, proto) + if s.__class__ is not socket.socket: + s = socket.socket(_sock=s) + return s + +def reduce_socket(s): + reduced_handle = reduce_handle(s.fileno()) + return rebuild_socket, (reduced_handle, s.family, s.type, s.proto) + +def rebuild_socket(reduced_handle, family, type_, proto): + fd = rebuild_handle(reduced_handle) + _sock = fromfd(fd, family, type_, proto) + close(fd) + return _sock + +copyreg.pickle(socket.socket, reduce_socket) + +# +# Register `_multiprocessing.PipeConnection` with `copy_reg` +# + +if sys.platform == 'win32': + + def reduce_pipe_connection(conn): + rh = reduce_handle(conn.fileno()) + return rebuild_pipe_connection, (rh, conn.readable, conn.writable) + + def rebuild_pipe_connection(reduced_handle, 
readable, writable): + handle = rebuild_handle(reduced_handle) + return _multiprocessing.PipeConnection( + handle, readable=readable, writable=writable + ) + + copyreg.pickle(_multiprocessing.PipeConnection, reduce_pipe_connection) Index: Lib/multiprocessing/heap.py =================================================================== --- Lib/multiprocessing/heap.py (revision 64104) +++ Lib/multiprocessing/heap.py (working copy) @@ -34,7 +34,7 @@ def __init__(self, size): self.size = size - self.name = 'pym-%d-%d' % (os.getpid(), Arena._counter.next()) + self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter)) self.buffer = mmap.mmap(-1, self.size, tagname=self.name) assert win32.GetLastError() == 0, 'tagname already in use' self._state = (self.size, self.name) @@ -161,7 +161,7 @@ def malloc(self, size): # return a block of right size (possibly rounded up) - assert 0 <= size < sys.maxint + assert 0 <= size < sys.maxsize if os.getpid() != self._lastpid: self.__init__() # reinitialize after fork self._lock.acquire() @@ -186,7 +186,7 @@ _heap = Heap() def __init__(self, size): - assert 0 <= size < sys.maxint + assert 0 <= size < sys.maxsize block = BufferWrapper._heap.malloc(size) self._state = (block, size) Finalize(self, BufferWrapper._heap.free, args=(block,)) Index: Lib/multiprocessing/managers.py =================================================================== --- Lib/multiprocessing/managers.py (revision 64104) +++ Lib/multiprocessing/managers.py (working copy) @@ -1,1092 +1,1092 @@ -# -# Module providing the `SyncManager` class for dealing -# with shared objects -# -# multiprocessing/managers.py -# -# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt -# - -__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] - -# -# Imports -# - -import os -import sys -import weakref -import threading -import array -import copy_reg -import Queue - -from traceback import format_exc -from multiprocessing import Process, current_process, active_children, Pool, util, connection -from multiprocessing.process import AuthenticationString -from multiprocessing.forking import exit, Popen, assert_spawning -from multiprocessing.util import Finalize, info - -try: - from cPickle import PicklingError -except ImportError: - from pickle import PicklingError - -# -# -# - -try: - bytes -except NameError: - bytes = str # XXX not needed in Py2.6 and Py3.0 - -# -# Register some things for pickling -# - -def reduce_array(a): - return array.array, (a.typecode, a.tostring()) -copy_reg.pickle(array.array, reduce_array) - -view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] -if view_types[0] is not list: # XXX only needed in Py3.0 - def rebuild_as_list(obj): - return list, (list(obj),) - for view_type in view_types: - copy_reg.pickle(view_type, rebuild_as_list) - -# -# Type for identifying shared objects -# - -class Token(object): - ''' - Type to uniquely indentify a shared object - ''' - __slots__ = ('typeid', 'address', 'id') - - def __init__(self, typeid, address, id): - (self.typeid, self.address, self.id) = (typeid, address, id) - - def __getstate__(self): - return (self.typeid, self.address, self.id) - - def __setstate__(self, state): - (self.typeid, self.address, self.id) = state - - def __repr__(self): - return 'Token(typeid=%r, address=%r, id=%r)' % \ - (self.typeid, self.address, self.id) - -# -# Function for communication with a manager's server process -# - -def dispatch(c, id, methodname, args=(), kwds={}): - ''' - Send a message to manager using connection `c` and 
return response - ''' - c.send((id, methodname, args, kwds)) - kind, result = c.recv() - if kind == '#RETURN': - return result - raise convert_to_error(kind, result) - -def convert_to_error(kind, result): - if kind == '#ERROR': - return result - elif kind == '#TRACEBACK': - assert type(result) is str - return RemoteError(result) - elif kind == '#UNSERIALIZABLE': - assert type(result) is str - return RemoteError('Unserializable message: %s\n' % result) - else: - return ValueError('Unrecognized message type') - -class RemoteError(Exception): - def __str__(self): - return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75) - -# -# Functions for finding the method names of an object -# - -def all_methods(obj): - ''' - Return a list of names of methods of `obj` - ''' - temp = [] - for name in dir(obj): - func = getattr(obj, name) - if hasattr(func, '__call__'): - temp.append(name) - return temp - -def public_methods(obj): - ''' - Return a list of names of methods of `obj` which do not start with '_' - ''' - return [name for name in all_methods(obj) if name[0] != '_'] - -# -# Server which is run in a process controlled by a manager -# - -class Server(object): - ''' - Server class which runs in a process controlled by a manager object - ''' - public = ['shutdown', 'create', 'accept_connection', 'get_methods', - 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] - - def __init__(self, registry, address, authkey, serializer): - assert isinstance(authkey, bytes) - self.registry = registry - self.authkey = AuthenticationString(authkey) - Listener, Client = listener_client[serializer] - - # do authentication later - self.listener = Listener(address=address, backlog=5) - self.address = self.listener.address - - self.id_to_obj = {0: (None, ())} - self.id_to_refcount = {} - self.mutex = threading.RLock() - self.stop = 0 - - def serve_forever(self): - ''' - Run the server forever - ''' - current_process()._manager_server = self - try: - try: - while 1: - try: - c = self.listener.accept() - except (OSError, IOError): - continue - t = threading.Thread(target=self.handle_request, args=(c,)) - t.setDaemon(True) - t.start() - except (KeyboardInterrupt, SystemExit): - pass - finally: - self.stop = 999 - self.listener.close() - - def handle_request(self, c): - ''' - Handle a new connection - ''' - funcname = result = request = None - try: - connection.deliver_challenge(c, self.authkey) - connection.answer_challenge(c, self.authkey) - request = c.recv() - ignore, funcname, args, kwds = request - assert funcname in self.public, '%r unrecognized' % funcname - func = getattr(self, funcname) - except Exception: - msg = ('#TRACEBACK', format_exc()) - else: - try: - result = func(c, *args, **kwds) - except Exception: - msg = ('#TRACEBACK', format_exc()) - else: - msg = ('#RETURN', result) - try: - c.send(msg) - except Exception, e: - try: - c.send(('#TRACEBACK', format_exc())) - except Exception: - pass - util.info('Failure to send message: %r', msg) - util.info(' ... request was %r', request) - util.info(' ... 
exception was %r', e) - - c.close() - - def serve_client(self, conn): - ''' - Handle requests from the proxies in a particular process/thread - ''' - util.debug('starting server thread to service %r', - threading.currentThread().getName()) - - recv = conn.recv - send = conn.send - id_to_obj = self.id_to_obj - - while not self.stop: - - try: - methodname = obj = None - request = recv() - ident, methodname, args, kwds = request - obj, exposed, gettypeid = id_to_obj[ident] - - if methodname not in exposed: - raise AttributeError( - 'method %r of %r object is not in exposed=%r' % - (methodname, type(obj), exposed) - ) - - function = getattr(obj, methodname) - - try: - res = function(*args, **kwds) - except Exception, e: - msg = ('#ERROR', e) - else: - typeid = gettypeid and gettypeid.get(methodname, None) - if typeid: - rident, rexposed = self.create(conn, typeid, res) - token = Token(typeid, self.address, rident) - msg = ('#PROXY', (rexposed, token)) - else: - msg = ('#RETURN', res) - - except AttributeError: - if methodname is None: - msg = ('#TRACEBACK', format_exc()) - else: - try: - fallback_func = self.fallback_mapping[methodname] - result = fallback_func( - self, conn, ident, obj, *args, **kwds - ) - msg = ('#RETURN', result) - except Exception: - msg = ('#TRACEBACK', format_exc()) - - except EOFError: - util.debug('got EOF -- exiting thread serving %r', - threading.currentThread().getName()) - sys.exit(0) - - except Exception: - msg = ('#TRACEBACK', format_exc()) - - try: - try: - send(msg) - except Exception, e: - send(('#UNSERIALIZABLE', repr(msg))) - except Exception, e: - util.info('exception in thread serving %r', - threading.currentThread().getName()) - util.info(' ... message was %r', msg) - util.info(' ... exception was %r', e) - conn.close() - sys.exit(1) - - def fallback_getvalue(self, conn, ident, obj): - return obj - - def fallback_str(self, conn, ident, obj): - return str(obj) - - def fallback_repr(self, conn, ident, obj): - return repr(obj) - - fallback_mapping = { - '__str__':fallback_str, - '__repr__':fallback_repr, - '#GETVALUE':fallback_getvalue - } - - def dummy(self, c): - pass - - def debug_info(self, c): - ''' - Return some info --- useful to spot problems with refcounting - ''' - self.mutex.acquire() - try: - result = [] - keys = self.id_to_obj.keys() - keys.sort() - for ident in keys: - if ident != 0: - result.append(' %s: refcount=%s\n %s' % - (ident, self.id_to_refcount[ident], - str(self.id_to_obj[ident][0])[:75])) - return '\n'.join(result) - finally: - self.mutex.release() - - def number_of_objects(self, c): - ''' - Number of shared objects - ''' - return len(self.id_to_obj) - 1 # don't count ident=0 - - def shutdown(self, c): - ''' - Shutdown this process - ''' - try: - try: - util.debug('manager received shutdown message') - c.send(('#RETURN', None)) - - if sys.stdout != sys.__stdout__: - util.debug('resetting stdout, stderr') - sys.stdout = sys.__stdout__ - sys.stderr = sys.__stderr__ - - util._run_finalizers(0) - - for p in active_children(): - util.debug('terminating a child process of manager') - p.terminate() - - for p in active_children(): - util.debug('terminating a child process of manager') - p.join() - - util._run_finalizers() - util.info('manager exiting with exitcode 0') - except: - import traceback - traceback.print_exc() - finally: - exit(0) - - def create(self, c, typeid, *args, **kwds): - ''' - Create a new shared object and return its id - ''' - self.mutex.acquire() - try: - callable, exposed, method_to_typeid, proxytype = \ - 
self.registry[typeid] - - if callable is None: - assert len(args) == 1 and not kwds - obj = args[0] - else: - obj = callable(*args, **kwds) - - if exposed is None: - exposed = public_methods(obj) - if method_to_typeid is not None: - assert type(method_to_typeid) is dict - exposed = list(exposed) + list(method_to_typeid) - - ident = '%x' % id(obj) # convert to string because xmlrpclib - # only has 32 bit signed integers - util.debug('%r callable returned object with id %r', typeid, ident) - - self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) - if ident not in self.id_to_refcount: - self.id_to_refcount[ident] = None - return ident, tuple(exposed) - finally: - self.mutex.release() - - def get_methods(self, c, token): - ''' - Return the methods of the shared object indicated by token - ''' - return tuple(self.id_to_obj[token.id][1]) - - def accept_connection(self, c, name): - ''' - Spawn a new thread to serve this connection - ''' - threading.currentThread().setName(name) - c.send(('#RETURN', None)) - self.serve_client(c) - - def incref(self, c, ident): - self.mutex.acquire() - try: - try: - self.id_to_refcount[ident] += 1 - except TypeError: - assert self.id_to_refcount[ident] is None - self.id_to_refcount[ident] = 1 - finally: - self.mutex.release() - - def decref(self, c, ident): - self.mutex.acquire() - try: - assert self.id_to_refcount[ident] >= 1 - self.id_to_refcount[ident] -= 1 - if self.id_to_refcount[ident] == 0: - del self.id_to_obj[ident], self.id_to_refcount[ident] - util.debug('disposing of obj with id %d', ident) - finally: - self.mutex.release() - -# -# Class to represent state of a manager -# - -class State(object): - __slots__ = ['value'] - INITIAL = 0 - STARTED = 1 - SHUTDOWN = 2 - -# -# Mapping from serializer name to Listener and Client types -# - -listener_client = { - 'pickle' : (connection.Listener, connection.Client), - 'xmlrpclib' : (connection.XmlListener, connection.XmlClient) - } - -# -# Definition of BaseManager -# - -class BaseManager(object): - ''' - Base class for managers - ''' - _registry = {} - _Server = Server - - def __init__(self, address=None, authkey=None, serializer='pickle'): - if authkey is None: - authkey = current_process().get_authkey() - self._address = address # XXX not final address if eg ('', 0) - self._authkey = AuthenticationString(authkey) - self._state = State() - self._state.value = State.INITIAL - self._serializer = serializer - self._Listener, self._Client = listener_client[serializer] - - def __reduce__(self): - return type(self).from_address, \ - (self._address, self._authkey, self._serializer) - - def get_server(self): - ''' - Return server object with serve_forever() method and address attribute - ''' - assert self._state.value == State.INITIAL - return Server(self._registry, self._address, - self._authkey, self._serializer) - - def connect(self): - ''' - Connect manager object to the server process - ''' - Listener, Client = listener_client[self._serializer] - conn = Client(self._address, authkey=self._authkey) - dispatch(conn, None, 'dummy') - self._state.value = State.STARTED - - def start(self): - ''' - Spawn a server process for this manager object - ''' - assert self._state.value == State.INITIAL - - # pipe over which we will retrieve address of server - reader, writer = connection.Pipe(duplex=False) - - # spawn process which runs a server - self._process = Process( - target=type(self)._run_server, - args=(self._registry, self._address, self._authkey, - self._serializer, writer), - ) - ident = ':'.join(str(i) for i 
in self._process._identity) - self._process.set_name(type(self).__name__ + '-' + ident) - self._process.start() - - # get address of server - writer.close() - self._address = reader.recv() - reader.close() - - # register a finalizer - self._state.value = State.STARTED - self.shutdown = util.Finalize( - self, type(self)._finalize_manager, - args=(self._process, self._address, self._authkey, - self._state, self._Client), - exitpriority=0 - ) - - @classmethod - def _run_server(cls, registry, address, authkey, serializer, writer): - ''' - Create a server, report its address and run it - ''' - # create server - server = cls._Server(registry, address, authkey, serializer) - - # inform parent process of the server's address - writer.send(server.address) - writer.close() - - # run the manager - util.info('manager serving at %r', server.address) - server.serve_forever() - - def _create(self, typeid, *args, **kwds): - ''' - Create a new shared object; return the token and exposed tuple - ''' - assert self._state.value == State.STARTED, 'server not yet started' - conn = self._Client(self._address, authkey=self._authkey) - try: - id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds) - finally: - conn.close() - return Token(typeid, self._address, id), exposed - - def join(self, timeout=None): - ''' - Join the manager process (if it has been spawned) - ''' - self._process.join(timeout) - - def _debug_info(self): - ''' - Return some info about the servers shared objects and connections - ''' - conn = self._Client(self._address, authkey=self._authkey) - try: - return dispatch(conn, None, 'debug_info') - finally: - conn.close() - - def _number_of_objects(self): - ''' - Return the number of shared objects - ''' - conn = self._Client(self._address, authkey=self._authkey) - try: - return dispatch(conn, None, 'number_of_objects') - finally: - conn.close() - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.shutdown() - - @staticmethod - def _finalize_manager(process, address, authkey, state, _Client): - ''' - Shutdown the manager process; will be registered as a finalizer - ''' - if process.is_alive(): - util.info('sending shutdown message to manager') - try: - conn = _Client(address, authkey=authkey) - try: - dispatch(conn, None, 'shutdown') - finally: - conn.close() - except Exception: - pass - - process.join(timeout=0.2) - if process.is_alive(): - util.info('manager still alive') - if hasattr(process, 'terminate'): - util.info('trying to `terminate()` manager process') - process.terminate() - process.join(timeout=0.1) - if process.is_alive(): - util.info('manager still alive after terminate') - - state.value = State.SHUTDOWN - try: - del BaseProxy._address_to_local[address] - except KeyError: - pass - - address = property(lambda self: self._address) - - @classmethod - def register(cls, typeid, callable=None, proxytype=None, exposed=None, - method_to_typeid=None, create_method=True): - ''' - Register a typeid with the manager type - ''' - if '_registry' not in cls.__dict__: - cls._registry = cls._registry.copy() - - if proxytype is None: - proxytype = AutoProxy - - exposed = exposed or getattr(proxytype, '_exposed_', None) - - method_to_typeid = method_to_typeid or \ - getattr(proxytype, '_method_to_typeid_', None) - - if method_to_typeid: - for key, value in method_to_typeid.items(): - assert type(key) is str, '%r is not a string' % key - assert type(value) is str, '%r is not a string' % value - - cls._registry[typeid] = ( - callable, exposed, 
method_to_typeid, proxytype - ) - - if create_method: - def temp(self, *args, **kwds): - util.debug('requesting creation of a shared %r object', typeid) - token, exp = self._create(typeid, *args, **kwds) - proxy = proxytype( - token, self._serializer, manager=self, - authkey=self._authkey, exposed=exp - ) - return proxy - temp.__name__ = typeid - setattr(cls, typeid, temp) - -# -# Subclass of set which get cleared after a fork -# - -class ProcessLocalSet(set): - def __init__(self): - util.register_after_fork(self, lambda obj: obj.clear()) - def __reduce__(self): - return type(self), () - -# -# Definition of BaseProxy -# - -class BaseProxy(object): - ''' - A base for proxies of shared objects - ''' - _address_to_local = {} - _mutex = util.ForkAwareThreadLock() - - def __init__(self, token, serializer, manager=None, - authkey=None, exposed=None, incref=True): - BaseProxy._mutex.acquire() - try: - tls_idset = BaseProxy._address_to_local.get(token.address, None) - if tls_idset is None: - tls_idset = util.ForkAwareLocal(), ProcessLocalSet() - BaseProxy._address_to_local[token.address] = tls_idset - finally: - BaseProxy._mutex.release() - - # self._tls is used to record the connection used by this - # thread to communicate with the manager at token.address - self._tls = tls_idset[0] - - # self._idset is used to record the identities of all shared - # objects for which the current process owns references and - # which are in the manager at token.address - self._idset = tls_idset[1] - - self._token = token - self._id = self._token.id - self._manager = manager - self._serializer = serializer - self._Client = listener_client[serializer][1] - - if authkey is not None: - self._authkey = AuthenticationString(authkey) - elif self._manager is not None: - self._authkey = self._manager._authkey - else: - self._authkey = current_process().get_authkey() - - if incref: - self._incref() - - util.register_after_fork(self, BaseProxy._after_fork) - - def _connect(self): - util.debug('making connection to manager') - name = current_process().get_name() - if threading.currentThread().getName() != 'MainThread': - name += '|' + threading.currentThread().getName() - conn = self._Client(self._token.address, authkey=self._authkey) - dispatch(conn, None, 'accept_connection', (name,)) - self._tls.connection = conn - - def _callmethod(self, methodname, args=(), kwds={}): - ''' - Try to call a method of the referrent and return a copy of the result - ''' - try: - conn = self._tls.connection - except AttributeError: - util.debug('thread %r does not own a connection', - threading.currentThread().getName()) - self._connect() - conn = self._tls.connection - - conn.send((self._id, methodname, args, kwds)) - kind, result = conn.recv() - - if kind == '#RETURN': - return result - elif kind == '#PROXY': - exposed, token = result - proxytype = self._manager._registry[token.typeid][-1] - return proxytype( - token, self._serializer, manager=self._manager, - authkey=self._authkey, exposed=exposed - ) - raise convert_to_error(kind, result) - - def _getvalue(self): - ''' - Get a copy of the value of the referent - ''' - return self._callmethod('#GETVALUE') - - def _incref(self): - conn = self._Client(self._token.address, authkey=self._authkey) - dispatch(conn, None, 'incref', (self._id,)) - util.debug('INCREF %r', self._token.id) - - self._idset.add(self._id) - - state = self._manager and self._manager._state - - self._close = util.Finalize( - self, BaseProxy._decref, - args=(self._token, self._authkey, state, - self._tls, self._idset, 
self._Client), - exitpriority=10 - ) - - @staticmethod - def _decref(token, authkey, state, tls, idset, _Client): - idset.discard(token.id) - - # check whether manager is still alive - if state is None or state.value == State.STARTED: - # tell manager this process no longer cares about referent - try: - util.debug('DECREF %r', token.id) - conn = _Client(token.address, authkey=authkey) - dispatch(conn, None, 'decref', (token.id,)) - except Exception, e: - util.debug('... decref failed %s', e) - - else: - util.debug('DECREF %r -- manager already shutdown', token.id) - - # check whether we can close this thread's connection because - # the process owns no more references to objects for this manager - if not idset and hasattr(tls, 'connection'): - util.debug('thread %r has no more proxies so closing conn', - threading.currentThread().getName()) - tls.connection.close() - del tls.connection - - def _after_fork(self): - self._manager = None - try: - self._incref() - except Exception, e: - # the proxy may just be for a manager which has shutdown - util.info('incref failed: %s' % e) - - def __reduce__(self): - kwds = {} - if Popen.thread_is_spawning(): - kwds['authkey'] = self._authkey - - if getattr(self, '_isauto', False): - kwds['exposed'] = self._exposed_ - return (RebuildProxy, - (AutoProxy, self._token, self._serializer, kwds)) - else: - return (RebuildProxy, - (type(self), self._token, self._serializer, kwds)) - - def __deepcopy__(self, memo): - return self._getvalue() - - def __repr__(self): - return '<%s object, typeid %r at %s>' % \ - (type(self).__name__, self._token.typeid, '0x%x' % id(self)) - - def __str__(self): - ''' - Return representation of the referent (or a fall-back if that fails) - ''' - try: - return self._callmethod('__repr__') - except Exception: - return repr(self)[:-1] + "; '__str__()' failed>" - -# -# Function used for unpickling -# - -def RebuildProxy(func, token, serializer, kwds): - ''' - Function used for unpickling proxy objects. - - If possible the shared object is returned, or otherwise a proxy for it. 
- ''' - server = getattr(current_process(), '_manager_server', None) - - if server and server.address == token.address: - return server.id_to_obj[token.id][0] - else: - incref = ( - kwds.pop('incref', True) and - not getattr(current_process(), '_inheriting', False) - ) - return func(token, serializer, incref=incref, **kwds) - -# -# Functions to create proxies and proxy types -# - -def MakeProxyType(name, exposed, _cache={}): - ''' - Return an proxy type whose methods are given by `exposed` - ''' - exposed = tuple(exposed) - try: - return _cache[(name, exposed)] - except KeyError: - pass - - dic = {} - - for meth in exposed: - exec '''def %s(self, *args, **kwds): - return self._callmethod(%r, args, kwds)''' % (meth, meth) in dic - - ProxyType = type(name, (BaseProxy,), dic) - ProxyType._exposed_ = exposed - _cache[(name, exposed)] = ProxyType - return ProxyType - - -def AutoProxy(token, serializer, manager=None, authkey=None, - exposed=None, incref=True): - ''' - Return an auto-proxy for `token` - ''' - _Client = listener_client[serializer][1] - - if exposed is None: - conn = _Client(token.address, authkey=authkey) - try: - exposed = dispatch(conn, None, 'get_methods', (token,)) - finally: - conn.close() - - if authkey is None and manager is not None: - authkey = manager._authkey - if authkey is None: - authkey = current_process().get_authkey() - - ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) - proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, - incref=incref) - proxy._isauto = True - return proxy - -# -# Types/callables which we will register with SyncManager -# - -class Namespace(object): - def __init__(self, **kwds): - self.__dict__.update(kwds) - def __repr__(self): - items = self.__dict__.items() - temp = [] - for name, value in items: - if not name.startswith('_'): - temp.append('%s=%r' % (name, value)) - temp.sort() - return 'Namespace(%s)' % str.join(', ', temp) - -class Value(object): - def __init__(self, typecode, value, lock=True): - self._typecode = typecode - self._value = value - def get(self): - return self._value - def set(self, value): - self._value = value - def __repr__(self): - return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) - value = property(get, set) - -def Array(typecode, sequence, lock=True): - return array.array(typecode, sequence) - -# -# Proxy types used by SyncManager -# - -class IteratorProxy(BaseProxy): - # XXX remove methods for Py3.0 and Py2.6 - _exposed_ = ('__next__', 'next', 'send', 'throw', 'close') - def __iter__(self): - return self - def __next__(self, *args): - return self._callmethod('__next__', args) - def next(self, *args): - return self._callmethod('next', args) - def send(self, *args): - return self._callmethod('send', args) - def throw(self, *args): - return self._callmethod('throw', args) - def close(self, *args): - return self._callmethod('close', args) - - -class AcquirerProxy(BaseProxy): - _exposed_ = ('acquire', 'release') - def acquire(self, blocking=True): - return self._callmethod('acquire', (blocking,)) - def release(self): - return self._callmethod('release') - def __enter__(self): - return self._callmethod('acquire') - def __exit__(self, exc_type, exc_val, exc_tb): - return self._callmethod('release') - - -class ConditionProxy(AcquirerProxy): - # XXX will Condition.notfyAll() name be available in Py3.0? 
- _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notifyAll') - def wait(self, timeout=None): - return self._callmethod('wait', (timeout,)) - def notify(self): - return self._callmethod('notify') - def notify_all(self): - return self._callmethod('notifyAll') - -class EventProxy(BaseProxy): - # XXX will Event.isSet name be available in Py3.0? - _exposed_ = ('isSet', 'set', 'clear', 'wait') - def is_set(self): - return self._callmethod('isSet') - def set(self): - return self._callmethod('set') - def clear(self): - return self._callmethod('clear') - def wait(self, timeout=None): - return self._callmethod('wait', (timeout,)) - -class NamespaceProxy(BaseProxy): - _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') - def __getattr__(self, key): - if key[0] == '_': - return object.__getattribute__(self, key) - callmethod = object.__getattribute__(self, '_callmethod') - return callmethod('__getattribute__', (key,)) - def __setattr__(self, key, value): - if key[0] == '_': - return object.__setattr__(self, key, value) - callmethod = object.__getattribute__(self, '_callmethod') - return callmethod('__setattr__', (key, value)) - def __delattr__(self, key): - if key[0] == '_': - return object.__delattr__(self, key) - callmethod = object.__getattribute__(self, '_callmethod') - return callmethod('__delattr__', (key,)) - - -class ValueProxy(BaseProxy): - _exposed_ = ('get', 'set') - def get(self): - return self._callmethod('get') - def set(self, value): - return self._callmethod('set', (value,)) - value = property(get, set) - - -BaseListProxy = MakeProxyType('BaseListProxy', ( - '__add__', '__contains__', '__delitem__', '__delslice__', - '__getitem__', '__getslice__', '__len__', '__mul__', - '__reversed__', '__rmul__', '__setitem__', '__setslice__', - 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', - 'reverse', 'sort', '__imul__' - )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 -class ListProxy(BaseListProxy): - def __iadd__(self, value): - self._callmethod('extend', (value,)) - return self - def __imul__(self, value): - self._callmethod('__imul__', (value,)) - return self - - -DictProxy = MakeProxyType('DictProxy', ( - '__contains__', '__delitem__', '__getitem__', '__len__', - '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items', - 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values' - )) - - -ArrayProxy = MakeProxyType('ArrayProxy', ( - '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__' - )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 - - -PoolProxy = MakeProxyType('PoolProxy', ( - 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', - 'map', 'map_async', 'terminate' - )) -PoolProxy._method_to_typeid_ = { - 'apply_async': 'AsyncResult', - 'map_async': 'AsyncResult', - 'imap': 'Iterator', - 'imap_unordered': 'Iterator' - } - -# -# Definition of SyncManager -# - -class SyncManager(BaseManager): - ''' - Subclass of `BaseManager` which supports a number of shared object types. - - The types registered are those intended for the synchronization - of threads, plus `dict`, `list` and `Namespace`. - - The `multiprocessing.Manager()` function creates started instances of - this class. 
- ''' - -SyncManager.register('Queue', Queue.Queue) -SyncManager.register('JoinableQueue', Queue.Queue) -SyncManager.register('Event', threading.Event, EventProxy) -SyncManager.register('Lock', threading.Lock, AcquirerProxy) -SyncManager.register('RLock', threading.RLock, AcquirerProxy) -SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) -SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, - AcquirerProxy) -SyncManager.register('Condition', threading.Condition, ConditionProxy) -SyncManager.register('Pool', Pool, PoolProxy) -SyncManager.register('list', list, ListProxy) -SyncManager.register('dict', dict, DictProxy) -SyncManager.register('Value', Value, ValueProxy) -SyncManager.register('Array', Array, ArrayProxy) -SyncManager.register('Namespace', Namespace, NamespaceProxy) - -# types returned by methods of PoolProxy -SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) -SyncManager.register('AsyncResult', create_method=False) +# +# Module providing the `SyncManager` class for dealing +# with shared objects +# +# multiprocessing/managers.py +# +# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt +# + +__all__ = [ 'BaseManager', 'SyncManager', 'BaseProxy', 'Token' ] + +# +# Imports +# + +import os +import sys +import weakref +import threading +import array +import copyreg +import queue + +from traceback import format_exc +from multiprocessing import Process, current_process, active_children, Pool, util, connection +from multiprocessing.process import AuthenticationString +from multiprocessing.forking import exit, Popen, assert_spawning +from multiprocessing.util import Finalize, info + +try: + from cPickle import PicklingError +except ImportError: + from pickle import PicklingError + +# +# +# + +try: + bytes +except NameError: + bytes = str # XXX not needed in Py2.6 and Py3.0 + +# +# Register some things for pickling +# + +def reduce_array(a): + return array.array, (a.typecode, a.tostring()) +copyreg.pickle(array.array, reduce_array) + +view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] +if view_types[0] is not list: # XXX only needed in Py3.0 + def rebuild_as_list(obj): + return list, (list(obj),) + for view_type in view_types: + copyreg.pickle(view_type, rebuild_as_list) + +# +# Type for identifying shared objects +# + +class Token(object): + ''' + Type to uniquely indentify a shared object + ''' + __slots__ = ('typeid', 'address', 'id') + + def __init__(self, typeid, address, id): + (self.typeid, self.address, self.id) = (typeid, address, id) + + def __getstate__(self): + return (self.typeid, self.address, self.id) + + def __setstate__(self, state): + (self.typeid, self.address, self.id) = state + + def __repr__(self): + return 'Token(typeid=%r, address=%r, id=%r)' % \ + (self.typeid, self.address, self.id) + +# +# Function for communication with a manager's server process +# + +def dispatch(c, id, methodname, args=(), kwds={}): + ''' + Send a message to manager using connection `c` and return response + ''' + c.send((id, methodname, args, kwds)) + kind, result = c.recv() + if kind == '#RETURN': + return result + raise convert_to_error(kind, result) + +def convert_to_error(kind, result): + if kind == '#ERROR': + return result + elif kind == '#TRACEBACK': + assert type(result) is str + return RemoteError(result) + elif kind == '#UNSERIALIZABLE': + assert type(result) is str + return RemoteError('Unserializable message: %s\n' % result) + else: + return ValueError('Unrecognized message type') + 
+class RemoteError(Exception): + def __str__(self): + return ('\n' + '-'*75 + '\n' + str(self.args[0]) + '-'*75) + +# +# Functions for finding the method names of an object +# + +def all_methods(obj): + ''' + Return a list of names of methods of `obj` + ''' + temp = [] + for name in dir(obj): + func = getattr(obj, name) + if hasattr(func, '__call__'): + temp.append(name) + return temp + +def public_methods(obj): + ''' + Return a list of names of methods of `obj` which do not start with '_' + ''' + return [name for name in all_methods(obj) if name[0] != '_'] + +# +# Server which is run in a process controlled by a manager +# + +class Server(object): + ''' + Server class which runs in a process controlled by a manager object + ''' + public = ['shutdown', 'create', 'accept_connection', 'get_methods', + 'debug_info', 'number_of_objects', 'dummy', 'incref', 'decref'] + + def __init__(self, registry, address, authkey, serializer): + assert isinstance(authkey, bytes) + self.registry = registry + self.authkey = AuthenticationString(authkey) + Listener, Client = listener_client[serializer] + + # do authentication later + self.listener = Listener(address=address, backlog=5) + self.address = self.listener.address + + self.id_to_obj = {0: (None, ())} + self.id_to_refcount = {} + self.mutex = threading.RLock() + self.stop = 0 + + def serve_forever(self): + ''' + Run the server forever + ''' + current_process()._manager_server = self + try: + try: + while 1: + try: + c = self.listener.accept() + except (OSError, IOError): + continue + t = threading.Thread(target=self.handle_request, args=(c,)) + t.setDaemon(True) + t.start() + except (KeyboardInterrupt, SystemExit): + pass + finally: + self.stop = 999 + self.listener.close() + + def handle_request(self, c): + ''' + Handle a new connection + ''' + funcname = result = request = None + try: + connection.deliver_challenge(c, self.authkey) + connection.answer_challenge(c, self.authkey) + request = c.recv() + ignore, funcname, args, kwds = request + assert funcname in self.public, '%r unrecognized' % funcname + func = getattr(self, funcname) + except Exception: + msg = ('#TRACEBACK', format_exc()) + else: + try: + result = func(c, *args, **kwds) + except Exception: + msg = ('#TRACEBACK', format_exc()) + else: + msg = ('#RETURN', result) + try: + c.send(msg) + except Exception as e: + try: + c.send(('#TRACEBACK', format_exc())) + except Exception: + pass + util.info('Failure to send message: %r', msg) + util.info(' ... request was %r', request) + util.info(' ... 
exception was %r', e) + + c.close() + + def serve_client(self, conn): + ''' + Handle requests from the proxies in a particular process/thread + ''' + util.debug('starting server thread to service %r', + threading.currentThread().getName()) + + recv = conn.recv + send = conn.send + id_to_obj = self.id_to_obj + + while not self.stop: + + try: + methodname = obj = None + request = recv() + ident, methodname, args, kwds = request + obj, exposed, gettypeid = id_to_obj[ident] + + if methodname not in exposed: + raise AttributeError( + 'method %r of %r object is not in exposed=%r' % + (methodname, type(obj), exposed) + ) + + function = getattr(obj, methodname) + + try: + res = function(*args, **kwds) + except Exception as e: + msg = ('#ERROR', e) + else: + typeid = gettypeid and gettypeid.get(methodname, None) + if typeid: + rident, rexposed = self.create(conn, typeid, res) + token = Token(typeid, self.address, rident) + msg = ('#PROXY', (rexposed, token)) + else: + msg = ('#RETURN', res) + + except AttributeError: + if methodname is None: + msg = ('#TRACEBACK', format_exc()) + else: + try: + fallback_func = self.fallback_mapping[methodname] + result = fallback_func( + self, conn, ident, obj, *args, **kwds + ) + msg = ('#RETURN', result) + except Exception: + msg = ('#TRACEBACK', format_exc()) + + except EOFError: + util.debug('got EOF -- exiting thread serving %r', + threading.currentThread().getName()) + sys.exit(0) + + except Exception: + msg = ('#TRACEBACK', format_exc()) + + try: + try: + send(msg) + except Exception as e: + send(('#UNSERIALIZABLE', repr(msg))) + except Exception as e: + util.info('exception in thread serving %r', + threading.currentThread().getName()) + util.info(' ... message was %r', msg) + util.info(' ... exception was %r', e) + conn.close() + sys.exit(1) + + def fallback_getvalue(self, conn, ident, obj): + return obj + + def fallback_str(self, conn, ident, obj): + return str(obj) + + def fallback_repr(self, conn, ident, obj): + return repr(obj) + + fallback_mapping = { + '__str__':fallback_str, + '__repr__':fallback_repr, + '#GETVALUE':fallback_getvalue + } + + def dummy(self, c): + pass + + def debug_info(self, c): + ''' + Return some info --- useful to spot problems with refcounting + ''' + self.mutex.acquire() + try: + result = [] + keys = list(self.id_to_obj.keys()) + keys.sort() + for ident in keys: + if ident != 0: + result.append(' %s: refcount=%s\n %s' % + (ident, self.id_to_refcount[ident], + str(self.id_to_obj[ident][0])[:75])) + return '\n'.join(result) + finally: + self.mutex.release() + + def number_of_objects(self, c): + ''' + Number of shared objects + ''' + return len(self.id_to_obj) - 1 # don't count ident=0 + + def shutdown(self, c): + ''' + Shutdown this process + ''' + try: + try: + util.debug('manager received shutdown message') + c.send(('#RETURN', None)) + + if sys.stdout != sys.__stdout__: + util.debug('resetting stdout, stderr') + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ + + util._run_finalizers(0) + + for p in active_children(): + util.debug('terminating a child process of manager') + p.terminate() + + for p in active_children(): + util.debug('terminating a child process of manager') + p.join() + + util._run_finalizers() + util.info('manager exiting with exitcode 0') + except: + import traceback + traceback.print_exc() + finally: + exit(0) + + def create(self, c, typeid, *args, **kwds): + ''' + Create a new shared object and return its id + ''' + self.mutex.acquire() + try: + callable, exposed, method_to_typeid, proxytype = \ + 
self.registry[typeid] + + if callable is None: + assert len(args) == 1 and not kwds + obj = args[0] + else: + obj = callable(*args, **kwds) + + if exposed is None: + exposed = public_methods(obj) + if method_to_typeid is not None: + assert type(method_to_typeid) is dict + exposed = list(exposed) + list(method_to_typeid) + + ident = '%x' % id(obj) # convert to string because xmlrpclib + # only has 32 bit signed integers + util.debug('%r callable returned object with id %r', typeid, ident) + + self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid) + if ident not in self.id_to_refcount: + self.id_to_refcount[ident] = None + return ident, tuple(exposed) + finally: + self.mutex.release() + + def get_methods(self, c, token): + ''' + Return the methods of the shared object indicated by token + ''' + return tuple(self.id_to_obj[token.id][1]) + + def accept_connection(self, c, name): + ''' + Spawn a new thread to serve this connection + ''' + threading.currentThread().setName(name) + c.send(('#RETURN', None)) + self.serve_client(c) + + def incref(self, c, ident): + self.mutex.acquire() + try: + try: + self.id_to_refcount[ident] += 1 + except TypeError: + assert self.id_to_refcount[ident] is None + self.id_to_refcount[ident] = 1 + finally: + self.mutex.release() + + def decref(self, c, ident): + self.mutex.acquire() + try: + assert self.id_to_refcount[ident] >= 1 + self.id_to_refcount[ident] -= 1 + if self.id_to_refcount[ident] == 0: + del self.id_to_obj[ident], self.id_to_refcount[ident] + util.debug('disposing of obj with id %d', ident) + finally: + self.mutex.release() + +# +# Class to represent state of a manager +# + +class State(object): + __slots__ = ['value'] + INITIAL = 0 + STARTED = 1 + SHUTDOWN = 2 + +# +# Mapping from serializer name to Listener and Client types +# + +listener_client = { + 'pickle' : (connection.Listener, connection.Client), + 'xmlrpclib' : (connection.XmlListener, connection.XmlClient) + } + +# +# Definition of BaseManager +# + +class BaseManager(object): + ''' + Base class for managers + ''' + _registry = {} + _Server = Server + + def __init__(self, address=None, authkey=None, serializer='pickle'): + if authkey is None: + authkey = current_process().get_authkey() + self._address = address # XXX not final address if eg ('', 0) + self._authkey = AuthenticationString(authkey) + self._state = State() + self._state.value = State.INITIAL + self._serializer = serializer + self._Listener, self._Client = listener_client[serializer] + + def __reduce__(self): + return type(self).from_address, \ + (self._address, self._authkey, self._serializer) + + def get_server(self): + ''' + Return server object with serve_forever() method and address attribute + ''' + assert self._state.value == State.INITIAL + return Server(self._registry, self._address, + self._authkey, self._serializer) + + def connect(self): + ''' + Connect manager object to the server process + ''' + Listener, Client = listener_client[self._serializer] + conn = Client(self._address, authkey=self._authkey) + dispatch(conn, None, 'dummy') + self._state.value = State.STARTED + + def start(self): + ''' + Spawn a server process for this manager object + ''' + assert self._state.value == State.INITIAL + + # pipe over which we will retrieve address of server + reader, writer = connection.Pipe(duplex=False) + + # spawn process which runs a server + self._process = Process( + target=type(self)._run_server, + args=(self._registry, self._address, self._authkey, + self._serializer, writer), + ) + ident = ':'.join(str(i) for i 
in self._process._identity) + self._process.set_name(type(self).__name__ + '-' + ident) + self._process.start() + + # get address of server + writer.close() + self._address = reader.recv() + reader.close() + + # register a finalizer + self._state.value = State.STARTED + self.shutdown = util.Finalize( + self, type(self)._finalize_manager, + args=(self._process, self._address, self._authkey, + self._state, self._Client), + exitpriority=0 + ) + + @classmethod + def _run_server(cls, registry, address, authkey, serializer, writer): + ''' + Create a server, report its address and run it + ''' + # create server + server = cls._Server(registry, address, authkey, serializer) + + # inform parent process of the server's address + writer.send(server.address) + writer.close() + + # run the manager + util.info('manager serving at %r', server.address) + server.serve_forever() + + def _create(self, typeid, *args, **kwds): + ''' + Create a new shared object; return the token and exposed tuple + ''' + assert self._state.value == State.STARTED, 'server not yet started' + conn = self._Client(self._address, authkey=self._authkey) + try: + id, exposed = dispatch(conn, None, 'create', (typeid,)+args, kwds) + finally: + conn.close() + return Token(typeid, self._address, id), exposed + + def join(self, timeout=None): + ''' + Join the manager process (if it has been spawned) + ''' + self._process.join(timeout) + + def _debug_info(self): + ''' + Return some info about the servers shared objects and connections + ''' + conn = self._Client(self._address, authkey=self._authkey) + try: + return dispatch(conn, None, 'debug_info') + finally: + conn.close() + + def _number_of_objects(self): + ''' + Return the number of shared objects + ''' + conn = self._Client(self._address, authkey=self._authkey) + try: + return dispatch(conn, None, 'number_of_objects') + finally: + conn.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown() + + @staticmethod + def _finalize_manager(process, address, authkey, state, _Client): + ''' + Shutdown the manager process; will be registered as a finalizer + ''' + if process.is_alive(): + util.info('sending shutdown message to manager') + try: + conn = _Client(address, authkey=authkey) + try: + dispatch(conn, None, 'shutdown') + finally: + conn.close() + except Exception: + pass + + process.join(timeout=0.2) + if process.is_alive(): + util.info('manager still alive') + if hasattr(process, 'terminate'): + util.info('trying to `terminate()` manager process') + process.terminate() + process.join(timeout=0.1) + if process.is_alive(): + util.info('manager still alive after terminate') + + state.value = State.SHUTDOWN + try: + del BaseProxy._address_to_local[address] + except KeyError: + pass + + address = property(lambda self: self._address) + + @classmethod + def register(cls, typeid, callable=None, proxytype=None, exposed=None, + method_to_typeid=None, create_method=True): + ''' + Register a typeid with the manager type + ''' + if '_registry' not in cls.__dict__: + cls._registry = cls._registry.copy() + + if proxytype is None: + proxytype = AutoProxy + + exposed = exposed or getattr(proxytype, '_exposed_', None) + + method_to_typeid = method_to_typeid or \ + getattr(proxytype, '_method_to_typeid_', None) + + if method_to_typeid: + for key, value in list(method_to_typeid.items()): + assert type(key) is str, '%r is not a string' % key + assert type(value) is str, '%r is not a string' % value + + cls._registry[typeid] = ( + callable, exposed, 
method_to_typeid, proxytype + ) + + if create_method: + def temp(self, *args, **kwds): + util.debug('requesting creation of a shared %r object', typeid) + token, exp = self._create(typeid, *args, **kwds) + proxy = proxytype( + token, self._serializer, manager=self, + authkey=self._authkey, exposed=exp + ) + return proxy + temp.__name__ = typeid + setattr(cls, typeid, temp) + +# +# Subclass of set which get cleared after a fork +# + +class ProcessLocalSet(set): + def __init__(self): + util.register_after_fork(self, lambda obj: obj.clear()) + def __reduce__(self): + return type(self), () + +# +# Definition of BaseProxy +# + +class BaseProxy(object): + ''' + A base for proxies of shared objects + ''' + _address_to_local = {} + _mutex = util.ForkAwareThreadLock() + + def __init__(self, token, serializer, manager=None, + authkey=None, exposed=None, incref=True): + BaseProxy._mutex.acquire() + try: + tls_idset = BaseProxy._address_to_local.get(token.address, None) + if tls_idset is None: + tls_idset = util.ForkAwareLocal(), ProcessLocalSet() + BaseProxy._address_to_local[token.address] = tls_idset + finally: + BaseProxy._mutex.release() + + # self._tls is used to record the connection used by this + # thread to communicate with the manager at token.address + self._tls = tls_idset[0] + + # self._idset is used to record the identities of all shared + # objects for which the current process owns references and + # which are in the manager at token.address + self._idset = tls_idset[1] + + self._token = token + self._id = self._token.id + self._manager = manager + self._serializer = serializer + self._Client = listener_client[serializer][1] + + if authkey is not None: + self._authkey = AuthenticationString(authkey) + elif self._manager is not None: + self._authkey = self._manager._authkey + else: + self._authkey = current_process().get_authkey() + + if incref: + self._incref() + + util.register_after_fork(self, BaseProxy._after_fork) + + def _connect(self): + util.debug('making connection to manager') + name = current_process().get_name() + if threading.currentThread().getName() != 'MainThread': + name += '|' + threading.currentThread().getName() + conn = self._Client(self._token.address, authkey=self._authkey) + dispatch(conn, None, 'accept_connection', (name,)) + self._tls.connection = conn + + def _callmethod(self, methodname, args=(), kwds={}): + ''' + Try to call a method of the referrent and return a copy of the result + ''' + try: + conn = self._tls.connection + except AttributeError: + util.debug('thread %r does not own a connection', + threading.currentThread().getName()) + self._connect() + conn = self._tls.connection + + conn.send((self._id, methodname, args, kwds)) + kind, result = conn.recv() + + if kind == '#RETURN': + return result + elif kind == '#PROXY': + exposed, token = result + proxytype = self._manager._registry[token.typeid][-1] + return proxytype( + token, self._serializer, manager=self._manager, + authkey=self._authkey, exposed=exposed + ) + raise convert_to_error(kind, result) + + def _getvalue(self): + ''' + Get a copy of the value of the referent + ''' + return self._callmethod('#GETVALUE') + + def _incref(self): + conn = self._Client(self._token.address, authkey=self._authkey) + dispatch(conn, None, 'incref', (self._id,)) + util.debug('INCREF %r', self._token.id) + + self._idset.add(self._id) + + state = self._manager and self._manager._state + + self._close = util.Finalize( + self, BaseProxy._decref, + args=(self._token, self._authkey, state, + self._tls, self._idset, 
self._Client), + exitpriority=10 + ) + + @staticmethod + def _decref(token, authkey, state, tls, idset, _Client): + idset.discard(token.id) + + # check whether manager is still alive + if state is None or state.value == State.STARTED: + # tell manager this process no longer cares about referent + try: + util.debug('DECREF %r', token.id) + conn = _Client(token.address, authkey=authkey) + dispatch(conn, None, 'decref', (token.id,)) + except Exception as e: + util.debug('... decref failed %s', e) + + else: + util.debug('DECREF %r -- manager already shutdown', token.id) + + # check whether we can close this thread's connection because + # the process owns no more references to objects for this manager + if not idset and hasattr(tls, 'connection'): + util.debug('thread %r has no more proxies so closing conn', + threading.currentThread().getName()) + tls.connection.close() + del tls.connection + + def _after_fork(self): + self._manager = None + try: + self._incref() + except Exception as e: + # the proxy may just be for a manager which has shutdown + util.info('incref failed: %s' % e) + + def __reduce__(self): + kwds = {} + if Popen.thread_is_spawning(): + kwds['authkey'] = self._authkey + + if getattr(self, '_isauto', False): + kwds['exposed'] = self._exposed_ + return (RebuildProxy, + (AutoProxy, self._token, self._serializer, kwds)) + else: + return (RebuildProxy, + (type(self), self._token, self._serializer, kwds)) + + def __deepcopy__(self, memo): + return self._getvalue() + + def __repr__(self): + return '<%s object, typeid %r at %s>' % \ + (type(self).__name__, self._token.typeid, '0x%x' % id(self)) + + def __str__(self): + ''' + Return representation of the referent (or a fall-back if that fails) + ''' + try: + return self._callmethod('__repr__') + except Exception: + return repr(self)[:-1] + "; '__str__()' failed>" + +# +# Function used for unpickling +# + +def RebuildProxy(func, token, serializer, kwds): + ''' + Function used for unpickling proxy objects. + + If possible the shared object is returned, or otherwise a proxy for it. 
+ ''' + server = getattr(current_process(), '_manager_server', None) + + if server and server.address == token.address: + return server.id_to_obj[token.id][0] + else: + incref = ( + kwds.pop('incref', True) and + not getattr(current_process(), '_inheriting', False) + ) + return func(token, serializer, incref=incref, **kwds) + +# +# Functions to create proxies and proxy types +# + +def MakeProxyType(name, exposed, _cache={}): + ''' + Return an proxy type whose methods are given by `exposed` + ''' + exposed = tuple(exposed) + try: + return _cache[(name, exposed)] + except KeyError: + pass + + dic = {} + + for meth in exposed: + exec('''def %s(self, *args, **kwds): + return self._callmethod(%r, args, kwds)''' % (meth, meth), dic) + + ProxyType = type(name, (BaseProxy,), dic) + ProxyType._exposed_ = exposed + _cache[(name, exposed)] = ProxyType + return ProxyType + + +def AutoProxy(token, serializer, manager=None, authkey=None, + exposed=None, incref=True): + ''' + Return an auto-proxy for `token` + ''' + _Client = listener_client[serializer][1] + + if exposed is None: + conn = _Client(token.address, authkey=authkey) + try: + exposed = dispatch(conn, None, 'get_methods', (token,)) + finally: + conn.close() + + if authkey is None and manager is not None: + authkey = manager._authkey + if authkey is None: + authkey = current_process().get_authkey() + + ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) + proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, + incref=incref) + proxy._isauto = True + return proxy + +# +# Types/callables which we will register with SyncManager +# + +class Namespace(object): + def __init__(self, **kwds): + self.__dict__.update(kwds) + def __repr__(self): + items = list(self.__dict__.items()) + temp = [] + for name, value in items: + if not name.startswith('_'): + temp.append('%s=%r' % (name, value)) + temp.sort() + return 'Namespace(%s)' % str.join(', ', temp) + +class Value(object): + def __init__(self, typecode, value, lock=True): + self._typecode = typecode + self._value = value + def get(self): + return self._value + def set(self, value): + self._value = value + def __repr__(self): + return '%s(%r, %r)'%(type(self).__name__, self._typecode, self._value) + value = property(get, set) + +def Array(typecode, sequence, lock=True): + return array.array(typecode, sequence) + +# +# Proxy types used by SyncManager +# + +class IteratorProxy(BaseProxy): + # XXX remove methods for Py3.0 and Py2.6 + _exposed_ = ('__next__', 'next', 'send', 'throw', 'close') + def __iter__(self): + return self + def __next__(self, *args): + return self._callmethod('__next__', args) + def next(self, *args): + return self._callmethod('next', args) + def send(self, *args): + return self._callmethod('send', args) + def throw(self, *args): + return self._callmethod('throw', args) + def close(self, *args): + return self._callmethod('close', args) + + +class AcquirerProxy(BaseProxy): + _exposed_ = ('acquire', 'release') + def acquire(self, blocking=True): + return self._callmethod('acquire', (blocking,)) + def release(self): + return self._callmethod('release') + def __enter__(self): + return self._callmethod('acquire') + def __exit__(self, exc_type, exc_val, exc_tb): + return self._callmethod('release') + + +class ConditionProxy(AcquirerProxy): + # XXX will Condition.notfyAll() name be available in Py3.0? 
+ _exposed_ = ('acquire', 'release', 'wait', 'notify', 'notifyAll') + def wait(self, timeout=None): + return self._callmethod('wait', (timeout,)) + def notify(self): + return self._callmethod('notify') + def notify_all(self): + return self._callmethod('notifyAll') + +class EventProxy(BaseProxy): + # XXX will Event.isSet name be available in Py3.0? + _exposed_ = ('isSet', 'set', 'clear', 'wait') + def is_set(self): + return self._callmethod('isSet') + def set(self): + return self._callmethod('set') + def clear(self): + return self._callmethod('clear') + def wait(self, timeout=None): + return self._callmethod('wait', (timeout,)) + +class NamespaceProxy(BaseProxy): + _exposed_ = ('__getattribute__', '__setattr__', '__delattr__') + def __getattr__(self, key): + if key[0] == '_': + return object.__getattribute__(self, key) + callmethod = object.__getattribute__(self, '_callmethod') + return callmethod('__getattribute__', (key,)) + def __setattr__(self, key, value): + if key[0] == '_': + return object.__setattr__(self, key, value) + callmethod = object.__getattribute__(self, '_callmethod') + return callmethod('__setattr__', (key, value)) + def __delattr__(self, key): + if key[0] == '_': + return object.__delattr__(self, key) + callmethod = object.__getattribute__(self, '_callmethod') + return callmethod('__delattr__', (key,)) + + +class ValueProxy(BaseProxy): + _exposed_ = ('get', 'set') + def get(self): + return self._callmethod('get') + def set(self, value): + return self._callmethod('set', (value,)) + value = property(get, set) + + +BaseListProxy = MakeProxyType('BaseListProxy', ( + '__add__', '__contains__', '__delitem__', '__delslice__', + '__getitem__', '__getslice__', '__len__', '__mul__', + '__reversed__', '__rmul__', '__setitem__', '__setslice__', + 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', + 'reverse', 'sort', '__imul__' + )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 +class ListProxy(BaseListProxy): + def __iadd__(self, value): + self._callmethod('extend', (value,)) + return self + def __imul__(self, value): + self._callmethod('__imul__', (value,)) + return self + + +DictProxy = MakeProxyType('DictProxy', ( + '__contains__', '__delitem__', '__getitem__', '__len__', + '__setitem__', 'clear', 'copy', 'get', 'has_key', 'items', + 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values' + )) + + +ArrayProxy = MakeProxyType('ArrayProxy', ( + '__len__', '__getitem__', '__setitem__', '__getslice__', '__setslice__' + )) # XXX __getslice__ and __setslice__ unneeded in Py3.0 + + +PoolProxy = MakeProxyType('PoolProxy', ( + 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', + 'map', 'map_async', 'terminate' + )) +PoolProxy._method_to_typeid_ = { + 'apply_async': 'AsyncResult', + 'map_async': 'AsyncResult', + 'imap': 'Iterator', + 'imap_unordered': 'Iterator' + } + +# +# Definition of SyncManager +# + +class SyncManager(BaseManager): + ''' + Subclass of `BaseManager` which supports a number of shared object types. + + The types registered are those intended for the synchronization + of threads, plus `dict`, `list` and `Namespace`. + + The `multiprocessing.Manager()` function creates started instances of + this class. 
+ ''' + +SyncManager.register('Queue', queue.Queue) +SyncManager.register('JoinableQueue', queue.Queue) +SyncManager.register('Event', threading.Event, EventProxy) +SyncManager.register('Lock', threading.Lock, AcquirerProxy) +SyncManager.register('RLock', threading.RLock, AcquirerProxy) +SyncManager.register('Semaphore', threading.Semaphore, AcquirerProxy) +SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, + AcquirerProxy) +SyncManager.register('Condition', threading.Condition, ConditionProxy) +SyncManager.register('Pool', Pool, PoolProxy) +SyncManager.register('list', list, ListProxy) +SyncManager.register('dict', dict, DictProxy) +SyncManager.register('Value', Value, ValueProxy) +SyncManager.register('Array', Array, ArrayProxy) +SyncManager.register('Namespace', Namespace, NamespaceProxy) + +# types returned by methods of PoolProxy +SyncManager.register('Iterator', proxytype=IteratorProxy, create_method=False) +SyncManager.register('AsyncResult', create_method=False) Index: Lib/multiprocessing/util.py =================================================================== --- Lib/multiprocessing/util.py (revision 64104) +++ Lib/multiprocessing/util.py (working copy) @@ -1,336 +1,336 @@ -# -# Module providing various facilities to other parts of the package -# -# multiprocessing/util.py -# -# Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt -# - -import itertools -import weakref -import copy_reg -import atexit -import threading # we want threading to install it's - # cleanup function before multiprocessing does - -from multiprocessing.process import current_process, active_children - -__all__ = [ - 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger', - 'log_to_stderr', 'get_temp_dir', 'register_after_fork', - 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal' - ] - -# -# Logging -# - -NOTSET = 0 -SUBDEBUG = 5 -DEBUG = 10 -INFO = 20 -SUBWARNING = 25 - -LOGGER_NAME = 'multiprocessing' -DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s' - -_logger = None -_log_to_stderr = False - -def sub_debug(msg, *args): - if _logger: - _logger.log(SUBDEBUG, msg, *args) - -def debug(msg, *args): - if _logger: - _logger.log(DEBUG, msg, *args) - -def info(msg, *args): - if _logger: - _logger.log(INFO, msg, *args) - -def sub_warning(msg, *args): - if _logger: - _logger.log(SUBWARNING, msg, *args) - -def get_logger(): - ''' - Returns logger used by multiprocessing - ''' - global _logger - - if not _logger: - import logging, atexit - - # XXX multiprocessing should cleanup before logging - if hasattr(atexit, 'unregister'): - atexit.unregister(_exit_function) - atexit.register(_exit_function) - else: - atexit._exithandlers.remove((_exit_function, (), {})) - atexit._exithandlers.append((_exit_function, (), {})) - - _check_logger_class() - _logger = logging.getLogger(LOGGER_NAME) - - return _logger - -def _check_logger_class(): - ''' - Make sure process name is recorded when loggers are used - ''' - # XXX This function is unnecessary once logging is patched - import logging - if hasattr(logging, 'multiprocessing'): - return - - logging._acquireLock() - try: - OldLoggerClass = logging.getLoggerClass() - if not getattr(OldLoggerClass, '_process_aware', False): - class ProcessAwareLogger(OldLoggerClass): - _process_aware = True - def makeRecord(self, *args, **kwds): - record = OldLoggerClass.makeRecord(self, *args, **kwds) - record.processName = current_process()._name - return record - logging.setLoggerClass(ProcessAwareLogger) - finally: - 
logging._releaseLock()
-
-def log_to_stderr(level=None):
-    '''
-    Turn on logging and add a handler which prints to stderr
-    '''
-    global _log_to_stderr
-    import logging
-    logger = get_logger()
-    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
-    handler = logging.StreamHandler()
-    handler.setFormatter(formatter)
-    logger.addHandler(handler)
-    if level is not None:
-        logger.setLevel(level)
-    _log_to_stderr = True
-
-#
-# Function returning a temp directory which will be removed on exit
-#
-
-def get_temp_dir():
-    # get name of a temp directory which will be automatically cleaned up
-    if current_process()._tempdir is None:
-        import shutil, tempfile
-        tempdir = tempfile.mkdtemp(prefix='pymp-')
-        info('created temp directory %s', tempdir)
-        Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
-        current_process()._tempdir = tempdir
-    return current_process()._tempdir
-
-#
-# Support for reinitialization of objects when bootstrapping a child process
-#
-
-_afterfork_registry = weakref.WeakValueDictionary()
-_afterfork_counter = itertools.count()
-
-def _run_after_forkers():
-    items = list(_afterfork_registry.items())
-    items.sort()
-    for (index, ident, func), obj in items:
-        try:
-            func(obj)
-        except Exception, e:
-            info('after forker raised exception %s', e)
-
-def register_after_fork(obj, func):
-    _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj
-
-#
-# Finalization using weakrefs
-#
-
-_finalizer_registry = {}
-_finalizer_counter = itertools.count()
-
-
-class Finalize(object):
-    '''
-    Class which supports object finalization using weakrefs
-    '''
-    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
-        assert exitpriority is None or type(exitpriority) is int
-
-        if obj is not None:
-            self._weakref = weakref.ref(obj, self)
-        else:
-            assert exitpriority is not None
-
-        self._callback = callback
-        self._args = args
-        self._kwargs = kwargs or {}
-        self._key = (exitpriority, _finalizer_counter.next())
-
-        _finalizer_registry[self._key] = self
-
-    def __call__(self, wr=None):
-        '''
-        Run the callback unless it has already been called or cancelled
-        '''
-        try:
-            del _finalizer_registry[self._key]
-        except KeyError:
-            sub_debug('finalizer no longer registered')
-        else:
-            sub_debug('finalizer calling %s with args %s and kwargs %s',
-                      self._callback, self._args, self._kwargs)
-            res = self._callback(*self._args, **self._kwargs)
-            self._weakref = self._callback = self._args = \
-                            self._kwargs = self._key = None
-            return res
-
-    def cancel(self):
-        '''
-        Cancel finalization of the object
-        '''
-        try:
-            del _finalizer_registry[self._key]
-        except KeyError:
-            pass
-        else:
-            self._weakref = self._callback = self._args = \
-                            self._kwargs = self._key = None
-
-    def still_active(self):
-        '''
-        Return whether this finalizer is still waiting to invoke callback
-        '''
-        return self._key in _finalizer_registry
-
-    def __repr__(self):
-        try:
-            obj = self._weakref()
-        except (AttributeError, TypeError):
-            obj = None
-
-        if obj is None:
-            return '<Finalize object, dead>'
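
For readers skimming the patch, the following is a minimal usage sketch (not part of the diff) of how the SyncManager registrations above are exercised from user code. It assumes only the public multiprocessing.Manager() entry point mentioned in the SyncManager docstring and the 'dict', 'list' and 'Namespace' typeids registered above; the output noted in the final comment is illustrative.

# Usage sketch (illustrative, not from the patch)
from multiprocessing import Manager, Process

def worker(d, l, ns):
    d['count'] = d.get('count', 0) + 1   # goes through DictProxy to the shared dict
    l.append('done')                     # goes through ListProxy to the shared list
    ns.flag = True                       # NamespaceProxy.__setattr__ updates the referent

if __name__ == '__main__':
    manager = Manager()                  # a started SyncManager instance
    d = manager.dict()
    l = manager.list()
    ns = manager.Namespace(flag=False)
    p = Process(target=worker, args=(d, l, ns))
    p.start()
    p.join()
    print(d['count'], list(l), ns)       # roughly: 1 ['done'] Namespace(flag=True)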