cpython: 3b82e0d83bf9

deleted file mode 100644
--- a/Doc/includes/mp_benchmarks.py
+++ /dev/null
@@ -1,239 +0,0 @@
-#
-# Simple benchmarks for the multiprocessing package
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-import time
-import multiprocessing
-import threading
-import queue
-import gc
-
-_timer = time.perf_counter
-
-delta = 1
-
-
-#### TEST_QUEUESPEED
-
-def queuespeed_func(q, c, iterations):
-    ...
-
-def test_queuespeed(Process, q, c):
-    ...
-
-
-#### TEST_PIPESPEED
-
-def pipe_func(c, cond, iterations):
-    ...
-
-def test_pipespeed():
-    ...
-
-
-#### TEST_SEQSPEED
-
-def test_seqspeed(seq):
-    ...
-
-
-#### TEST_LOCK
-
-def test_lockspeed(l):
-    ...
-
-
-#### TEST_CONDITION
-
-def conditionspeed_func(c, N):
-    ...
-
-def test_conditionspeed(Process, c):
-    ...
-
-
-####
-
-def test():
-    ...
-
-if __name__ == '__main__':
-    ...

--- a/Doc/includes/mp_newtype.py
+++ b/Doc/includes/mp_newtype.py
@@ -1,11 +1,3 @@
-#
-# This module shows how to use arbitrary callables with a subclass of
-# BaseManager.
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
 from multiprocessing import freeze_support
 from multiprocessing.managers import BaseManager, BaseProxy
 import operator
@@ -27,12 +19,10 @@ def baz():

 # Proxy type for generator objects

 class GeneratorProxy(BaseProxy):
     ...

 # Function to return the operator module
     ...
@@ -90,8 +80,6 @@ def test():
     op = manager.operator()
     print('op.add(23, 45) =', op.add(23, 45))
     print('op.pow(2, 94) =', op.pow(2, 94))
 ##

--- a/Doc/includes/mp_pool.py
+++ b/Doc/includes/mp_pool.py
@@ -1,10 +1,3 @@
-#
-# A test of multiprocessing.Pool class
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
 import multiprocessing
 import time
 import random
@@ -46,269 +39,115 @@ def noop(x):
 #

 def test():
-    ...
     PROCESSES = 4
     print('Creating pool with %d processes\n' % PROCESSES)
-    ...
+    ...

 if __name__ == '__main__':
     multiprocessing.freeze_support()
-    ...
     test()

deleted file mode 100644
--- a/Doc/includes/mp_synchronize.py
+++ /dev/null
@@ -1,278 +0,0 @@
-#
-# A test file for the multiprocessing package
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-import time
-import sys
-import random
-from queue import Empty
-
-import multiprocessing   # may get overwritten
-
-
-#### TEST_VALUE
-
-def value_func(running, mutex):
-    ...
-
-def test_value():
-    ...
-
-
-#### TEST_QUEUE
-
-def queue_func(queue):
-    ...
-
-def test_queue():
-    ...
-
-
-#### TEST_CONDITION
-
-def condition_func(cond):
-    ...
-
-def test_condition():
-    ...
-
-
-#### TEST_SEMAPHORE
-
-def semaphore_func(sema, mutex, running):
-    ...
-
-def test_semaphore():
-    ...
-
-
-#### TEST_JOIN_TIMEOUT
-
-def join_timeout_func():
-    ...
-
-def test_join_timeout():
-    ...
-
-
-#### TEST_EVENT
-
-def event_func(event):
-    ...
-
-def test_event():
-    ...
-
-
-#### TEST_SHAREDVALUES
-
-def sharedvalues_func(values, arrays, shared_values, shared_arrays):
-    ...
-
-def test_sharedvalues():
-    ...
-
-
-####
-
-def test(namespace=multiprocessing):
-    ...
-
-
-if __name__ == '__main__':
-    ...

deleted file mode 100644
--- a/Doc/includes/mp_webserver.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Example where a pool of http servers share a single listening socket
-#
-# On Windows this module depends on the ability to pickle a socket
-# object so that the worker processes can inherit a copy of the server
-# object.  (We import multiprocessing.reduction to enable this pickling.)
-#
-# Not sure if we should synchronize access to socket.accept() method by
-# using a process-shared lock -- does not seem to be necessary.
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-import os
-import sys
-
-from multiprocessing import Process, current_process, freeze_support
-from http.server import HTTPServer
-from http.server import SimpleHTTPRequestHandler
-
-if sys.platform == 'win32':
-    ...
-
-def note(format, *args):
-    ...
-
-class RequestHandler(SimpleHTTPRequestHandler):
-    ...
-
-def serve_forever(server):
-    ...
-
-def runpool(address, number_of_processes):
-    ...
-
-def test():
-    ...
-
-if __name__ == '__main__':
-    ...

--- a/Doc/includes/mp_workers.py
+++ b/Doc/includes/mp_workers.py
@@ -1,16 +1,3 @@
-#
-# Simple example which uses a pool of workers to carry out some tasks.
-#
-# Notice that the results will probably not come out of the output
-# queue in the same in the same order as the corresponding tasks were
-# put on the input queue.  If it is important to get the results back
-# in the original order then consider using Pool.map() or
-# Pool.imap() (which will save on the amount of code needed anyway).
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
 import time
 import random

--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -93,11 +93,80 @@ To show the individual process IDs invol
     p.start()
     p.join()

-For an explanation of why (on Windows) the ``if __name__ == '__main__'`` part is
+For an explanation of why the ``if __name__ == '__main__'`` part is
 necessary, see :ref:`multiprocessing-programming`.

+Start methods
+~~~~~~~~~~~~~
+
+Depending on the platform, :mod:`multiprocessing` supports three ways
+to start a process.  These start methods are
+
+  *spawn*
+    ...
+
+  *fork*
+    ...
+
+  *forkserver*
+    ...
+
+Before Python 3.4 *fork* was the only option available on Unix.  Also,
+prior to Python 3.4, child processes would inherit all the parent's
+inheritable handles on Windows.
+
+On Unix using the *spawn* or *forkserver* start methods will also
+start a *semaphore tracker* process which tracks the unlinked named
+semaphores created by processes of the program.  When all processes
+have exited the semaphore tracker unlinks any remaining semaphores.
+Usually there should be none, but if a process was killed by a signal
+there may be some "leaked" semaphores.  (Unlinking the named semaphores
+is a serious matter since the system allows only a limited number, and
+they will not be automatically unlinked until the next reboot.)
+
+To select a start method you use :func:`set_start_method` in
+the ``if __name__ == '__main__'`` clause of the main module.  For
+example::
+
+    ...
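The example itself is not preserved above; a minimal sketch of selecting a start method, assuming the Python 3.4 API this changeset documents (the worker function is illustrative)::

    import multiprocessing as mp

    def foo():
        # runs in the freshly started child process
        print('hello from', mp.current_process().name)

    if __name__ == '__main__':
        mp.set_start_method('spawn')   # choose the start method first
        p = mp.Process(target=foo)
        p.start()
        p.join()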

+:func:`set_start_method` should not be used more than once in the
+program.
+
+
 Exchanging objects between processes
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -274,15 +343,31 @@ processes in a few different ways.
 For example::

     from multiprocessing import Pool

     if __name__ == '__main__':

+        ...
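The rewritten Pool example is likewise elided; a plausible sketch of the pattern the surrounding text describes (function and pool size are illustrative)::

    from multiprocessing import Pool

    def f(x):
        return x * x

    if __name__ == '__main__':
        with Pool(processes=4) as pool:          # start 4 worker processes
            print(pool.map(f, range(10)))        # apply f to each element
            res = pool.apply_async(f, (20,))     # evaluate f(20) asynchronously
            print(res.get(timeout=1))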

 Note that the methods of a pool should only ever be used by the
 process which created it.

@@ -763,6 +848,24 @@ Miscellaneous
    If the module is being run normally by the Python interpreter then
    :func:`freeze_support` has no effect.

+.. function:: get_all_start_methods()
+
+   ...

+.. function:: get_start_method()
+
+   ...
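A hedged sketch of how the two query functions combine with :func:`set_start_method`; it assumes the method names used throughout this changeset ('fork', 'spawn', 'forkserver')::

    import multiprocessing

    if __name__ == '__main__':
        methods = multiprocessing.get_all_start_methods()
        # prefer forkserver where supported, otherwise fall back to spawn
        if 'forkserver' in methods:
            multiprocessing.set_start_method('forkserver')
        else:
            multiprocessing.set_start_method('spawn')
        print('using:', multiprocessing.get_start_method())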

 .. function:: set_executable()

    Sets the path of the Python interpreter to use when starting a child
    process.
@@ -771,8 +874,21 @@ Miscellaneous

       set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

+
+.. function:: set_start_method(method)
+
+   ...

 .. note::
@@ -2175,43 +2291,8 @@ Below is an example session with logging
     [INFO/MainProcess] sending shutdown message to manager
     [INFO/SyncManager-...] manager exiting with exitcode 0

-In addition to having these two logging functions, the multiprocessing also
-exposes two additional logging level attributes. These are :const:`SUBWARNING`
-and :const:`SUBDEBUG`. The table below illustrates where theses fit in the
-normal level hierarchy.
-
-+----------------+----------------+
-| Level          | Numeric value  |
-+================+================+
-| SUBWARNING     | 25             |
-+----------------+----------------+
-| SUBDEBUG       | 5              |
-+----------------+----------------+
-
 For a full table of logging levels, see the :mod:`logging` module.

-These additional logging levels are used primarily for certain debug messages
-within the multiprocessing module. Below is the same example as above, except
-with :const:`SUBDEBUG` enabled::
-
-    ...

The :mod:`multiprocessing.dummy` module
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -2232,8 +2313,10 @@ There are certain guidelines and idioms
 :mod:`multiprocessing`.

-All platforms
-~~~~~~~~~~~~~
+All start methods
+~~~~~~~~~~~~~~~~~
+
+The following applies to all start methods.

 Avoid shared state
@@ -2266,11 +2349,13 @@ Joining zombie processes

 Better to inherit than pickle/unpickle

 Avoid terminating processes
@@ -2314,15 +2399,17 @@ Joining processes that use queues

 Explicitly pass resources to child processes
-    ...
+    ...

    So for instance ::
@@ -2381,17 +2468,19 @@ Beware of replacing :data:`sys.stdin` wi
    For more information, see :issue:`5155`, :issue:`5313` and :issue:`5331`

-Windows
-~~~~~~~
-
-Since Windows lacks :func:`os.fork` it has a few extra restrictions:
+The spawn and forkserver start methods
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+There are a few extra restrictions which don't apply to the *fork*
+start method.

 More picklability

    Also, if you subclass :class:`Process` then make sure that instances
    will be picklable when the :meth:`Process.start` method is called.
@@ -2411,7 +2500,8 @@ Safe importing of main module
    interpreter without causing unintended side effects (such as starting a new
    process).

@@ -2425,13 +2515,14 @@ Safe importing of main module
    Instead one should protect the "entry point" of the program by using ``if
    __name__ == '__main__':`` as follows::

       def foo():
           print('hello')

       if __name__ == '__main__':
           freeze_support()
           ...

@@ -2462,26 +2553,7 @@ Using :class:`Pool`:
    :language: python3

-Synchronization types like locks, conditions and queues:
-
-.. literalinclude:: ../includes/mp_synchronize.py

-
 An example showing how to use queues to feed tasks to a collection of worker
 processes and collect the results:

 .. literalinclude:: ../includes/mp_workers.py
-
-
-An example of how a pool of worker processes can each run a
-:class:`~http.server.SimpleHTTPRequestHandler` instance while sharing a single
-listening socket.
-
-.. literalinclude:: ../includes/mp_webserver.py
-
-
-Some simple benchmarks comparing :mod:`multiprocessing` with :mod:`threading`:
-
-.. literalinclude:: ../includes/mp_benchmarks.py
-

--- a/Doc/whatsnew/3.4.rst
+++ b/Doc/whatsnew/3.4.rst
@@ -108,6 +108,8 @@ Significantly Improved Library Modules:

--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -21,6 +21,8 @@
     'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
     'Event', 'Barrier', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool',
     'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
+    ...
 #
@@ -30,8 +32,14 @@
 import os
 import sys

-from multiprocessing.process import Process, current_process, active_children
-from multiprocessing.util import SUBDEBUG, SUBWARNING
+from .process import Process, current_process, active_children
+
+#
+# XXX These should not really be documented or public.
+#
+
+SUBDEBUG = 5
+SUBWARNING = 25

 #
 # Alias for main module -- will be reset by bootstrapping child processes
 #
@@ -56,8 +64,6 @@ class TimeoutError(ProcessError):

 class AuthenticationError(ProcessError):
     pass

-import _multiprocessing
-
 #
 # Definitions not depending on native semaphores
 #
@@ -69,7 +75,7 @@ def Manager():
     The managers methods such as `Lock()`, `Condition()` and `Queue()`
     can be used to create shared objects.
     '''
     ...

 def get_logger():
     '''
     Return package logger -- if it does not already exist then it is created
     '''
     ...

 #
 # Definitions depending on native semaphores
 #
@@ -130,120 +136,151 @@ def Lock():
     '''
     Returns a non-recursive lock object
     '''
     ...

#
#
#

-if sys.platform == 'win32':
-    ...
+def set_executable(executable):
+    ...
+
+def set_start_method(method):
+    ...
+
+def get_start_method():
+    ...
+
+def get_all_start_methods():
+    ...
+
+def set_forkserver_preload(module_names):
+    ...
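A hedged usage sketch for the new top-level helpers; `heavy_module` is a hypothetical module name, and calling both before any process is started follows the documentation above::

    import multiprocessing

    if __name__ == '__main__':
        # The preload list must be set before the fork server is started;
        # 'heavy_module' is a hypothetical module to import ahead of time.
        multiprocessing.set_forkserver_preload(['heavy_module'])
        multiprocessing.set_start_method('forkserver')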

--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -21,9 +21,13 @@
 import tempfile
 import itertools

 import _multiprocessing
-from multiprocessing import current_process, AuthenticationError, BufferTooShort
-from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
-from multiprocessing.forking import ForkingPickler
+
+from . import reduction
+from . import util
+
+from . import AuthenticationError, BufferTooShort
+from .reduction import ForkingPickler
+
 try:
     import _winapi
     from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
@@ -71,7 +75,7 @@ def arbitrary_address(family):
     if family == 'AF_INET':
         return ('localhost', 0)
     elif family == 'AF_UNIX':
         ...
@@ -505,7 +509,7 @@ if sys.platform != 'win32':
         c1 = Connection(s1.detach())
         c2 = Connection(s2.detach())
     else:
         ...
@@ -577,7 +581,7 @@ class SocketListener(object):
         self._last_accepted = None
         if family == 'AF_UNIX':
             ...
@@ -625,8 +629,8 @@ if sys.platform == 'win32':
             self._handle_queue = [self._new_handle(first=True)]
             self._last_accepted = None
             ...
@@ -668,7 +672,7 @@ if sys.platform == 'win32':
     @staticmethod
     def _finalize_pipe_listener(queue, address):
         ...
@@ -919,15 +923,32 @@ else:
 #
 if sys.platform == 'win32':
+    ...
 else:
+    ...

--- a/Lib/multiprocessing/dummy/__init__.py
+++ b/Lib/multiprocessing/dummy/__init__.py
@@ -22,7 +22,7 @@
 import sys
 import weakref
 import array

-from multiprocessing.dummy.connection import Pipe
+from .connection import Pipe
 from threading import Lock, RLock, Semaphore, BoundedSemaphore
 from threading import Event, Condition, Barrier
 from queue import Queue
@@ -113,7 +113,7 @@ def shutdown():
     pass

 def Pool(processes=None, initializer=None, initargs=()):

deleted file mode 100644
--- a/Lib/multiprocessing/forking.py
+++ /dev/null
@@ -1,477 +0,0 @@
-#
-# Module for starting a process object using os.fork() or CreateProcess()
-#
-# multiprocessing/forking.py
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# Licensed to PSF under a Contributor Agreement.
-#
-
-import io
-import os
-import pickle
-import sys
-import signal
-import errno
-
-from multiprocessing import util, process
-
-__all__ = ['Popen', 'assert_spawning', 'duplicate', 'close', 'ForkingPickler']
-
-#
-# Check that the current thread is spawning a child process
-#
-
-def assert_spawning(self):
-    ...
-
-#
-# Try making some callable types picklable
-#
-
-from pickle import Pickler
-from copyreg import dispatch_table
-
-class ForkingPickler(Pickler):
-    ...
-
-def _reduce_method(m):
-    ...
-
-class _C:
-    ...
-
-ForkingPickler.register(type(_C().f), _reduce_method)
-
-def _reduce_method_descriptor(m):
-    ...
-
-ForkingPickler.register(type(list.append), _reduce_method_descriptor)
-ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
-
-try:
-    ...
-except ImportError:
-    ...
-else:
-    ...
-
-#
-# Unix
-#
-
-if sys.platform != 'win32':
-    ...
-
-#
-# Windows
-#
-
-else:
-    ...
-
-#
-# Prepare current process
-#
-
-old_main_modules = []
-
-def prepare(data):
-    ...

new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/forkserver.py
@@ -0,0 +1,238 @@
+import errno
+import os
+import select
+import signal
+import socket
+import struct
+import sys
+import threading
+
+from . import connection
+from . import process
+from . import reduction
+from . import spawn
+from . import util
+
+__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
+           ...]
+
+#
+#
+#
+
+MAXFDS_TO_SEND = 256
+UNSIGNED_STRUCT = struct.Struct('Q')     # large enough for pid_t
+
+_inherited_fds = None
+_lock = threading.Lock()
+_preload_modules = ['__main__']
+
+#
+# Public function
+#
+
+def set_forkserver_preload(modules_names):
+    ...
+
+def get_inherited_fds():
+    ...
+
+def connect_to_new_process(fds):
+    ...
+
+def ensure_running():
+    ...
+
+def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
+    ...
+
+#
+# Code to bootstrap new process
+#
+
+def _serve_one(s, listener, alive_r, handler):
+    ...
+
+#
+# Read and write unsigned numbers
+#
+
+def read_unsigned(fd):
+    ...
+
+def write_unsigned(fd, n):
+    ...
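The bodies above are elided; a plausible sketch of the fd-based unsigned-integer protocol, assuming the `UNSIGNED_STRUCT = struct.Struct('Q')` defined earlier (an illustration, not necessarily the committed code)::

    import os
    import struct

    UNSIGNED_STRUCT = struct.Struct('Q')

    def read_unsigned(fd):
        # read exactly UNSIGNED_STRUCT.size bytes, looping over short reads
        data = b''
        length = UNSIGNED_STRUCT.size
        while len(data) < length:
            s = os.read(fd, length - len(data))
            if not s:
                raise EOFError('unexpected EOF')
            data += s
        return UNSIGNED_STRUCT.unpack(data)[0]

    def write_unsigned(fd, n):
        # write all bytes, looping over short writes
        msg = UNSIGNED_STRUCT.pack(n)
        while msg:
            nbytes = os.write(fd, msg)
            if nbytes == 0:
                raise RuntimeError('should not get here')
            msg = msg[nbytes:]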

--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -8,15 +8,17 @@
 #

 import bisect
+import itertools
 import mmap
 import os
 import sys
+import tempfile
 import threading
-import itertools

+import _multiprocessing
-import _multiprocessing
-from multiprocessing.util import Finalize, info
-from multiprocessing.forking import assert_spawning
+from . import popen
+from . import reduction
+from . import util

 __all__ = ['BufferWrapper']
@@ -30,17 +32,25 @@ if sys.platform == 'win32':

     class Arena(object):

         def __init__(self, size):
             self.size = size
             ...

         def __getstate__(self):
             ...

         def __setstate__(self, state):
@@ -52,10 +62,28 @@ else:

     class Arena(object):
+        ...

 #
 # Class allowing allocation of chunks of memory from arenas
 #
@@ -90,7 +118,7 @@ class Heap(object):
         if i == len(self._lengths):
             length = self._roundup(max(self._size, size), mmap.PAGESIZE)
             self._size *= 2
             ...
@@ -216,7 +244,7 @@ class BufferWrapper(object):
         assert 0 <= size < sys.maxsize
         block = BufferWrapper._heap.malloc(size)
         self._state = (block, size)
         ...

     def create_memoryview(self):
         (arena, start, stop), size = self._state

--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -19,11 +19,15 @@
 import threading
 import array
 import queue

+from time import time as _time
 from traceback import format_exc
-from multiprocessing import Process, current_process, active_children, Pool, util, connection
-from multiprocessing.process import AuthenticationString
-from multiprocessing.forking import Popen, ForkingPickler
-from time import time as _time
+
+from . import connection
+from . import pool
+from . import process
+from . import popen
+from . import reduction
+from . import util

 #
 # Register some things for pickling
 #
@@ -31,16 +35,14 @@

 def reduce_array(a):
     return array.array, (a.typecode, a.tobytes())
-ForkingPickler.register(array.array, reduce_array)
+reduction.register(array.array, reduce_array)

 view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
 if view_types[0] is not list:       # only needed in Py3.0
     def rebuild_as_list(obj):
         return list, (list(obj),)
     for view_type in view_types:
         ...

 #
 # Type for identifying shared objects
 #
@@ -130,7 +132,7 @@ class Server(object):
     def __init__(self, registry, address, authkey, serializer):
         assert isinstance(authkey, bytes)
         self.registry = registry
         ...
         # do authentication later
@@ -146,7 +148,7 @@ class Server(object):
         Run the server forever
         '''
         self.stop_event = threading.Event()
         ...
@@ -438,9 +440,9 @@ class BaseManager(object):
     def __init__(self, address=None, authkey=None, serializer='pickle'):
         if authkey is None:
             ...
@@ -476,7 +478,7 @@ class BaseManager(object):
         reader, writer = connection.Pipe(duplex=False)

         # spawn process which runs a server
         ...
@@ -691,11 +693,11 @@ class BaseProxy(object):
             self._Client = listener_client[serializer][1]

         if authkey is not None:
             ...
         if incref:
             self._incref()
@@ -704,7 +706,7 @@ class BaseProxy(object):
     def _connect(self):
         util.debug('making connection to manager')
         ...
@@ -798,7 +800,7 @@ class BaseProxy(object):
     def __reduce__(self):
         kwds = {}
         ...
         if getattr(self, '_isauto', False):
@@ -835,14 +837,14 @@ def RebuildProxy(func, token, serializer
     If possible the shared object is returned, or otherwise a proxy for it.
     '''
     ...
     if server and server.address == token.address:
         return server.id_to_obj[token.id][0]
     else:
         incref = (
             kwds.pop('incref', True) and
             ...)
@@ -889,7 +891,7 @@ def AutoProxy(token, serializer, manager
     if authkey is None and manager is not None:
         authkey = manager._authkey
     if authkey is None:
         ...
     ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
     proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
                       ...)
@@ -1109,7 +1111,7 @@
 SyncManager.register('BoundedSemaphore', AcquirerProxy)
 SyncManager.register('Condition', threading.Condition, ConditionProxy)
 SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
-SyncManager.register('Pool', Pool, PoolProxy)
+SyncManager.register('Pool', pool.Pool, PoolProxy)
 SyncManager.register('list', list, ListProxy)
 SyncManager.register('dict', dict, DictProxy)
 SyncManager.register('Value', Value, ValueProxy)

--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -7,7 +7,7 @@
 # Licensed to PSF under a Contributor Agreement.
 #

-__all__ = ['Pool']
+__all__ = ['Pool', 'ThreadPool']

 #
 # Imports
 #
@@ -21,8 +21,10 @@
 import os
 import time
 import traceback

-from multiprocessing import Process, TimeoutError
-from multiprocessing.util import Finalize, debug
+# If threading is available then ThreadPool should be provided.  Therefore
+# we avoid top-level imports which are liable to fail on some systems.
+from . import util
+from . import Process, cpu_count, TimeoutError, SimpleQueue

 #
 # Constants representing the state of a pool
 #
@@ -104,11 +106,11 @@ def worker(inqueue, outqueue, initialize
     try:
         task = get()
     except (EOFError, OSError):
         ...
     if task is None:
         ...
     job, i, func, args, kwds = task
@@ -121,11 +123,11 @@ def worker(inqueue, outqueue, initialize
         put((job, i, result))
     except Exception as e:
         wrapped = MaybeEncodingError(e, result[1])
         ...

 #
 # Class representing a process pool
 #
@@ -184,7 +186,7 @@ class Pool(object):
         self._result_handler._state = RUN
         self._result_handler.start()
         ...
@@ -201,7 +203,7 @@ class Pool(object):
         worker = self._pool[i]
         if worker.exitcode is not None:
             # worker exited
             ...
@@ -221,7 +223,7 @@ class Pool(object):
         w.name = w.name.replace('Process', 'PoolWorker')
         w.daemon = True
         w.start()
         ...

     def _maintain_pool(self):
         """Clean up any exited workers and start replacements for them.
@@ -230,7 +232,6 @@ class Pool(object):
         self._repopulate_pool()

     def _setup_queues(self):
         ...
@@ -358,7 +359,7 @@ class Pool(object):
         time.sleep(0.1)
         # send sentinel to stop workers
         pool._taskqueue.put(None)
         ...

     @staticmethod
     def _handle_tasks(taskqueue, put, outqueue, pool):
@@ -368,36 +369,36 @@ class Pool(object):
         i = -1
         for i, task in enumerate(taskseq):
             if thread._state:
                 ...
         try:
             # tell result handler to finish when cache is empty
             ...
             # tell workers there is no more work
             ...

     @staticmethod
     def _handle_results(outqueue, get, cache):
@@ -407,16 +408,16 @@ class Pool(object):
         try:
             task = get()
         except (OSError, EOFError):
             ...
         if thread._state:
             assert thread._state == TERMINATE
             ...
         if task is None:
             ...
         job, i, obj = task
@@ -429,11 +430,11 @@ class Pool(object):
         try:
             task = get()
         except (OSError, EOFError):
             ...
         if task is None:
             ...
@@ -442,7 +443,7 @@ class Pool(object):
             pass

         if hasattr(outqueue, '_reader'):
             ...
@@ -454,7 +455,7 @@ class Pool(object):
         except (OSError, EOFError):
             pass
         ...

     @staticmethod
@@ -472,19 +473,19 @@ class Pool(object):
     )

     def close(self):
         ...

     def terminate(self):
         ...

     def join(self):
         ...
@@ -495,7 +496,7 @@ class Pool(object):
     @staticmethod
     def _help_stuff_finish(inqueue, task_handler, size):
         # task_handler may be blocked trying to put items on inqueue
         ...
@@ -505,12 +506,12 @@ class Pool(object):
     def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                         worker_handler, task_handler, result_handler, cache):
         # this is guaranteed to only be called once
         ...
         worker_handler._state = TERMINATE
         task_handler._state = TERMINATE
         ...
         assert result_handler.is_alive() or len(cache) == 0
@@ -520,31 +521,31 @@ class Pool(object):
         # We must wait for the worker handler to exit before terminating
         # workers because we don't want workers to be restarted behind our back.
         ...
         # Terminate workers which haven't already finished.
         if pool and hasattr(pool[0], 'terminate'):
             ...
         if pool and hasattr(pool[0], 'terminate'):
             ...

     def __enter__(self):
@@ -730,7 +731,10 @@ class IMapUnorderedIterator(IMapIterator

 class ThreadPool(Pool):
     ...
     def __init__(self, processes=None, initializer=None, initargs=()):
         Pool.__init__(self, processes, initializer, initargs)
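Since `__all__` now also exports `ThreadPool`, a brief hedged usage sketch (same API as `Pool`, but the workers are threads in the current process)::

    from multiprocessing.pool import ThreadPool

    def fetch(n):
        return n * 2          # stand-in for I/O-bound work

    if __name__ == '__main__':
        with ThreadPool(4) as tp:
            print(tp.map(fetch, range(8)))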

new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/popen.py
@@ -0,0 +1,78 @@
+import sys
+import threading
+
+__all__ = ['Popen', 'get_spawning_popen', 'set_spawning_popen',
+           ...]
+
+#
+# Check that the current thread is spawning a child process
+#
+
+_tls = threading.local()
+
+def get_spawning_popen():
+    ...
+
+def set_spawning_popen(popen):
+    ...
+
+def assert_spawning(obj):
+    ...
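The function bodies are elided; a minimal sketch of how a thread-local "spawning popen" marker like this is conventionally implemented (an illustration, not necessarily the committed code)::

    import threading

    _tls = threading.local()

    def get_spawning_popen():
        return getattr(_tls, 'spawning_popen', None)

    def set_spawning_popen(popen):
        _tls.spawning_popen = popen

    def assert_spawning(obj):
        # objects such as locks and queues refuse to be pickled except
        # while a child process is being launched
        if get_spawning_popen() is None:
            raise RuntimeError(
                '%s objects should only be shared between processes'
                ' through inheritance' % type(obj).__name__)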

+
+#
+#
+#
+
+_Popen = None
+
+def Popen(process_obj):
+    ...
+
+def get_start_method():
+    ...
+
+def set_start_method(meth=None, *, start_helpers=True):
+    ...
+
+
+if sys.platform == 'win32':
+    ...
+
+else:
+    ...

new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/popen_fork.py
@@ -0,0 +1,87 @@
+import os
+import sys
+import signal
+import errno
+
+from . import util
+
+__all__ = ['Popen']
+
+#
+# Start child process using fork
+#
+
+class Popen(object):
+    ...

new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/popen_forkserver.py
@@ -0,0 +1,75 @@
+import io
+import os
+
+from . import reduction
+if not reduction.HAVE_SEND_HANDLE:
+    ...
+from . import forkserver
+from . import popen
+from . import popen_fork
+from . import spawn
+from . import util
+
+
+__all__ = ['Popen']
+
+#
+# Wrapper for an fd used while launching a process
+#
+
+class _DupFd(object):
+    ...
+
+#
+# Start child process using a server process
+#
+
+class Popen(popen_fork.Popen):
+    ...

new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -0,0 +1,75 @@
+import fcntl
+import io
+import os
+
+from . import popen
+from . import popen_fork
+from . import reduction
+from . import spawn
+from . import util
+
+from . import current_process
+
+__all__ = ['Popen']
+
+
+#
+# Wrapper for an fd used while launching a process
+#
+
+class _DupFd(object):
+    ...
+
+#
+# Start child process using a fresh interpreter
+#
+
+class Popen(popen_fork.Popen):
+    ...

new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/popen_spawn_win32.py
@@ -0,0 +1,102 @@
+import os
+import msvcrt
+import signal
+import sys
+import _winapi
+
+from . import spawn
+from . import popen
+from . import reduction
+from . import util
+
+__all__ = ['Popen']
+
+#
+#
+#
+
+TERMINATE = 0x10000
+WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
+WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
+
+#
+# We define a Popen class similar to the one from subprocess, but
+# whose constructor takes a process object as its argument.
+#
+
+class Popen(object):
+    ...

--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -43,7 +43,7 @@ def active_children():
     Return list of process objects corresponding to live child processes
     '''
     _cleanup()
     ...
@@ -51,9 +51,9 @@ def active_children():

 def _cleanup():
     # check for processes which have finished
     ...

 #
 # The Process class
 #
@@ -63,21 +63,16 @@ class Process(object):
     '''
     Process objects represent activity that is run in a separate process
     ...
@@ -85,6 +80,8 @@ class Process(object):
         self._kwargs = dict(kwargs)
         self._name = name or type(self).__name__ + '-' + \
                      ':'.join(str(i) for i in self._identity)
     ...

     def run(self):
@@ -101,16 +98,16 @@ class Process(object):
         assert self._popen is None, 'cannot start a process twice'
         assert self._parent_pid == os.getpid(), \
                'can only start a process object created by current process'
     ...

     def terminate(self):
         '''
@@ -126,7 +123,7 @@ class Process(object):
         assert self._popen is not None, 'can only join a started process'
         res = self._popen.wait(timeout)
         if res is not None:
             ...

     def is_alive(self):
         '''
@@ -154,7 +151,7 @@ class Process(object):
         '''
         Return whether process is a daemon
         '''
         ...

     @daemon.setter
     def daemon(self, daemonic):
@@ -162,18 +159,18 @@ class Process(object):
         '''
         Set whether process is a daemon
         '''
         assert self._popen is None, 'process has already started'
         ...

     @property
     def authkey(self):
         ...

     @authkey.setter
     def authkey(self, authkey):
         '''
         Set authorization key of process
         '''
         ...

     @property
     def exitcode(self):
@@ -227,17 +224,17 @@ class Process(object):
             status = 'stopped[%s]' % _exitcode_to_name.get(status, status)

         return '<%s(%s, %s%s)>' % (type(self).__name__, self._name,
                                    ...)
     ##

     def _bootstrap(self):
         from . import util
         try:
             ...
@@ -285,8 +282,8 @@ class Process(object):

 class AuthenticationString(bytes):
     def __reduce__(self):
         ...
@@ -301,16 +298,19 @@ class _MainProcess(Process):

     def __init__(self):
         self._identity = ()
         ...

 _current_process = _MainProcess()
+_process_counter = itertools.count(1)
+_children = set()
 del _MainProcess

 #

--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -18,11 +18,15 @@
 import weakref
 import errno

 from queue import Empty, Full
+
 import _multiprocessing
-from multiprocessing.connection import Pipe
-from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
-from multiprocessing.util import debug, info, Finalize, register_after_fork
-from multiprocessing.forking import assert_spawning, ForkingPickler
+
+from . import connection
+from . import popen
+from . import synchronize
+
+from .util import debug, info, Finalize, register_after_fork, is_exiting
+from .reduction import ForkingPickler

 #
 # Queue type using a pipe, buffer and thread
 #
@@ -34,14 +38,14 @@ class Queue(object):
         if maxsize <= 0:
             maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
         self._maxsize = maxsize
         ...
@@ -51,7 +55,7 @@ class Queue(object):
             register_after_fork(self, Queue._after_fork)

     def __getstate__(self):
         ...
@@ -208,8 +212,6 @@ class Queue(object):
     @staticmethod
     def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
         debug('starting thread to feed data to pipe')
-        ...
         nacquire = notempty.acquire
         nrelease = notempty.release
         nwait = notempty.wait
@@ -279,8 +281,8 @@ class JoinableQueue(Queue):

     def __init__(self, maxsize=0):
         Queue.__init__(self, maxsize)
         ...

     def __getstate__(self):
         return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
@@ -331,19 +333,19 @@ class JoinableQueue(Queue):

 class SimpleQueue(object):

     def __init__(self):
         ...

     def empty(self):
         return not self._poll()

     def __getstate__(self):
         ...

     def __setstate__(self, state):
         ...

--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -1,6 +1,5 @@
 #
-# Module to allow connection and socket objects to be transferred
-# between processes
+# Module which deals with pickling of objects.
 #
 # multiprocessing/reduction.py
 #
@@ -8,27 +7,57 @@
 # Licensed to PSF under a Contributor Agreement.
 #

-__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']
-
+import copyreg
+import functools
+import io
 import os
-import sys
+import pickle
 import socket
-import threading
-import struct
-import signal
+import sys

-from multiprocessing import current_process
-from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.util import is_exiting, sub_warning
+from . import popen
+from . import util
+
+__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
+
+HAVE_SEND_HANDLE = (sys.platform == 'win32' or
+                    ...)

 #
-#
+# Pickler subclass
 #
-if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
-    ...
+class ForkingPickler(pickle.Pickler):
+    ...
+
+register = ForkingPickler.register
+
+def dump(obj, file, protocol=None):
+    ...

 #
 # Platform specific definitions
 #
@@ -36,20 +65,44 @@ if not(sys.platform == 'win32' or (hasat
 if sys.platform == 'win32':
     # Windows
+    ...
+
     def send_handle(conn, handle, destination_pid):
         ...
     def recv_handle(conn):
         ...
     class DupHandle(object):
         ...
@@ -62,9 +115,12 @@ if sys.platform == 'win32':
             self._pid = pid

         def detach(self):
             ...
@@ -74,207 +130,112 @@ if sys.platform == 'win32':
             finally:
                 _winapi.CloseHandle(proc)
-    ...
 else:
     # Unix

     # On MacOSX we should acknowledge receipt of fds -- see Issue14669
     ACKNOWLEDGE = sys.platform == 'darwin'
+
     def send_handle(conn, handle, destination_pid):
         ...
     def recv_handle(conn):
         ...
-    ...
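A hedged usage sketch of `send_handle`/`recv_handle` for passing an open file descriptor to a child process (the file name is illustrative)::

    import os
    from multiprocessing import Process, Pipe
    from multiprocessing.reduction import send_handle, recv_handle

    def child(conn):
        fd = recv_handle(conn)          # receive the duplicated descriptor
        os.write(fd, b'written by the child\n')
        os.close(fd)

    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=child, args=(child_conn,))
        p.start()
        with open('output.txt', 'wb') as f:
            send_handle(parent_conn, f.fileno(), p.pid)
        p.join()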

 #
-# Server which shares registered resources with clients
+# Try making some callable types picklable
 #

-class ResourceSharer(object):
-    ...
+def _reduce_method(m):
+    ...
+class _C:
+    ...
+register(type(_C().f), _reduce_method)
+
+def _reduce_method_descriptor(m):
+    ...
+register(type(list.append), _reduce_method_descriptor)
+register(type(int.__add__), _reduce_method_descriptor)
+
+def _reduce_partial(p):
+    ...
+def _rebuild_partial(func, args, keywords):
+    ...
+register(functools.partial, _reduce_partial)
+
+#
+# Make sockets picklable
+#
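The reducer bodies are elided; a plausible sketch of the `functools.partial` pair, assuming the usual reduce/rebuild pattern::

    import functools

    def _reduce_partial(p):
        # pickle a partial as (rebuild function, constructor arguments)
        return _rebuild_partial, (p.func, p.args, p.keywords or {})

    def _rebuild_partial(func, args, keywords):
        return functools.partial(func, *args, **keywords)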

+if sys.platform == 'win32':
+    ...
-
-resource_sharer = ResourceSharer()
+else:
+    ...

new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/resource_sharer.py
@@ -0,0 +1,158 @@
+#
+# We use a background thread for sharing fds on Unix, and for sharing
+# sockets on Windows.
+#
+# A client which wants to pickle a resource registers it with the resource
+# sharer and gets an identifier in return.  The unpickling process will
+# connect to the resource sharer, send the identifier and its pid, and then
+# receive the resource.
+#
+
+import os
+import signal
+import socket
+import sys
+import threading
+
+from . import process
+from . import reduction
+from . import util
+
+__all__ = ['stop']
+
+
+if sys.platform == 'win32':
+    ...
+
+else:
+    ...
+
+
+class _ResourceSharer(object):
+    ...
+
+
+_resource_sharer = _ResourceSharer()
+stop = _resource_sharer.stop

new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/semaphore_tracker.py
@@ -0,0 +1,135 @@
+#
+# On Unix we run a server process which keeps track of unlinked
+# semaphores. The server ignores SIGINT and SIGTERM and reads from a
+# pipe.  Every other process of the program has a copy of the writable
+# end of the pipe, so we get EOF when all other processes have exited.
+# Then the server process unlinks any remaining semaphore names.
+#
+# This is important because the system only supports a limited number
+# of named semaphores, and they will not be automatically removed till
+# the next reboot.  Without this semaphore tracker process, "killall
+# python" would probably leave unlinked semaphores.
+#
+
+import errno
+import os
+import signal
+import sys
+import threading
+import warnings
+import _multiprocessing
+
+from . import spawn
+from . import util
+from . import current_process
+
+__all__ = ['ensure_running', 'register', 'unregister']
+
+
+_lock = threading.Lock()
+
+
+def ensure_running():
+    ...
+
+def register(name):
+    ...
+
+def unregister(name):
+    ...
+
+def _send(cmd, name):
+    ...
+
+def main(fd):
+    ...
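The bodies are elided; a minimal sketch of the pipe protocol the header comment implies, with one REGISTER/UNREGISTER command line written per semaphore (an illustration — the fd plumbing is simplified)::

    import os

    def _send(cmd, name, fd):
        # one ASCII line per command, e.g. b'REGISTER:/mp-1a2b3c\n'
        msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
        nbytes = os.write(fd, msg)
        assert nbytes == len(msg)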

--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -10,8 +10,11 @@
 import ctypes
 import weakref

-from multiprocessing import heap, RLock
-from multiprocessing.forking import assert_spawning, ForkingPickler
+from . import heap
+
+from .synchronize import RLock
+from .reduction import ForkingPickler
+from .popen import assert_spawning

 __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']

new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/spawn.py
@@ -0,0 +1,258 @@
+#
+# Code used to start processes when using the spawn or forkserver
+# start methods.
+#
+# multiprocessing/spawn.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+import os
+import pickle
+import sys
+
+from . import process
+from . import util
+from . import popen
+
+__all__ = ['_main', 'freeze_support', 'set_executable', 'get_executable',
+           ...]
+
+#
+# _python_exe is the assumed path to the python executable.
+# People embedding Python want to modify it.
+#
+
+if sys.platform != 'win32':
+    ...
+else:
+    ...
+
+if WINSERVICE:
+    ...
+else:
+    ...
+
+def set_executable(exe):
+    ...
+
+def get_executable():
+    ...
+
+#
+#
+#
+
+def is_forking(argv):
+    ...
+
+def freeze_support():
+    ...
+
+def get_command_line():
+    ...
+
+def spawn_main():
+    ...
+
+def _main(fd):
+    ...
+
+def _check_not_importing_main():
+    ...
+
+def get_preparation_data(name):
+    ...
+
+#
+# Prepare current process
+#
+
+old_main_modules = []
+
+def prepare(data):
+    ...
+
+def import_main_path(main_path):
+    ...
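spawn.freeze_support() and is_forking() back the documented multiprocessing.freeze_support(); a hedged sketch of the caller-side pattern for frozen Windows executables::

    import multiprocessing

    def worker():
        print('in worker')

    if __name__ == '__main__':
        multiprocessing.freeze_support()   # no-op unless running frozen
        p = multiprocessing.Process(target=worker)
        p.start()
        p.join()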

--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -11,20 +11,24 @@
     'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
     ]

+import os
 import threading
 import sys
-
+import itertools
+import tempfile
 import _multiprocessing

-from multiprocessing.process import current_process
-from multiprocessing.util import register_after_fork, debug
-from multiprocessing.forking import assert_spawning, Popen
+
 from time import time as _time

+from . import popen
+from . import process
+from . import util
+
 # Try to import the mp.synchronize module cleanly, if it fails
 # raise ImportError for platforms lacking a working sem_open implementation.
 # See issue 3770
 try:
     ...
 except (ImportError):
     raise ImportError("This platform lacks a functioning sem_open" +
                       " implementation, therefore, the required" +
                       ...)
@@ -44,15 +48,45 @@ SEM_VALUE_MAX = _multiprocessing.SemLock

 class SemLock(object):
+    ...
+
     def __init__(self, kind, value, maxvalue):
         ...

     if sys.platform != 'win32':
         def _after_fork(obj):
             obj._semlock._after_fork()
+        ...

     def _make_methods(self):
         self.acquire = self._semlock.acquire
@@ -65,15 +99,24 @@ class SemLock(object):
         return self._semlock.__exit__(*args)

     def __getstate__(self):
         ...

     def __setstate__(self, state):
         self._semlock = _multiprocessing.SemLock._rebuild(*state)
+        ...

 #
 # Semaphore
 #
@@ -122,7 +165,7 @@ class Lock(SemLock):
     def __repr__(self):
         try:
             if self._semlock._is_mine():
                 ...
@@ -147,7 +190,7 @@ class RLock(SemLock):
     def __repr__(self):
         try:
             if self._semlock._is_mine():
                 ...
@@ -175,7 +218,7 @@ class Condition(object):
         self._make_methods()

     def __getstate__(self):
         ...
@@ -342,7 +385,7 @@ class Barrier(threading.Barrier):
     def __init__(self, parties, action=None, timeout=None):
         import struct
         ...

--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -17,13 +17,13 @@
 import threading        # we want threading to install its
                         # cleanup function before multiprocessing does
 from subprocess import _args_from_interpreter_flags

-from multiprocessing.process import current_process, active_children
+from . import process

 __all__ = [
     'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
     'log_to_stderr', 'get_temp_dir', 'register_after_fork',
     'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
     ...
@@ -71,8 +71,6 @@ def get_logger():
         _logger = logging.getLogger(LOGGER_NAME)
         _logger.propagate = 0
         ...
         # XXX multiprocessing should cleanup before logging
         if hasattr(atexit, 'unregister'):
@@ -111,13 +109,14 @@ def log_to_stderr(level=None):

 def get_temp_dir():
     # get name of a temp directory which will be automatically cleaned up
     ...

 #
 # Support for reinitialization of objects when bootstrapping a child process
 #
@@ -273,8 +272,8 @@ def is_exiting():
 _exiting = False

 def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                    ...):
     for p in active_children():
         ...
@@ -335,3 +334,54 @@ class ForkAwareLocal(threading.local):
         register_after_fork(self, lambda obj : obj.__dict__.clear())
     def __reduce__(self):
         return type(self), ()

+#
+# Close fds except those specified
+#
+
+try:
+    ...
+except Exception:
+    ...
+
+def close_all_fds_except(fds):
+    ...
+
+#
+# Start a program with only specified fds kept open
+#
+
+def spawnv_passfds(path, args, passfds):
+    ...
+
+#
+# Return pipe with CLOEXEC set on fds
+#
+
+def pipe():
+    ...
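The bodies are elided; one plausible sketch of util.pipe() returning a pipe with close-on-exec set on both ends (an illustration — os.pipe2(os.O_CLOEXEC) is an alternative where available)::

    import fcntl
    import os

    def pipe():
        r, w = os.pipe()
        for fd in (r, w):
            # set FD_CLOEXEC so the fds are not leaked into exec'd children
            flags = fcntl.fcntl(fd, fcntl.F_GETFD)
            fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
        return r, w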

rename from Lib/test/test_multiprocessing.py
rename to Lib/test/_test_multiprocessing.py
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -43,7 +43,7 @@ from multiprocessing import util
 try:
     from multiprocessing import reduction
     ...
 except ImportError:
     HAS_REDUCTION = False
@@ -99,6 +99,9 @@
 try:
     ...
 except:
     MAXFD = 256

+# To speed up tests when using the forkserver, we can preload these:
+PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
+
 #
 # Some tests require ctypes
 #
@@ -330,7 +333,6 @@ class _TestProcess(BaseTestCase):
     @classmethod
     def _test_recursion(cls, wconn, id):
         ...
@@ -378,7 +380,7 @@ class _TestProcess(BaseTestCase):
         self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
         event.set()
         p.join()
         ...
@@ -2493,7 +2495,7 @@ class _TestPicklingConnections(BaseTestC
     @classmethod
     def tearDownClass(cls):
         ...
     @classmethod
@@ -2807,30 +2809,40 @@ class _TestFinalize(BaseTestCase):
 #
 # Test that from ... import * works for each module
 #

-class _TestImportStar(BaseTestCase):
-    ...
+class _TestImportStar(unittest.TestCase):
+    ...
     def test_import(self):
-        ...
+        ...
         for name in modules:
             __import__(name)
             mod = sys.modules[name]
-            ...
+            ...
@@ -2953,131 +2965,6 @@ class TestInvalidHandle(unittest.TestCas
         self.assertRaises((ValueError, OSError),
                           multiprocessing.connection.Connection, -1)

-#
-# Functions used to create test cases from the base ones in this module
-#
-
-def create_test_cases(Mixin, type):
-    ...
-
-#
-# Create test cases
-#
-
-class ProcessesMixin(object):
-    ...
-
-testcases_processes = create_test_cases(ProcessesMixin, type='processes')
-globals().update(testcases_processes)
-
-
-class ManagerMixin(object):
-    ...
-
-testcases_manager = create_test_cases(ManagerMixin, type='manager')
-globals().update(testcases_manager)
-
-
-class ThreadsMixin(object):
-    ...
-
-testcases_threads = create_test_cases(ThreadsMixin, type='threads')
-globals().update(testcases_threads)

 class OtherTest(unittest.TestCase):
@@ -3427,7 +3314,7 @@ class TestFlags(unittest.TestCase):
     def test_flags(self):
         import json, subprocess
         # start child process using unusual flags
         ...
@@ -3474,13 +3361,14 @@ class TestTimeouts(unittest.TestCase):

 class TestNoForkBomb(unittest.TestCase):
     def test_noforkbomb(self):
         ...
@@ -3514,6 +3402,72 @@ class TestForkAwareThreadLock(unittest.T
         self.assertLessEqual(new_size, old_size)

 #
+# Check that non-forked child processes do not inherit unneeded fds/handles
+#
+
+class TestCloseFds(unittest.TestCase):
+    ...
+
+#
 # Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
 #
@@ -3557,10 +3511,10 @@ class TestIgnoreEINTR(unittest.TestCase)
         def handler(signum, frame):
             pass
         signal.signal(signal.SIGUSR1, handler)
         ...
     @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
     def test_ignore_listener(self):
@@ -3581,26 +3535,221 @@ class TestIgnoreEINTR(unittest.TestCase)
         finally:
             conn.close()

-#
-#
-#
-
-def setUpModule():
-    ...
-
-def tearDownModule():
-    ...
-
-if __name__ == '__main__':
-    ...
+class TestStartMethod(unittest.TestCase):
+    ...
+
+#
+# Check that killing process does not leak named semaphores
+#
+
+@unittest.skipIf(sys.platform == "win32",
+                 ...)
+class TestSemaphoreTracker(unittest.TestCase):
+    ...
+
+#
+# Mixins
+#
+
+class ProcessesMixin(object):
+    ...
+
+class ManagerMixin(object):
+    ...
+
+class ThreadsMixin(object):
+    ...
+
+#
+# Functions used to create test cases from the base ones in this module
+#
+
+def install_tests_in_module_dict(remote_globs, start_method):
+    ...

--- a/Lib/test/mp_fork_bomb.py
+++ b/Lib/test/mp_fork_bomb.py
@@ -7,6 +7,11 @@ def foo():
 # correctly on Windows.  However, we should get a RuntimeError rather
 # than the Windows equivalent of a fork bomb.

+if len(sys.argv) > 1:
+    ...
+else:
+    ...
+
 p = multiprocessing.Process(target=foo)
 p.start()
 p.join()

--- a/Lib/test/regrtest.py
+++ b/Lib/test/regrtest.py
@@ -149,7 +149,7 @@
 try:
     ...
 except ImportError:
     threading = None
 try:
-    ...
+    ...
 except ImportError:
     multiprocessing = None

new file mode 100644
--- /dev/null
+++ b/Lib/test/test_multiprocessing_fork.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'fork')
+
+if __name__ == '__main__':
+    unittest.main()

new file mode 100644
--- /dev/null
+++ b/Lib/test/test_multiprocessing_forkserver.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'forkserver')
+
+if __name__ == '__main__':
+    unittest.main()

new file mode 100644
--- /dev/null
+++ b/Lib/test/test_multiprocessing_spawn.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'spawn')
+
+if __name__ == '__main__':
+    unittest.main()

--- a/Makefile.pre.in
+++ b/Makefile.pre.in
@@ -938,7 +938,9 @@ buildbottest: all platform

 QUICKTESTOPTS=	$(TESTOPTS) -x test_subprocess test_io test_lib2to3 \
 		test_multibytecodec test_urllib2_localnet test_itertools \
-		test_multiprocessing test_mailbox test_socket test_poll \
+		test_multiprocessing_fork test_multiprocessing_spawn \
+		test_multiprocessing_forkserver \
+		test_mailbox test_socket test_poll \
 		test_select test_zipfile test_concurrent_futures

 quicktest:	all platform
 		$(TESTRUNNER) $(QUICKTESTOPTS)

--- a/Modules/_multiprocessing/multiprocessing.c
+++ b/Modules/_multiprocessing/multiprocessing.c
@@ -126,6 +126,7 @@ static PyMethodDef module_methods[] = {
     {"recv", multiprocessing_recv, METH_VARARGS, ""},
     {"send", multiprocessing_send, METH_VARARGS, ""},
 #endif
+    ...

--- a/Modules/_multiprocessing/multiprocessing.h
+++ b/Modules/_multiprocessing/multiprocessing.h
@@ -98,5 +98,6 @@ PyObject *_PyMp_SetError(PyObject *Type,
  */

 extern PyTypeObject _PyMp_SemLockType;
+extern PyObject *_PyMp_sem_unlink(PyObject *ignore, PyObject *args);

 #endif /* MULTIPROCESSING_H */

--- a/Modules/_multiprocessing/semaphore.c
+++ b/Modules/_multiprocessing/semaphore.c
@@ -18,6 +18,7 @@ typedef struct {
     int count;
    int maxvalue;
    int kind;
+    ...
 } SemLockObject;

 #define ISMINE(o) (o->count > 0 && PyThread_get_thread_ident() == o->last_tid)
@@ -397,7 +398,8 @@ semlock_release(SemLockObject *self, PyO
  */

 static PyObject *
-newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue)
+newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue,
+                 ...)
 {
     SemLockObject *self;
@@ -409,21 +411,22 @@ newsemlockobject(PyTypeObject *type, SEM
     self->count = 0;
     self->last_tid = 0;
     self->maxvalue = maxvalue;
     ...

 static PyObject *
 semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
 {
     ...
     if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) {
@@ -431,18 +434,23 @@ semlock_new(PyTypeObject *type, PyObject
         return NULL;
     }
     ...
     SEM_CLEAR_ERROR();
     ...
@@ -451,6 +459,7 @@ semlock_new(PyTypeObject *type, PyObject
   failure:
     if (handle != SEM_FAILED)
         SEM_CLOSE(handle);
     ...
@@ -460,12 +469,30 @@ semlock_rebuild(PyTypeObject *type, PyOb
 {
     SEM_HANDLE handle;
     int kind, maxvalue;
     ...

+#ifndef MS_WINDOWS
+    ...
+#endif
+
 }

 static void
@@ -473,6 +500,7 @@ semlock_dealloc(SemLockObject* self)
 {
     if (self->handle != SEM_FAILED)
         SEM_CLOSE(self->handle);
     ...
@@ -574,6 +602,8 @@ static PyMemberDef semlock_members[] = {
      ""},
     {"maxvalue", T_INT, offsetof(SemLockObject, maxvalue), READONLY,
      ""},
+    ...
+}