cpython: 3b82e0d83bf9
deleted file mode 100644
--- a/Doc/includes/mp_benchmarks.py
+++ /dev/null
@@ -1,239 +0,0 @@
-#
-# Simple benchmarks for the multiprocessing package
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-import time
-import multiprocessing
-import threading
-import queue
-import gc
-
-_timer = time.perf_counter
-
-delta = 1
-
-
-#### TEST_QUEUESPEED
-
-def queuespeed_func(q, c, iterations):
-
-def test_queuespeed(Process, q, c):
-    p = Process(target=queuespeed_func, args=(q, c, iterations))
-    c.acquire()
-    p.start()
-    c.wait()
-    c.release()
-
-    result = None
-    t = _timer()
-
-    while result != 'STOP':
-        result = q.get()
-
-    elapsed = _timer() - t
-
-    p.join()
-
-    print(iterations, 'objects passed through the queue in', elapsed, 'seconds')
-    print('average number/sec:', iterations/elapsed)
-
-
-#### TEST_PIPESPEED
-
-def pipe_func(c, cond, iterations):
-    p = multiprocessing.Process(target=pipe_func,
-                                args=(d, cond, iterations))
-    cond.acquire()
-    p.start()
-    cond.wait()
-    cond.release()
-
-    result = None
-    t = _timer()
-
-    while result != 'STOP':
-        result = c.recv()
-
-    elapsed = _timer() - t
-    p.join()
-
-    print(iterations, 'objects passed through connection in', elapsed, 'seconds')
-    print('average number/sec:', iterations/elapsed)
-
-
-#### TEST_SEQSPEED
-
-def test_seqspeed(seq):
-    t = _timer()
-
-    for i in range(iterations):
-        a = seq[5]
-
-    elapsed = _timer() - t
-
-    print(iterations, 'iterations in', elapsed, 'seconds')
-    print('average number/sec:', iterations/elapsed)
-
-
-#### TEST_LOCK
-
-def test_lockspeed(l):
-    t = _timer()
-
-    for i in range(iterations):
-        l.acquire()
-        l.release()
-
-    elapsed = _timer() - t
-
-    print(iterations, 'iterations in', elapsed, 'seconds')
-    print('average number/sec:', iterations/elapsed)
-
-
-#### TEST_CONDITION
-
-def conditionspeed_func(c, N):
-
-def test_conditionspeed(Process, c):
-    c.acquire()
-    p = Process(target=conditionspeed_func, args=(c, iterations))
-    p.start()
-
-    c.wait()
-
-    t = _timer()
-
-    for i in range(iterations):
-        c.notify()
-        c.wait()
-
-    elapsed = _timer() - t
-
-    c.release()
-    p.join()
-
-    print(iterations * 2, 'waits in', elapsed, 'seconds')
-    print('average number/sec:', iterations * 2 / elapsed)
-    print('\n\t######## testing Queue.Queue\n')
-    test_queuespeed(threading.Thread, queue.Queue(),
-                    threading.Condition())
-    print('\n\t######## testing multiprocessing.Queue\n')
-    test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
-                    multiprocessing.Condition())
-    print('\n\t######## testing Queue managed by server process\n')
-    test_queuespeed(multiprocessing.Process, manager.Queue(),
-                    manager.Condition())
-    print('\n\t######## testing multiprocessing.Pipe\n')
-    test_pipespeed()
-
-    print('\n\t######## testing list\n')
-    test_seqspeed(list(range(10)))
-    print('\n\t######## testing list managed by server process\n')
-    test_seqspeed(manager.list(list(range(10))))
-    print('\n\t######## testing Array("i", ..., lock=False)\n')
-    test_seqspeed(multiprocessing.Array('i', list(range(10)), lock=False))
-    print('\n\t######## testing Array("i", ..., lock=True)\n')
-    test_seqspeed(multiprocessing.Array('i', list(range(10)), lock=True))
-
-    print('\n\t######## testing threading.Lock\n')
-    test_lockspeed(threading.Lock())
-    print('\n\t######## testing threading.RLock\n')
-    test_lockspeed(threading.RLock())
-    print('\n\t######## testing multiprocessing.Lock\n')
-    test_lockspeed(multiprocessing.Lock())
-    print('\n\t######## testing multiprocessing.RLock\n')
-    test_lockspeed(multiprocessing.RLock())
-    print('\n\t######## testing lock managed by server process\n')
-    test_lockspeed(manager.Lock())
-    print('\n\t######## testing rlock managed by server process\n')
-    test_lockspeed(manager.RLock())
-
-    print('\n\t######## testing threading.Condition\n')
-    test_conditionspeed(threading.Thread, threading.Condition())
-    print('\n\t######## testing multiprocessing.Condition\n')
-    test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
-    print('\n\t######## testing condition managed by a server process\n')
-    test_conditionspeed(multiprocessing.Process, manager.Condition())
--- a/Doc/includes/mp_newtype.py
+++ b/Doc/includes/mp_newtype.py
@@ -1,11 +1,3 @@
-#
-# This module shows how to use arbitrary callables with a subclass of
-# `BaseManager`.
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
@@ -27,12 +19,10 @@ def baz():

# Proxy type for generator objects

class GeneratorProxy(BaseProxy):

# Function to return the operator module
@@ -90,8 +80,6 @@ def test():
     op = manager.operator()
     print('op.add(23, 45) =', op.add(23, 45))
     print('op.pow(2, 94) =', op.pow(2, 94))
-    print('op.getslice(range(10), 2, 6) =', op.getslice(list(range(10)), 2, 6))
-    print('op.repeat(range(5), 3) =', op.repeat(list(range(5)), 3))
     print('op.exposed =', op.exposed)
--- a/Doc/includes/mp_pool.py
+++ b/Doc/includes/mp_pool.py
@@ -1,10 +1,3 @@
-#
-# A test of `multiprocessing.Pool` class
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
import multiprocessing
import time
import random
@@ -46,269 +39,115 @@ def noop(x):
#
def test():
-    PROCESSES = 4
     print('Creating pool with %d processes\n' % PROCESSES)

-    TASKS = [(mul, (i, 7)) for i in range(10)] + \
-            [(plus, (i, 8)) for i in range(10)]
-
-    results = [pool.apply_async(calculate, t) for t in TASKS]
-    imap_it = pool.imap(calculatestar, TASKS)
-    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
-
-    print('Ordered results using pool.apply_async():')
-    for r in results:
-        print('\t', r.get())
-    print()
-
-    print('Unordered results using pool.imap_unordered():')
-    for x in imap_unordered_it:
-        print('\t', x)
-    print()
-
     TASKS = [(mul, (i, 7)) for i in range(10)] + \
             [(plus, (i, 8)) for i in range(10)]

-    print('Ordered results using pool.map() --- will block till complete:')
-    for x in pool.map(calculatestar, TASKS):
-        print('\t', x)
-    print()
-
     results = [pool.apply_async(calculate, t) for t in TASKS]
     imap_it = pool.imap(calculatestar, TASKS)
     imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
-    t = time.time()
-    A = list(map(pow3, range(N)))
-    print('\tmap(pow3, range(%d)):\n\t\t%s seconds' % \
-          (N, time.time() - t))
     print('Ordered results using pool.apply_async():')
     for r in results:
         print('\t', r.get())
     print()
-
-    t = time.time()
-    B = pool.map(pow3, range(N))
-    print('\tpool.map(pow3, range(%d)):\n\t\t%s seconds' % \
-          (N, time.time() - t))
-
-    t = time.time()
-    C = list(pool.imap(pow3, range(N), chunksize=N//8))
-    print('\tlist(pool.imap(pow3, range(%d), chunksize=%d)):\n\t\t%s' \
-          ' seconds' % (N, N//8, time.time() - t))
     print('Ordered results using pool.imap():')
     for x in imap_it:
         print('\t', x)
     print()
-
-    t = time.time()
-    A = list(map(noop, L))
-    print('\tmap(noop, L):\n\t\t%s seconds' % \
-          (time.time() - t))
-
-    t = time.time()
-    B = pool.map(noop, L)
-    print('\tpool.map(noop, L):\n\t\t%s seconds' % \
-          (time.time() - t))
     print('Unordered results using pool.imap_unordered():')
     for x in imap_unordered_it:
         print('\t', x)
     print()
-
-    t = time.time()
-    C = list(pool.imap(noop, L, chunksize=len(L)//8))
-    print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
-          (len(L)//8, time.time() - t))
     print('Ordered results using pool.map() --- will block till complete:')
     for x in pool.map(calculatestar, TASKS):
         print('\t', x)
     print()
     #
     # Test error handling
     #

-    try:
-        print(pool.apply(f, (5,)))
-    except ZeroDivisionError:
-        print('\tGot ZeroDivisionError as expected from pool.apply()')
-    else:
-        raise AssertionError('expected ZeroDivisionError')
-
-    try:
-        print(pool.map(f, list(range(10))))
-    except ZeroDivisionError:
-        print('\tGot ZeroDivisionError as expected from pool.map()')
-    else:
-        raise AssertionError('expected ZeroDivisionError')
-
     print('Testing error handling:')

-    try:
-        print(list(pool.imap(f, list(range(10)))))
-    except ZeroDivisionError:
-        print('\tGot ZeroDivisionError as expected from list(pool.imap())')
-    else:
-        raise AssertionError('expected ZeroDivisionError')

     try:
         print(pool.apply(f, (5,)))
     except ZeroDivisionError:
         print('\tGot ZeroDivisionError as expected from pool.apply()')
     else:
         raise AssertionError('expected ZeroDivisionError')

     try:
         print(pool.map(f, list(range(10))))
     except ZeroDivisionError:
-            if i == 5:
-                pass
-        except StopIteration:
-            break
         print('\tGot ZeroDivisionError as expected from pool.map()')
     else:
-            if i == 5:
-                raise AssertionError('expected ZeroDivisionError')
         raise AssertionError('expected ZeroDivisionError')
-    print('Testing ApplyResult.get() with timeout:', end=' ')
-    res = pool.apply_async(calculate, TASKS[0])
-    while 1:
-        sys.stdout.flush()
-        try:
-            sys.stdout.write('\n\t%s' % res.get(0.02))
-            break
-        except multiprocessing.TimeoutError:
-            sys.stdout.write('.')
-    print()
-    print()
-
-    print('Testing IMapIterator.next() with timeout:', end=' ')
-    it = pool.imap(calculatestar, TASKS)
-    while 1:
-        sys.stdout.flush()
-        try:
-            sys.stdout.write('\n\t%s' % it.next(0.02))
-        except StopIteration:
-            break
-        except multiprocessing.TimeoutError:
-            sys.stdout.write('.')
-    print()
-    print()
-
     try:
         print(list(pool.imap(f, list(range(10)))))
     except ZeroDivisionError:
         print('\tGot ZeroDivisionError as expected from list(pool.imap())')
     else:
         raise AssertionError('expected ZeroDivisionError')
     it = pool.imap(f, list(range(10)))
     for i in range(10):
         try:
             x = next(it)
         except ZeroDivisionError:
             if i == 5:
                 pass
         except StopIteration:
             break
         else:
             if i == 5:
                 raise AssertionError('expected ZeroDivisionError')
-
-    if A == B:
-        print('\tcallbacks succeeded\n')
-    else:
-        print('\t*** callbacks failed\n\t\t%s != %s\n' % (A, B))
-
     assert i == 9
     print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
     print()
     #
     # Testing timeouts
     #

-    pool = multiprocessing.Pool(2)
-    DELTA = 0.1
-    ignore = pool.apply(pow3, [2])
-    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
-    pool.terminate()
-    pool.join()
-
     print('Testing ApplyResult.get() with timeout:', end=' ')
     res = pool.apply_async(calculate, TASKS[0])
     while 1:
         sys.stdout.flush()
         try:
             sys.stdout.write('\n\t%s' % res.get(0.02))
             break
         except multiprocessing.TimeoutError:
             sys.stdout.write('.')
     print()
     print()

-    pool = multiprocessing.Pool(2)
-    DELTA = 0.1
-    processes = pool._pool
-    ignore = pool.apply(pow3, [2])
-    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
-
     print('Testing IMapIterator.next() with timeout:', end=' ')
     it = pool.imap(calculatestar, TASKS)
     while 1:
         sys.stdout.flush()
         try:
             sys.stdout.write('\n\t%s' % it.next(0.02))
         except StopIteration:
             break
         except multiprocessing.TimeoutError:
             sys.stdout.write('.')
     print()
     print()
 if __name__ == '__main__':
     multiprocessing.freeze_support()
-
-    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
-        print(' Using processes '.center(79, '-'))
-    elif sys.argv[1] == 'threads':
-        print(' Using threads '.center(79, '-'))
-        import multiprocessing.dummy as multiprocessing
-    else:
-        print('Usage:\n\t%s [processes | threads]' % sys.argv[0])
-        raise SystemExit(2)
deleted file mode 100644
--- a/Doc/includes/mp_synchronize.py
+++ /dev/null
@@ -1,278 +0,0 @@
-#
-# A test file for the multiprocessing package
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-import time
-import sys
-import random
-from queue import Empty
-
-import multiprocessing # may get overwritten
-
-
-#### TEST_VALUE
-
-def value_func(running, mutex):
-    mutex.acquire()
-    print('\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished')
-    running.value -= 1
-    mutex.release()
-
-    for i in range(TASKS):
-        p = multiprocessing.Process(target=value_func, args=(running, mutex))
-        p.start()
-
-    while running.value > 0:
-        time.sleep(0.08)
-        mutex.acquire()
-        print(running.value, end=' ')
-        sys.stdout.flush()
-        mutex.release()
-
-
-#### TEST_QUEUE
-
-def queue_func(queue):
-    for i in range(30):
-        time.sleep(0.5 * random.random())
-        queue.put(i*i)
-    queue.put('STOP')
-
-    o = None
-    while o != 'STOP':
-        try:
-            o = q.get(timeout=0.3)
-            print(o, end=' ')
-            sys.stdout.flush()
-        except Empty:
-            print('TIMEOUT')
-
-
-#### TEST_CONDITION
-
-def condition_func(cond):
-    cond.acquire()
-    print('\t' + str(cond))
-    time.sleep(2)
-    print('\tchild is notifying')
-    print('\t' + str(cond))
-    cond.notify()
-    cond.release()
-
-
-#### TEST_SEMAPHORE
-
-def semaphore_func(sema, mutex, running):
-    mutex.acquire()
-    running.value -= 1
-    print('%s has finished' % multiprocessing.current_process())
-    mutex.release()
-
-    sema = multiprocessing.Semaphore(3)
-    mutex = multiprocessing.RLock()
-    running = multiprocessing.Value('i', 0)
-
-    processes = [
-        multiprocessing.Process(target=semaphore_func,
-                                args=(sema, mutex, running))
-        for i in range(10)
-        ]
-
-
-#### TEST_JOIN_TIMEOUT
-
-def join_timeout_func():
-    while 1:
-        p.join(timeout=1)
-        if not p.is_alive():
-            break
-        print('.', end=' ')
-        sys.stdout.flush()
-
-
-#### TEST_EVENT
-
-def event_func(event):
-    print('\t%r is waiting' % multiprocessing.current_process())
-    event.wait()
-    print('\t%r has woken up' % multiprocessing.current_process())
-
-    processes = [multiprocessing.Process(target=event_func, args=(event,))
-                 for i in range(5)]
-
-
-#### TEST_SHAREDVALUES
-
-def sharedvalues_func(values, arrays, shared_values, shared_arrays):
-    for i in range(len(values)):
-        v = values[i][1]
-        sv = shared_values[i].value
-        assert v == sv
-    for i in range(len(values)):
-        a = arrays[i][1]
-        sa = list(shared_arrays[i][:])
-        assert a == sa
-
-    values = [
-        ('i', 10),
-        ('h', -2),
-        ('d', 1.25)
-        ]
-    arrays = [
-        ('i', list(range(100))),
-        ('d', [0.25 * i for i in range(100)]),
-        ('H', list(range(1000)))
-        ]
-
-    shared_values = [multiprocessing.Value(id, v) for id, v in values]
-    shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
-
-    p = multiprocessing.Process(
-        target=sharedvalues_func,
-        args=(values, arrays, shared_values, shared_arrays)
-        )
-    p.start()
-    p.join()
-
-
-####
-
-def test(namespace=multiprocessing):
-    for func in [test_value, test_queue, test_condition,
-                 test_semaphore, test_join_timeout, test_event,
-                 test_sharedvalues]:
-        print('\n\t######## %s\n' % func.__name__)
-        func()
-
-    ignore = multiprocessing.active_children()      # cleanup any old processes
-    if hasattr(multiprocessing, '_debug_info'):
-        info = multiprocessing._debug_info()
-        if info:
-            print(info)
-            raise ValueError('there should be no positive refcounts left')
-    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
-        print(' Using processes '.center(79, '-'))
-        namespace = multiprocessing
-    elif sys.argv[1] == 'manager':
-        print(' Using processes and a manager '.center(79, '-'))
-        namespace = multiprocessing.Manager()
-        namespace.Process = multiprocessing.Process
-        namespace.current_process = multiprocessing.current_process
-        namespace.active_children = multiprocessing.active_children
-    elif sys.argv[1] == 'threads':
-        print(' Using threads '.center(79, '-'))
-        import multiprocessing.dummy as namespace
-    else:
-        print('Usage:\n\t%s [processes | manager | threads]' % sys.argv[0])
-        raise SystemExit(2)
deleted file mode 100644
--- a/Doc/includes/mp_webserver.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Example where a pool of http servers share a single listening socket
-#
-# On Windows this module depends on the ability to pickle a socket
-# object so that the worker processes can inherit a copy of the server
-# object.  (We import `multiprocessing.reduction` to enable this pickling.)
-#
-# Not sure if we should synchronize access to `socket.accept()` method by
-# using a process-shared lock -- does not seem to be necessary.
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-import os
-import sys
-
-from multiprocessing import Process, current_process, freeze_support
-from http.server import HTTPServer
-from http.server import SimpleHTTPRequestHandler
-
-if sys.platform == 'win32':
-
-
-class RequestHandler(SimpleHTTPRequestHandler):
-    # we override log_message() to show which process is handling the request
-    def log_message(self, format, *args):
-        note(format, *args)
-
-    note('starting server')
-    try:
-        server.serve_forever()
-    except KeyboardInterrupt:
-        pass
-
-
-def runpool(address, number_of_processes):
-    # create a single server object -- children will each inherit a copy
-    server = HTTPServer(address, RequestHandler)
-
-    # create child processes to act as workers
-    for i in range(number_of_processes - 1):
-        Process(target=serve_forever, args=(server,)).start()
-    DIR = os.path.join(os.path.dirname(__file__), '..')
-    ADDRESS = ('localhost', 8000)
-    NUMBER_OF_PROCESSES = 4
-
-    print('Serving at http://%s:%d using %d worker processes' % \
-          (ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES))
-    print('To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32'])
--- a/Doc/includes/mp_workers.py
+++ b/Doc/includes/mp_workers.py
@@ -1,16 +1,3 @@
-#
-# Simple example which uses a pool of workers to carry out some tasks.
-#
-# Notice that the results will probably not come out of the output
-# queue in the same order as the corresponding tasks were
-# put on the input queue. If it is important to get the results back
-# in the original order then consider using `Pool.map()` or
-# `Pool.imap()` (which will save on the amount of code needed anyway).
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
import time
import random
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -93,11 +93,80 @@ To show the individual process IDs invol
p.start()
p.join()
-For an explanation of why (on Windows) the ``if __name__ == '__main__'`` part is
+For an explanation of why the ``if __name__ == '__main__'`` part is
 necessary, see :ref:`multiprocessing-programming`.
+Start methods
+~~~~~~~~~~~~~
+
+Depending on the platform, :mod:`multiprocessing` supports three ways
+to start a process. These start methods are
+
+  *spawn*
+    The parent process starts a fresh python interpreter process.  The
+    child process will only inherit those resources necessary to run
+    the process object's :meth:`~Process.run` method.  In particular,
+    unnecessary file descriptors and handles from the parent process
+    will not be inherited.  Starting a process using this method is
+    rather slow compared to using *fork* or *forkserver*.
+
+  *fork*
+    The parent process uses :func:`os.fork` to fork the Python
+    interpreter.  The child process, when it begins, is effectively
+    identical to the parent process.  All resources of the parent are
+    inherited by the child process.  Note that safely forking a
+    multithreaded process is problematic.
+
+  *forkserver*
+    When the program starts and selects the *forkserver* start method,
+    a server process is started.  From then on, whenever a new process
+    is needed, the parent process connects to the server and requests
+    that it fork a new process.  The fork server process is single
+    threaded so it is safe for it to use :func:`os.fork`.  No
+    unnecessary resources are inherited.
+
+Before Python 3.4 *fork* was the only option available on Unix.  Also,
+prior to Python 3.4, child processes would inherit all the parent's
+inheritable handles on Windows.
+
+On Unix using the *spawn* or *forkserver* start methods will also
+start a *semaphore tracker* process which tracks the unlinked named
+semaphores created by processes of the program.  When all processes
+have exited the semaphore tracker unlinks any remaining semaphores.
+Usually there should be none, but if a process was killed by a signal
+there may be some "leaked" semaphores.  (Unlinking the named semaphores
+is a serious matter since the system allows only a limited number, and
+they will not be automatically unlinked until the next reboot.)
+
+To select a start method you use the :func:`set_start_method` in
+the ``if __name__ == '__main__'`` clause of the main module.  For
+example::
+
+    import multiprocessing as mp
+
+    def foo():
+        print('hello')
+
+    if __name__ == '__main__':
+        mp.set_start_method('spawn')
+        p = mp.Process(target=foo)
+        p.start()
+        p.join()
+
+:func:`set_start_method` should not be used more than once in the
+program.
+
+
+
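As a brief sketch of how the new selection API composes (this example uses only the ``get_all_start_methods`` and ``set_start_method`` functions added by this changeset; the fallback behaviour shown is illustrative, not prescribed)::

    import multiprocessing as mp

    def worker():
        print('running in', mp.current_process().name)

    if __name__ == '__main__':
        # forkserver is Unix-only; fall back to the platform default elsewhere
        if 'forkserver' in mp.get_all_start_methods():
            mp.set_start_method('forkserver')
        p = mp.Process(target=worker)
        p.start()
        p.join()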
Exchanging objects between processes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

@@ -274,15 +343,31 @@ processes in a few different ways.

 For example::

     from multiprocessing import Pool
-        with Pool(processes=4) as pool:        # start 4 worker processes
-            result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
-            print(result.get(timeout=1))       # prints "100" unless your computer is *very* slow
-            print(pool.map(f, range(10)))      # prints "[0, 1, 4,..., 81]"
+        # start 4 worker processes
+        with Pool(processes=4) as pool:
+
+            # print "[0, 1, 4,..., 81]"
+            print(pool.map(f, range(10)))
+
+            # print same numbers in arbitrary order
+            for i in pool.imap_unordered(f, range(10)):
+                print(i)
+
+            # evaluate "f(10)" asynchronously
+            res = pool.apply_async(f, [10])
+            print(res.get(timeout=1))          # prints "100"
+
+            # make worker sleep for 10 secs
+            res = pool.apply_async(sleep, 10)
+            print(res.get(timeout=1))          # raises multiprocessing.TimeoutError
+
+        # exiting the 'with'-block has stopped the pool
Note that the methods of a pool should only ever be used by the
process which created it.
@@ -763,6 +848,24 @@ Miscellaneous
If the module is being run normally by the Python interpreter then
   :func:`freeze_support` has no effect.
+.. function:: get_all_start_methods()
+
+   Returns a list of the supported start methods, the first of which
+   is the default.  The possible start methods are ``'fork'``,
+   ``'spawn'`` and ``'forkserver'``.  On Windows only ``'spawn'`` is
+   available.  On Unix ``'fork'`` and ``'spawn'`` are always
+   supported, with ``'fork'`` being the default.
+
+   .. versionadded:: 3.4
+.. function:: get_start_method()
+
+   Return the current start method.  This can be ``'fork'``,
+   ``'spawn'`` or ``'forkserver'``.  ``'fork'`` is the default on
+   Unix, while ``'spawn'`` is the default on Windows.
+
+   .. versionadded:: 3.4
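A short usage sketch of these two query functions (the outputs in the comments are examples only; the actual values depend on the platform)::

    import multiprocessing

    print(multiprocessing.get_all_start_methods())
    # e.g. ['fork', 'spawn', 'forkserver'] on Unix, ['spawn'] on Windows
    print(multiprocessing.get_start_method())
    # e.g. 'fork' on Unix, 'spawn' on Windows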
.. function:: set_executable()

   Sets the path of the Python interpreter to use when starting a child process.

@@ -771,8 +874,21 @@ Miscellaneous

       set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
   before they can create child processes.
+
+   .. versionchanged:: 3.4
+      Now supported on Unix when the ``'spawn'`` start method is used.
+
+.. function:: set_start_method(method)
+
+   Set the method which should be used to start child processes.
+   *method* can be ``'fork'``, ``'spawn'`` or ``'forkserver'``.
+
+   Note that this should be called at most once, and it should be
+   protected inside the ``if __name__ == '__main__'`` clause of the
+   main module.
+
+   .. versionadded:: 3.4
.. note::
@@ -2175,43 +2291,8 @@ Below is an example session with logging
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
-In addition to having these two logging functions, the multiprocessing also
-exposes two additional logging level attributes.  These are :const:`SUBWARNING`
-and :const:`SUBDEBUG`.  The table below illustrates where theses fit in the
-normal level hierarchy.
-
-+----------------+----------------+
-| Level          | Numeric value  |
-+================+================+
-| ``SUBWARNING`` | 25             |
-+----------------+----------------+
-| ``SUBDEBUG``   | 5              |
-+----------------+----------------+
-
-
 For a full table of logging levels, see the :mod:`logging` module.
-These additional logging levels are used primarily for certain debug messages
-within the multiprocessing module. Below is the same example as above, except
-with :const:`SUBDEBUG` enabled::
-
- [WARNING/MainProcess] doomed
- [INFO/SyncManager-...] child process calling self.run()
- [INFO/SyncManager-...] created temp directory /.../pymp-...
- [INFO/SyncManager-...] manager serving at '/.../pymp-djGBXN/listener-...'
- [SUBDEBUG/MainProcess] finalizer calling ...
- [INFO/MainProcess] sending shutdown message to manager
- [DEBUG/SyncManager-...] manager received shutdown message
- [SUBDEBUG/SyncManager-...] calling <Finalize object, callback=unlink, ...
- [SUBDEBUG/SyncManager-...] finalizer calling ...
- [SUBDEBUG/SyncManager-...] calling <Finalize object, dead>
- [SUBDEBUG/SyncManager-...] finalizer calling <function rmtree at 0x5aa730> ...
- [INFO/SyncManager-...] manager exiting with exitcode 0
The :mod:`multiprocessing.dummy` module
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -2232,8 +2313,10 @@ There are certain guidelines and idioms
:mod:`multiprocessing`.
-All platforms
-~~~~~~~~~~~~~
+All start methods
+~~~~~~~~~~~~~~~~~
+
+The following applies to all start methods.
Avoid shared state
@@ -2266,11 +2349,13 @@ Joining zombie processes
Better to inherit than pickle/unpickle
-    On Windows many types from :mod:`multiprocessing` need to be picklable so
-    that child processes can use them.  However, one should generally avoid
-    sending shared objects to other processes using pipes or queues.  Instead
-    you should arrange the program so that a process which needs access to a
-    shared resource created elsewhere can inherit it from an ancestor process.
+    When using the *spawn* or *forkserver* start methods many types
+    from :mod:`multiprocessing` need to be picklable so that child
+    processes can use them.  However, one should generally avoid
+    sending shared objects to other processes using pipes or queues.
+    Instead you should arrange the program so that a process which
+    needs access to a shared resource created elsewhere can inherit it
+    from an ancestor process.
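A minimal sketch of the inheritance pattern described above (the ``worker`` function is hypothetical); the lock reaches the child through its constructor arguments instead of being sent over a pipe or queue::

    from multiprocessing import Process, Lock

    def worker(lock):
        # the lock was inherited via the constructor arguments
        with lock:
            print('worker holds the lock')

    if __name__ == '__main__':
        lock = Lock()
        p = Process(target=worker, args=(lock,))
        p.start()
        p.join()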
Avoid terminating processes

@@ -2314,15 +2399,17 @@ Joining processes that use queues

Explicitly pass resources to child processes
-    On Unix a child process can make use of a shared resource created in a
-    parent process using a global resource.  However, it is better to pass the
-    object as an argument to the constructor for the child process.
-
-    Apart from making the code (potentially) compatible with Windows this also
-    ensures that as long as the child process is still alive the object will not
-    be garbage collected in the parent process.  This might be important if some
-    resource is freed when the object is garbage collected in the parent
-    process.
+    On Unix using the *fork* start method, a child process can make
+    use of a shared resource created in a parent process using a
+    global resource.  However, it is better to pass the object as an
+    argument to the constructor for the child process.
+
+    Apart from making the code (potentially) compatible with Windows
+    and the other start methods this also ensures that as long as the
+    child process is still alive the object will not be garbage
+    collected in the parent process.  This might be important if some
+    resource is freed when the object is garbage collected in the
+    parent process.
So for instance ::
@@ -2381,17 +2468,19 @@ Beware of replacing :data:`sys.stdin` wi
For more information, see :issue:`5155`, :issue:`5313` and :issue:`5331`.
-Windows
-~~~~~~~
-
-Since Windows lacks :func:`os.fork` it has a few extra restrictions:
+The *spawn* and *forkserver* start methods
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+There are a few extra restrictions which don't apply to the *fork*
+start method.
More picklability
-    Ensure that all arguments to :meth:`Process.__init__` are picklable.  This
-    means, in particular, that bound or unbound methods cannot be used directly
-    as the ``target`` argument on Windows --- just define a function and use
-    that instead.
+    Ensure that all arguments to :meth:`Process.__init__` are
+    picklable.  This means, in particular, that bound or unbound
+    methods cannot be used directly as the ``target`` (unless you use
+    the *fork* start method) --- just define a function and use that
+    instead.
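As an illustrative sketch of this guideline (the class and function names here are hypothetical), replace a bound-method target with a module-level function so that it can be pickled under the *spawn* or *forkserver* start methods::

    from multiprocessing import Process

    class Counter:
        def __init__(self):
            self.value = 0
        def bump(self):
            self.value += 1

    def bump_counter(counter):
        # module-level function: picklable, unlike the bound method counter.bump
        counter.bump()

    if __name__ == '__main__':
        c = Counter()
        p = Process(target=bump_counter, args=(c,))
        p.start()
        p.join()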
   Also, if you subclass :class:`Process` then make sure that instances will be
   picklable when the :meth:`Process.start` method is called.
@@ -2411,7 +2500,8 @@ Safe importing of main module
   interpreter without causing unintended side effects (such as starting a new
   process).
+   For example, using the *spawn* or *forkserver* start method
+   running the following module would fail with a
+   :exc:`RuntimeError`::

      from multiprocessing import Process
@@ -2425,13 +2515,14 @@ Safe importing of main module
   Instead one should protect the "entry point" of the program by using ``if
   __name__ == '__main__':`` as follows::
-      from multiprocessing import Process, freeze_support
+      from multiprocessing import Process, freeze_support, set_start_method
       def foo():
           print('hello')

       if __name__ == '__main__':
           freeze_support()
+          set_start_method('spawn')
           p = Process(target=foo)
           p.start()
@@ -2462,26 +2553,7 @@ Using :class:`Pool`:
:language: python3
-Synchronization types like locks, conditions and queues:
-
-.. literalinclude:: ../includes/mp_synchronize.py
-
An example showing how to use queues to feed tasks to a collection of worker
processes and collect the results:
.. literalinclude:: ../includes/mp_workers.py
-
-
-An example of how a pool of worker processes can each run a
-:class:`~http.server.SimpleHTTPRequestHandler` instance while sharing a single
-listening socket.
-
-.. literalinclude:: ../includes/mp_webserver.py
-
-
-Some simple benchmarks comparing :mod:`multiprocessing` with :mod:`threading`:
-
-.. literalinclude:: ../includes/mp_benchmarks.py
-
--- a/Doc/whatsnew/3.4.rst
+++ b/Doc/whatsnew/3.4.rst
@@ -108,6 +108,8 @@ Significantly Improved Library Modules:
 * Single-dispatch generic functions (:pep:`443`)
 * SHA-3 (Keccak) support for :mod:`hashlib`.
 * TLSv1.1 and TLSv1.2 support for :mod:`ssl`.
+* :mod:`multiprocessing` now has an option to avoid using :func:`os.fork`
+  on Unix (:issue:`8713`).

 Security improvements:

@@ -254,6 +256,17 @@ mmap objects can now be weakref'ed.

 (Contributed by Valerie Lambert in :issue:`4885`.)

+multiprocessing
+---------------
+
+On Unix two new start methods have been added for starting processes
+using :mod:`multiprocessing`.  These make the mixing of processes with
+threads more robust.  See :issue:`8713`.
+
+Also, except when using the old *fork* start method, child processes
+will no longer inherit unneeded handles/file descriptors from their parents.
+
+
 poplib
 ------
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -21,6 +21,8 @@
     'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
     'Event', 'Barrier', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool',
     'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
+    'set_executable', 'set_start_method', 'get_start_method',
+    'get_all_start_methods', 'set_forkserver_preload'
     ]

 #
@@ -30,8 +32,14 @@
 import os
 import sys

-from multiprocessing.process import Process, current_process, active_children
-from multiprocessing.util import SUBDEBUG, SUBWARNING
+from .process import Process, current_process, active_children
+
+#
+# XXX These should not really be documented or public.
+#
+
+SUBDEBUG = 5
+SUBWARNING = 25

 #
 # Alias for main module -- will be reset by bootstrapping child processes
 #

@@ -56,8 +64,6 @@ class TimeoutError(ProcessError):
 class AuthenticationError(ProcessError):
     pass

-import _multiprocessing
-
 #
 # Definitions not depending on native semaphores
 #

@@ -69,7 +75,7 @@ def Manager():
     The managers methods such as `Lock()`, `Condition()` and `Queue()`
     can be used to create shared objects.
     '''
+    from .managers import SyncManager
     m = SyncManager()
     m.start()
     return m

@@ -78,7 +84,7 @@ def Pipe(duplex=True):
     '''
     Returns two connection object connected by a pipe
     '''
+    from .connection import Pipe
     return Pipe(duplex)

 def cpu_count():

@@ -97,21 +103,21 @@ def freeze_support():
     If so then run code specified by commandline and exit.
     '''
     if sys.platform == 'win32' and getattr(sys, 'frozen', False):
-        from multiprocessing.forking import freeze_support
+        from .spawn import freeze_support
         freeze_support()

 def get_logger():
     '''
     Return package logger -- if it does not already exist then it is created
     '''
+    from .util import get_logger
     return get_logger()

 def log_to_stderr(level=None):
     '''
     Turn on logging and add a handler which prints to stderr
     '''
+    from .util import log_to_stderr
     return log_to_stderr(level)
 def allow_connection_pickling():

@@ -120,7 +126,7 @@ def allow_connection_pickling():
     '''
     This is undocumented.  In previous versions of multiprocessing
     its only effect was to make socket objects inheritable on Windows.

 # Definitions depending on native semaphores
 #

@@ -130,120 +136,151 @@ def Lock():
     '''
     Returns a non-recursive lock object
     '''
+    from .synchronize import Lock
     return Lock()

 def RLock():
     '''
     Returns a recursive lock object
     '''
+    from .synchronize import RLock
     return RLock()

 def Condition(lock=None):
     '''
     Returns a condition object
     '''
+    from .synchronize import Condition
     return Condition(lock)

 def Semaphore(value=1):
     '''
     Returns a semaphore object
     '''
+    from .synchronize import Semaphore
     return Semaphore(value)

 def BoundedSemaphore(value=1):
     '''
     Returns a bounded semaphore object
     '''
+    from .synchronize import BoundedSemaphore
     return BoundedSemaphore(value)

 def Event():
     '''
     Returns an event object
     '''
+    from .synchronize import Event
     return Event()

 def Barrier(parties, action=None, timeout=None):
     '''
     Returns a barrier object
     '''
+    from .synchronize import Barrier
     return Barrier(parties, action, timeout)

 def Queue(maxsize=0):
     '''
     Returns a queue object
     '''
+    from .queues import Queue
     return Queue(maxsize)

 def JoinableQueue(maxsize=0):
     '''
     Returns a queue object
     '''
+    from .queues import JoinableQueue
     return JoinableQueue(maxsize)

 def SimpleQueue():
     '''
     Returns a queue object
     '''
+    from .queues import SimpleQueue
     return SimpleQueue()

 def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
     '''
     Returns a process pool object
     '''
+    from .pool import Pool
     return Pool(processes, initializer, initargs, maxtasksperchild)

 def RawValue(typecode_or_type, *args):
     '''
     Returns a shared object
     '''
+    from .sharedctypes import RawValue
     return RawValue(typecode_or_type, *args)

 def RawArray(typecode_or_type, size_or_initializer):
     '''
     Returns a shared array
     '''
+    from .sharedctypes import RawArray
     return RawArray(typecode_or_type, size_or_initializer)

 def Value(typecode_or_type, *args, lock=True):
     '''
     Returns a synchronized shared object
     '''
+    from .sharedctypes import Value
     return Value(typecode_or_type, *args, lock=lock)

 def Array(typecode_or_type, size_or_initializer, *, lock=True):
     '''
     Returns a synchronized shared array
     '''
+    from .sharedctypes import Array
     return Array(typecode_or_type, size_or_initializer, lock=lock)
 #
 #
 #

-if sys.platform == 'win32':
+def set_executable(executable):
+    '''
+    Sets the path to a python.exe or pythonw.exe binary used to run
+    child processes instead of sys.executable when using the 'spawn'
+    start method.  Useful for people embedding Python.
+    '''
+    from .spawn import set_executable
+    set_executable(executable)
+
+def set_start_method(method):
+    '''
+    Set method for starting processes: 'fork', 'spawn' or 'forkserver'.
+    '''
+    from .popen import set_start_method
+    set_start_method(method)

-    def set_executable(executable):
-        '''
-        Sets the path to a python.exe or pythonw.exe binary used to run
-        child processes on Windows instead of sys.executable.
-        Useful for people embedding Python.
-        '''
-        from multiprocessing.forking import set_executable
-        set_executable(executable)

+def get_start_method():
+    '''
+    Get method for starting processes: 'fork', 'spawn' or 'forkserver'.
+    '''
+    from .popen import get_start_method
+    return get_start_method()
+
+def get_all_start_methods():
+    '''
+    Get list of available start methods, default first.
+    '''
+    from .popen import get_all_start_methods
+    return get_all_start_methods()
+def set_forkserver_preload(module_names):
+    '''
+    Set list of module names to try to load in the forkserver process
+    when it is started.  Properly chosen this can significantly reduce
+    the cost of starting a new process using the forkserver method.
+    The default list is ['__main__'].
+    '''
+    try:
+        from .forkserver import set_forkserver_preload
+    except ImportError:
+        pass
+    else:
+        set_forkserver_preload(module_names)
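A brief usage sketch of this new function (``heavy_module`` is a hypothetical module you want imported once in the forkserver rather than in every child)::

    import multiprocessing as mp

    if __name__ == '__main__':
        mp.set_start_method('forkserver')
        # preload the expensive import in the server process so every
        # forked worker inherits it already imported
        mp.set_forkserver_preload(['heavy_module'])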
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -21,9 +21,13 @@
 import tempfile
 import itertools

 import _multiprocessing
-from multiprocessing import current_process, AuthenticationError, BufferTooShort
-from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
-from multiprocessing.forking import ForkingPickler
+
+from . import reduction
+from . import util
+
+from . import AuthenticationError, BufferTooShort
+from .reduction import ForkingPickler
+
 try:
     import _winapi
     from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE

@@ -71,7 +75,7 @@ def arbitrary_address(family):
     if family == 'AF_INET':
         return ('localhost', 0)
     elif family == 'AF_UNIX':
-        return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
+        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
     elif family == 'AF_PIPE':
         return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                                (os.getpid(), next(_mmap_counter)))

@@ -505,7 +509,7 @@ if sys.platform != 'win32':
         c1 = Connection(s1.detach())
         c2 = Connection(s2.detach())
     else:
-        fd1, fd2 = os.pipe()
+        fd1, fd2 = util.pipe()
         c1 = Connection(fd1, writable=False)
         c2 = Connection(fd2, readable=False)

@@ -577,7 +581,7 @@ class SocketListener(object):
         self._last_accepted = None

         if family == 'AF_UNIX':
-            self._unlink = Finalize(
+            self._unlink = util.Finalize(
                 self, os.unlink, args=(address,), exitpriority=0
                 )
         else:

@@ -625,8 +629,8 @@ if sys.platform == 'win32':
             self._handle_queue = [self._new_handle(first=True)]
             self._last_accepted = None

-            sub_debug('listener created with address=%r', self._address)
-            self.close = Finalize(
+            util.sub_debug('listener created with address=%r', self._address)
+            self.close = util.Finalize(
                 self, PipeListener._finalize_pipe_listener,
                 args=(self._handle_queue, self._address), exitpriority=0
                 )

@@ -668,7 +672,7 @@ if sys.platform == 'win32':

     @staticmethod
     def _finalize_pipe_listener(queue, address):
-        sub_debug('closing listener with address=%r', address)
+        util.sub_debug('closing listener with address=%r', address)
         for handle in queue:
             _winapi.CloseHandle(handle)
@@ -919,15 +923,32 @@ else:
 #

 if sys.platform == 'win32':
-    from . import reduction
-    ForkingPickler.register(socket.socket, reduction.reduce_socket)
-    ForkingPickler.register(Connection, reduction.reduce_connection)
-    ForkingPickler.register(PipeConnection, reduction.reduce_pipe_connection)
+    def reduce_connection(conn):
+        handle = conn.fileno()
+        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
+            from . import resource_sharer
+            ds = resource_sharer.DupSocket(s)
+            return rebuild_connection, (ds, conn.readable, conn.writable)
+    def rebuild_connection(ds, readable, writable):
+        sock = ds.detach()
+        return Connection(sock.detach(), readable, writable)
+    reduction.register(Connection, reduce_connection)
+
+    def reduce_pipe_connection(conn):
+        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
+                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
+        dh = reduction.DupHandle(conn.fileno(), access)
+        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
+    def rebuild_pipe_connection(dh, readable, writable):
+        handle = dh.detach()
+        return PipeConnection(handle, readable, writable)
+    reduction.register(PipeConnection, reduce_pipe_connection)

 else:
-    try:
-        from . import reduction
-    except ImportError:
-        pass
-    else:
-        ForkingPickler.register(socket.socket, reduction.reduce_socket)
-        ForkingPickler.register(Connection, reduction.reduce_connection)
+    def reduce_connection(conn):
+        df = reduction.DupFd(conn.fileno())
+        return rebuild_connection, (df, conn.readable, conn.writable)
+    def rebuild_connection(df, readable, writable):
+        fd = df.detach()
+        return Connection(fd, readable, writable)
+    reduction.register(Connection, reduce_connection)
--- a/Lib/multiprocessing/dummy/__init__.py
+++ b/Lib/multiprocessing/dummy/__init__.py
@@ -22,7 +22,7 @@
 import sys
 import weakref
 import array

-from multiprocessing.dummy.connection import Pipe
+from .connection import Pipe
 from threading import Lock, RLock, Semaphore, BoundedSemaphore
 from threading import Event, Condition, Barrier
 from queue import Queue

@@ -113,7 +113,7 @@ def shutdown():
     pass

 def Pool(processes=None, initializer=None, initargs=()):
+    from ..pool import ThreadPool
     return ThreadPool(processes, initializer, initargs)

 JoinableQueue = Queue
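For context, a small sketch of what the lazily imported ``ThreadPool`` provides through ``multiprocessing.dummy`` (a thread-backed drop-in for the process ``Pool``)::

    from multiprocessing.dummy import Pool   # thread-based Pool

    def square(x):
        return x * x

    if __name__ == '__main__':
        with Pool(4) as pool:
            print(pool.map(square, range(8)))   # [0, 1, 4, ..., 49]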
deleted file mode 100644
--- a/Lib/multiprocessing/forking.py
+++ /dev/null
@@ -1,477 +0,0 @@
-#
-# Module for starting a process object using os.fork() or CreateProcess()
-#
-# multiprocessing/forking.py
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# Licensed to PSF under a Contributor Agreement.
-#
-
-import io
-import os
-import pickle
-import sys
-import signal
-import errno
-
-from multiprocessing import util, process
-
-__all__ = ['Popen', 'assert_spawning', 'duplicate', 'close', 'ForkingPickler']
-
-#
-# Check that the current thread is spawning a child process
-#
-
-def assert_spawning(self):
-    if not Popen.thread_is_spawning():
-        raise RuntimeError(
-            '%s objects should only be shared between processes'
-            ' through inheritance' % type(self).__name__
-            )
-
-#
-# Try making some callable types picklable
-#
-
-from pickle import Pickler
-from copyreg import dispatch_table
-
-class ForkingPickler(Pickler):
-    _extra_reducers = {}
-    def __init__(self, *args):
-        Pickler.__init__(self, *args)
-        self.dispatch_table = dispatch_table.copy()
-        self.dispatch_table.update(self._extra_reducers)
-
-    @classmethod
-    def register(cls, type, reduce):
-        cls._extra_reducers[type] = reduce
-
-    @staticmethod
-    def dumps(obj):
-        buf = io.BytesIO()
-        ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
-        return buf.getbuffer()
-
-    if m.__self__ is None:
-        return getattr, (m.__class__, m.__func__.__name__)
-    else:
-        return getattr, (m.__self__, m.__func__.__name__)
-ForkingPickler.register(type(_C().f), _reduce_method)
-
-
-def _reduce_method_descriptor(m):
-ForkingPickler.register(type(list.append), _reduce_method_descriptor)
-ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
-
-try:
-    def _reduce_partial(p):
-        return _rebuild_partial, (p.func, p.args, p.keywords or {})
-    def _rebuild_partial(func, args, keywords):
-        return partial(func, *args, **keywords)
-    ForkingPickler.register(partial, _reduce_partial)
-
-#
-# Unix
-#
-
-if sys.platform != 'win32':
-    #
-    # We define a Popen class similar to the one from subprocess, but
-    # whose constructor takes a process object as its argument.
-    #
-
-    class Popen(object):
-
-        def __init__(self, process_obj):
-            sys.stdout.flush()
-            sys.stderr.flush()
-            self.returncode = None
-
-            r, w = os.pipe()
-            self.sentinel = r
-
-            self.pid = os.fork()
-            if self.pid == 0:
-                os.close(r)
-                if 'random' in sys.modules:
-                    import random
-                    random.seed()
-                code = process_obj._bootstrap()
-                os._exit(code)
-
-            # `w` will be closed when the child exits, at which point `r`
-            # will become ready for reading (using e.g. select()).
-            os.close(w)
-            util.Finalize(self, os.close, (r,))
-
-        def poll(self, flag=os.WNOHANG):
-            if self.returncode is None:
-                while True:
-                    try:
-                        pid, sts = os.waitpid(self.pid, flag)
-                    except OSError as e:
-                        if e.errno == errno.EINTR:
-                            continue
-                        # Child process not yet created. See #1731717
-                        # e.errno == errno.ECHILD == 10
-                        return None
-                    else:
-                        break
-                if pid == self.pid:
-                    if os.WIFSIGNALED(sts):
-                        self.returncode = -os.WTERMSIG(sts)
-                    else:
-                        assert os.WIFEXITED(sts)
-                        self.returncode = os.WEXITSTATUS(sts)
-            return self.returncode
-
-        def wait(self, timeout=None):
-            if self.returncode is None:
-                if timeout is not None:
-                    from .connection import wait
-                    if not wait([self.sentinel], timeout):
-                        return None
-                # This shouldn't block if wait() returned successfully.
-                return self.poll(os.WNOHANG if timeout == 0.0 else 0)
-            return self.returncode
-
-        def terminate(self):
-            if self.returncode is None:
-                try:
-                    os.kill(self.pid, signal.SIGTERM)
-                except OSError:
-                    if self.wait(timeout=0.1) is None:
-                        raise
-
-        @staticmethod
-        def thread_is_spawning():
-            return False
-    TERMINATE = 0x10000
-    WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
-    WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
-
-    #
-    # _python_exe is the assumed path to the python executable.
-    # People embedding Python want to modify it.
-    #
-
-    if WINSERVICE:
-        _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
-    else:
-        _python_exe = sys.executable
-
-    def duplicate(handle, target_process=None, inheritable=False):
-        if target_process is None:
-            target_process = _winapi.GetCurrentProcess()
-        return _winapi.DuplicateHandle(
-            _winapi.GetCurrentProcess(), handle, target_process,
-            0, inheritable, _winapi.DUPLICATE_SAME_ACCESS
-            )
-    #
-    # We define a Popen class similar to the one from subprocess, but
-    # whose constructor takes a process object as its argument.
-    #
-
-    class Popen(object):
-        '''
-        Start a subprocess to run the code of a process object
-        '''
-        _tls = _thread._local()
-
-        def __init__(self, process_obj):
-            cmd = ' '.join('"%s"' % x for x in get_command_line())
-            prep_data = get_preparation_data(process_obj._name)
-
-            # create pipe for communication with child
-            rfd, wfd = os.pipe()
-
-            # get handle for read end of the pipe and make it inheritable
-            rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
-            os.close(rfd)
-
-            with open(wfd, 'wb', closefd=True) as to_child:
-                # start process
-                try:
-                    hp, ht, pid, tid = _winapi.CreateProcess(
-                        _python_exe, cmd + (' %s' % rhandle),
-                        None, None, 1, 0, None, None, None
-                        )
-                    _winapi.CloseHandle(ht)
-                finally:
-                    close(rhandle)
-
-                # set attributes of self
-                self.pid = pid
-                self.returncode = None
-                self._handle = hp
-                self.sentinel = int(hp)
-                util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
-
-                # send information to child
-                Popen._tls.process_handle = int(hp)
-                try:
-                    dump(prep_data, to_child, HIGHEST_PROTOCOL)
-                    dump(process_obj, to_child, HIGHEST_PROTOCOL)
-                finally:
-                    del Popen._tls.process_handle
-
-        @staticmethod
-        def thread_is_spawning():
-            return getattr(Popen._tls, 'process_handle', None) is not None
-
-        @staticmethod
-        def duplicate_for_child(handle):
-            return duplicate(handle, Popen._tls.process_handle)
-
-        def wait(self, timeout=None):
-            if self.returncode is None:
-                if timeout is None:
-                    msecs = _winapi.INFINITE
-                else:
-                    msecs = max(0, int(timeout * 1000 + 0.5))
-
-                res = _winapi.WaitForSingleObject(int(self._handle), msecs)
-                if res == _winapi.WAIT_OBJECT_0:
-                    code = _winapi.GetExitCodeProcess(self._handle)
-                    if code == TERMINATE:
-                        code = -signal.SIGTERM
-                    self.returncode = code
-
-            return self.returncode
-
-        def poll(self):
-            return self.wait(timeout=0)
-
-        def terminate(self):
-            if self.returncode is None:
-                try:
-                    _winapi.TerminateProcess(int(self._handle), TERMINATE)
-                except OSError:
-                    if self.wait(timeout=1.0) is None:
-                        raise
-    def is_forking(argv):
-        '''
-        Return whether commandline indicates we are forking
-        '''
-        if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
-            assert len(argv) == 3
-            return True
-        else:
-            return False
-
-    def freeze_support():
-        '''
-        Run code for process object if this is not the main process
-        '''
-        if is_forking(sys.argv):
-            main()
-            sys.exit()
-    def get_command_line():
-        '''
-        Returns prefix of command line used for spawning a child process
-        '''
-        if getattr(process.current_process(), '_inheriting', False):
-            raise RuntimeError('''
-            Attempt to start a new process before the current process
-            has finished its bootstrapping phase.
-
-            This probably means that you are on Windows and you have
-            forgotten to use the proper idiom in the main module:
-
-                if __name__ == '__main__':
-                    freeze_support()
-                    ...
-
-            The "freeze_support()" line can be omitted if the program
-            is not going to be frozen to produce a Windows executable.''')
-
-        if getattr(sys, 'frozen', False):
-            return [sys.executable, '--multiprocessing-fork']
-        else:
-            prog = 'from multiprocessing.forking import main; main()'
-            opts = util._args_from_interpreter_flags()
-            return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
-
-    def main():
-        '''
-        Run code specified by data received over pipe
-        '''
-        assert is_forking(sys.argv)
-
-        handle = int(sys.argv[-1])
-        fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
-        from_parent = os.fdopen(fd, 'rb')
-
-        process.current_process()._inheriting = True
-        preparation_data = load(from_parent)
-        prepare(preparation_data)
-        self = load(from_parent)
-        process.current_process()._inheriting = False
-
-        from_parent.close()
-
-        exitcode = self._bootstrap()
-        sys.exit(exitcode)
-    def get_preparation_data(name):
-        '''
-        Return info about parent needed by child to unpickle process object
-        '''
-        from .util import _logger, _log_to_stderr
-
-        d = dict(
-            name=name,
-            sys_path=sys.path,
-            sys_argv=sys.argv,
-            log_to_stderr=_log_to_stderr,
-            orig_dir=process.ORIGINAL_DIR,
-            authkey=process.current_process().authkey,
-            )
-
-        if _logger is not None:
-            d['log_level'] = _logger.getEffectiveLevel()
-
-        if not WINEXE and not WINSERVICE:
-            main_path = getattr(sys.modules['__main__'], '__file__', None)
-            if not main_path and sys.argv[0] not in ('', '-c'):
-                main_path = sys.argv[0]
-            if main_path is not None:
-                if not os.path.isabs(main_path) and \
-                   process.ORIGINAL_DIR is not None:
-                    main_path = os.path.join(process.ORIGINAL_DIR, main_path)
-                d['main_path'] = os.path.normpath(main_path)
-
-        return d
-
-#
-# Prepare current process
-#
-
-old_main_modules = []
-
-def prepare(data):
-    '''
-    Try to get current process ready to unpickle process object
-    '''
-    old_main_modules.append(sys.modules['__main__'])
-
-    if 'main_path' in data:
-        # XXX (ncoghlan): The following code makes several bogus
-        # assumptions regarding the relationship between __file__
-        # and a module's real name. See PEP 302 and issue #10845
-        main_path = data['main_path']
-        main_name = os.path.splitext(os.path.basename(main_path))[0]
-        if main_name == '__init__':
-            main_name = os.path.basename(os.path.dirname(main_path))
-
-        if main_name == '__main__':
-            main_module = sys.modules['__main__']
-            main_module.__file__ = main_path
-        elif main_name != 'ipython':
-            # Main modules not actually called __main__.py may
-            # contain additional code that should still be executed
-            import importlib
-            import types
-
-            if main_path is None:
-                dirs = None
-            elif os.path.basename(main_path).startswith('__init__.py'):
-                dirs = [os.path.dirname(os.path.dirname(main_path))]
-            else:
-                dirs = [os.path.dirname(main_path)]
-
-            assert main_name not in sys.modules, main_name
-            sys.modules.pop('__mp_main__', None)
-            # We should not try to load __main__
-            # since that would execute 'if __name__ == "__main__"'
-            # clauses, potentially causing a pseudo fork bomb.
-            loader = importlib.find_loader(main_name, path=dirs)
-            main_module = types.ModuleType(main_name)
-            try:
-                loader.init_module_attrs(main_module)
-            except AttributeError:  # init_module_attrs is optional
-                pass
-            main_module.__name__ = '__mp_main__'
-            code = loader.get_code(main_name)
-            exec(code, main_module.__dict__)
-
-            sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/forkserver.py
@@ -0,0 +1,238 @@
+import errno
+import os
+import select
+import signal
+import socket
+import struct
+import sys
+import threading
+
+from . import connection
+from . import process
+from . import reduction
+from . import spawn
+from . import util
+
+__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
+           'set_forkserver_preload']
+
+#
+#
+#
+
+MAXFDS_TO_SEND = 256
+UNSIGNED_STRUCT = struct.Struct('Q')     # large enough for pid_t
+
+_inherited_fds = None
+_lock = threading.Lock()
+_preload_modules = ['__main__']
+
+
+#
+# Public function
+#
+
+def set_forkserver_preload(modules_names):
+    '''Set list of module names to try to load in forkserver process.'''
+    global _preload_modules
+    _preload_modules = modules_names
+
+
+def connect_to_new_process(fds):
+    '''
+    Returns a pair of fds (status_r, data_w).  The calling process can read
+    the child process's pid and (eventually) its returncode from status_r.
+    The calling process should write to data_w the pickled preparation and
+    process data.
+    '''
+    if len(fds) + 3 >= MAXFDS_TO_SEND:
+        raise ValueError('too many fds')
+    address, alive_w = process.current_process()._config['forkserver_info']
+    with socket.socket(socket.AF_UNIX) as client:
+        client.connect(address)
+        parent_r, child_w = util.pipe()
+        child_r, parent_w = util.pipe()
+        allfds = [child_r, child_w, alive_w]
+        allfds += fds
+        try:
+            reduction.sendfds(client, allfds)
+            return parent_r, parent_w
+        except:
+            os.close(parent_r)
+            os.close(parent_w)
+            raise
+        finally:
+            os.close(child_r)
+            os.close(child_w)
+
+
+def ensure_running():
+    '''Make sure that a fork server is running.
+
+    This can be called from any process.  Note that usually a child
+    process will just reuse the forkserver started by its parent, so
+    ensure_running() will do nothing.
+    '''
+    with _lock:
+        config = process.current_process()._config
+        if config.get('forkserver_info') is not None:
+            return
+
+        assert all(type(mod) is str for mod in _preload_modules)
+        semaphore_tracker_fd = config['semaphore_tracker_fd']
+        cmd = ('from multiprocessing.forkserver import main; ' +
+               'main(%d, %d, %r, **%r)')
+
+        if _preload_modules:
+            desired_keys = {'main_path', 'sys_path'}
+            data = spawn.get_preparation_data('ignore')
+            data = dict((x, y) for (x, y) in data.items() if x in desired_keys)
+        else:
+            data = {}
+
+        with socket.socket(socket.AF_UNIX) as listener:
+            address = connection.arbitrary_address('AF_UNIX')
+            listener.bind(address)
+            os.chmod(address, 0o600)
+            listener.listen(100)
+
+            # all client processes own the write end of the "alive" pipe;
+            # when they all terminate the read end becomes ready.
+            alive_r, alive_w = os.pipe()
+            config['forkserver_info'] = (address, alive_w)
+            fds_to_pass = [listener.fileno(), alive_r, semaphore_tracker_fd]
+            cmd %= (listener.fileno(), alive_r, _preload_modules, data)
+            exe = spawn.get_executable()
+            args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd]
+            pid = util.spawnv_passfds(exe, args, fds_to_pass)
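
ensure_running() hands the forkserver the read end of an "alive" pipe whose write end every client inherits; the server learns that all clients are gone when reading the pipe returns EOF. The same trick in isolation (a POSIX-only sketch, independent of the forkserver internals):

    import os, select

    r, w = os.pipe()
    if os.fork() == 0:
        os._exit(0)             # child exits; its inherited copy of w closes
    os.close(w)                 # parent drops its own write end too
    select.select([r], [], []) # becomes ready once no write ends remain
    assert os.read(r, 1) == b''    # EOF signals "no more writers"
    os.close(r)
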
+
+
+def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
+    '''Run forkserver.'''
+    if preload:
+        if '__main__' in preload and main_path is not None:
+            process.current_process()._inheriting = True
+            try:
+                spawn.import_main_path(main_path)
+            finally:
+                del process.current_process()._inheriting
+        for modname in preload:
+            try:
+                __import__(modname)
+            except ImportError:
+                pass
+
+    # close sys.stdin
+    if sys.stdin is not None:
+        try:
+            sys.stdin.close()
+            sys.stdin = open(os.devnull)
+        except (OSError, ValueError):
+            pass
+
+    # ignoring SIGCHLD means no need to reap zombie processes
+    handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN)
+    with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener:
+        readers = [listener, alive_r]
+
+        while True:
+            try:
+                rfds, wfds, xfds = select.select(readers, [], [])
+
+                if alive_r in rfds:
+                    # EOF because no more client processes left
+                    assert os.read(alive_r, 1) == b''
+                    raise SystemExit
+
+                assert listener in rfds
+                with listener.accept()[0] as s:
+                    code = 1
+                    if os.fork() == 0:
+                        try:
+                            _serve_one(s, listener, alive_r, handler)
+                        except Exception:
+                            sys.excepthook(*sys.exc_info())
+                            sys.stderr.flush()
+                        finally:
+                            os._exit(code)
+
+            except InterruptedError:
+                pass
+            except OSError as e:
+                if e.errno != errno.ECONNABORTED:
+                    raise
+
+#
+# Code to bootstrap new process
+#
+
+def _serve_one(s, listener, alive_r, handler):
+    global _inherited_fds
+
+    # close unnecessary stuff and reset SIGCHLD handler
+    listener.close()
+    os.close(alive_r)
+    signal.signal(signal.SIGCHLD, handler)
+
+    # receive fds from parent process
+    fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
+    s.close()
+    assert len(fds) <= MAXFDS_TO_SEND
+    child_r, child_w, alive_w, *_inherited_fds = fds
+
+    # reseed random number generator
+    if 'random' in sys.modules:
+        import random
+        random.seed()
+
+#
+# Read and write unsigned numbers
+#
+
+def read_unsigned(fd):
+    data = b''
+    length = UNSIGNED_STRUCT.size
+    while len(data) < length:
+        while True:
+            try:
+                s = os.read(fd, length - len(data))
+            except InterruptedError:
+                pass
+            else:
+                break
+        if not s:
+            raise EOFError('unexpected EOF')
+        data += s
+    return UNSIGNED_STRUCT.unpack(data)[0]
+
+def write_unsigned(fd, n):
+    msg = UNSIGNED_STRUCT.pack(n)
+    while msg:
+        while True:
+            try:
+                nbytes = os.write(fd, msg)
+            except InterruptedError:
+                pass
+            else:
+                break
+        if nbytes == 0:
+            raise RuntimeError('should not get here')
+        msg = msg[nbytes:]
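
read_unsigned()/write_unsigned() frame every value as a fixed-size native unsigned integer and retry around EINTR. A self-contained round trip over an ordinary pipe showing the same framing (all names are local to this sketch):

    import os, struct

    FMT = struct.Struct('Q')               # 8-byte unsigned, as above
    r, w = os.pipe()
    os.write(w, FMT.pack(12345))
    data = b''
    while len(data) < FMT.size:            # pipes may return short reads
        chunk = os.read(r, FMT.size - len(data))
        if not chunk:
            raise EOFError('unexpected EOF')
        data += chunk
    assert FMT.unpack(data)[0] == 12345
    os.close(r); os.close(w)
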
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -8,15 +8,17 @@
 #

 import bisect
+import itertools
 import mmap
 import os
 import sys
+import tempfile
 import threading
-import itertools
+import _multiprocessing

-import _multiprocessing
-from multiprocessing.util import Finalize, info
-from multiprocessing.forking import assert_spawning
+from . import popen
+from . import reduction
+from . import util

 __all__ = ['BufferWrapper']

@@ -30,17 +32,25 @@ if sys.platform == 'win32':

     class Arena(object):
-        _counter = itertools.count()
+        _rand = tempfile._RandomNameSequence()

         def __init__(self, size):
             self.size = size
-            self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter))
-            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
-            assert _winapi.GetLastError() == 0, 'tagname already in use'
+            for i in range(100):
+                name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
+                buf = mmap.mmap(-1, size, tagname=name)
+                if _winapi.GetLastError() == 0:
+                    break
+                # We have reopened a preexisting mmap.
+                buf.close()
+            else:
+                raise FileExistsError('Cannot find name for new mmap')
+            self.name = name
+            self.buffer = buf
             self._state = (self.size, self.name)

         def __getstate__(self):
-            assert_spawning(self)
+            popen.assert_spawning(self)
             return self._state

         def __setstate__(self, state):
@@ -52,10 +62,28 @@ else:

     class Arena(object):

-        def __init__(self, size):
-            self.buffer = mmap.mmap(-1, size)
+        def __init__(self, size, fd=-1):
             self.size = size
-            self.name = None
+            self.fd = fd
+            if fd == -1:
+                self.fd, name = tempfile.mkstemp(
+                     prefix='pym-%d-'%os.getpid(), dir=util.get_temp_dir())
+                os.unlink(name)
+                util.Finalize(self, os.close, (self.fd,))
+                with open(self.fd, 'wb', closefd=False) as f:
+                    f.write(b'\0'*size)
+            self.buffer = mmap.mmap(self.fd, self.size)

+    def reduce_arena(a):
+        if a.fd == -1:
+            raise ValueError('Arena is unpicklable because '
+                             'forking was enabled when it was created')
+        return rebuild_arena, (a.size, reduction.DupFd(a.fd))

 #
 # Class allowing allocation of chunks of memory from arenas
 #

@@ -90,7 +118,7 @@ class Heap(object):
             if i == len(self._lengths):
                 length = self._roundup(max(self._size, size), mmap.PAGESIZE)
                 self._size *= 2
-                info('allocating a new mmap of length %d', length)
+                util.info('allocating a new mmap of length %d', length)
                 arena = Arena(length)
                 self._arenas.append(arena)
                 return (arena, 0, length)
@@ -216,7 +244,7 @@ class BufferWrapper(object):
         assert 0 <= size < sys.maxsize
         block = BufferWrapper._heap.malloc(size)
         self._state = (block, size)
-        Finalize(self, BufferWrapper._heap.free, args=(block,))
+        util.Finalize(self, BufferWrapper._heap.free, args=(block,))

     def create_memoryview(self):
         (arena, start, stop), size = self._state
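
On Unix the Arena is now backed by an unlinked temporary file, so the fd, and with it the shared memory, can be duplicated into another process. The core idea in isolation (a POSIX sketch, not the module's API):

    import mmap, os, tempfile

    size = 4096
    fd, name = tempfile.mkstemp(prefix='arena-demo-')
    os.unlink(name)            # no directory entry; the fd keeps it alive
    os.ftruncate(fd, size)     # give the mapping real backing storage
    buf = mmap.mmap(fd, size)
    buf[:5] = b'hello'
    assert bytes(buf[:5]) == b'hello'
    buf.close(); os.close(fd)
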
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -19,11 +19,15 @@
 import threading
 import array
 import queue
+from time import time as _time
 from traceback import format_exc
-from multiprocessing import Process, current_process, active_children, Pool, util, connection
-from multiprocessing.process import AuthenticationString
-from multiprocessing.forking import Popen, ForkingPickler
-from time import time as _time
+
+from . import connection
+from . import pool
+from . import process
+from . import popen
+from . import reduction
+from . import util

 #
 # Register some things for pickling
 #
@@ -31,16 +35,14 @@ from time import time as _time

 def reduce_array(a):
     return array.array, (a.typecode, a.tobytes())
-ForkingPickler.register(array.array, reduce_array)
+reduction.register(array.array, reduce_array)

 view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
 if view_types[0] is not list:       # only needed in Py3.0
     def rebuild_as_list(obj):
         return list, (list(obj),)
     for view_type in view_types:
-        ForkingPickler.register(view_type, rebuild_as_list)
-        import copyreg
-        copyreg.pickle(view_type, rebuild_as_list)
+        reduction.register(view_type, rebuild_as_list)

 #
 # Type for identifying shared objects
 #
@@ -130,7 +132,7 @@ class Server(object):
     def __init__(self, registry, address, authkey, serializer):
         assert isinstance(authkey, bytes)
         self.registry = registry
-        self.authkey = AuthenticationString(authkey)
+        self.authkey = process.AuthenticationString(authkey)
         Listener, Client = listener_client[serializer]

         # do authentication later
@@ -146,7 +148,7 @@ class Server(object):
         Run the server forever
         '''
         self.stop_event = threading.Event()
-        current_process()._manager_server = self
+        process.current_process()._manager_server = self
         try:
             accepter = threading.Thread(target=self.accepter)
             accepter.daemon = True
@@ -438,9 +440,9 @@ class BaseManager(object):
     def __init__(self, address=None, authkey=None, serializer='pickle'):
         if authkey is None:
-            authkey = current_process().authkey
+            authkey = process.current_process().authkey
         self._address = address     # XXX not final address if eg ('', 0)
-        self._authkey = AuthenticationString(authkey)
+        self._authkey = process.AuthenticationString(authkey)
         self._state = State()
         self._state.value = State.INITIAL
         self._serializer = serializer
@@ -476,7 +478,7 @@ class BaseManager(object):
         reader, writer = connection.Pipe(duplex=False)

         # spawn process which runs a server
-        self._process = Process(
+        self._process = process.Process(
             target=type(self)._run_server,
             args=(self._registry, self._address, self._authkey,
                   self._serializer, writer, initializer, initargs),
@@ -691,11 +693,11 @@ class BaseProxy(object):
         self._Client = listener_client[serializer][1]

         if authkey is not None:
-            self._authkey = AuthenticationString(authkey)
+            self._authkey = process.AuthenticationString(authkey)
         elif self._manager is not None:
             self._authkey = self._manager._authkey
         else:
-            self._authkey = current_process().authkey
+            self._authkey = process.current_process().authkey

         if incref:
             self._incref()
@@ -704,7 +706,7 @@ class BaseProxy(object):

     def _connect(self):
         util.debug('making connection to manager')
-        name = current_process().name
+        name = process.current_process().name
         if threading.current_thread().name != 'MainThread':
             name += '|' + threading.current_thread().name
         conn = self._Client(self._token.address, authkey=self._authkey)
@@ -798,7 +800,7 @@ class BaseProxy(object):

     def __reduce__(self):
         kwds = {}
-        if Popen.thread_is_spawning():
+        if popen.get_spawning_popen() is not None:
             kwds['authkey'] = self._authkey

         if getattr(self, '_isauto', False):
@@ -835,14 +837,14 @@ def RebuildProxy(func, token, serializer
     If possible the shared object is returned, or otherwise a proxy for it.
     '''
-    server = getattr(current_process(), '_manager_server', None)
+    server = getattr(process.current_process(), '_manager_server', None)

     if server and server.address == token.address:
         return server.id_to_obj[token.id][0]
     else:
         incref = (
             kwds.pop('incref', True) and
-            not getattr(current_process(), '_inheriting', False)
+            not getattr(process.current_process(), '_inheriting', False)
             )
         return func(token, serializer, incref=incref, **kwds)
@@ -889,7 +891,7 @@ def AutoProxy(token, serializer, manager
     if authkey is None and manager is not None:
         authkey = manager._authkey
     if authkey is None:
-        authkey = current_process().authkey
+        authkey = process.current_process().authkey

     ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
     proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
@@ -1109,7 +1111,7 @@ SyncManager.register('BoundedSemaphore',
                      AcquirerProxy)
 SyncManager.register('Condition', threading.Condition, ConditionProxy)
 SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
-SyncManager.register('Pool', Pool, PoolProxy)
+SyncManager.register('Pool', pool.Pool, PoolProxy)
 SyncManager.register('list', list, ListProxy)
 SyncManager.register('dict', dict, DictProxy)
 SyncManager.register('Value', Value, ValueProxy)
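
The manager machinery is unchanged from the user's point of view; only its internal imports moved. A minimal usage sketch of the public API these hunks touch:

    import multiprocessing

    def main():
        with multiprocessing.Manager() as manager:
            d = manager.dict()
            lst = manager.list(range(3))
            d['total'] = sum(lst)
            print(d['total'])              # 3

    if __name__ == '__main__':
        main()
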
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -7,7 +7,7 @@
 # Licensed to PSF under a Contributor Agreement.
 #

-__all__ = ['Pool']
+__all__ = ['Pool', 'ThreadPool']

 #
 # Imports
 #
@@ -21,8 +21,10 @@ import os
 import time
 import traceback

-from multiprocessing import Process, TimeoutError
-from multiprocessing.util import Finalize, debug
+# If threading is available then ThreadPool should be provided.  Therefore
+# we avoid top-level imports which are liable to fail on some systems.
+from . import util
+from . import Process, cpu_count, TimeoutError, SimpleQueue

 #
 # Constants representing the state of a pool
 #
@@ -104,11 +106,11 @@ def worker(inqueue, outqueue, initialize
         try:
             task = get()
         except (EOFError, OSError):
-            debug('worker got EOFError or OSError -- exiting')
+            util.debug('worker got EOFError or OSError -- exiting')
             break

         if task is None:
-            debug('worker got sentinel -- exiting')
+            util.debug('worker got sentinel -- exiting')
             break

         job, i, func, args, kwds = task
@@ -121,11 +123,11 @@ def worker(inqueue, outqueue, initialize
             put((job, i, result))
         except Exception as e:
             wrapped = MaybeEncodingError(e, result[1])
-            debug("Possible encoding error while sending result: %s" % (
-                wrapped))
+            util.debug("Possible encoding error while sending result: %s" % (
+                wrapped))
             put((job, i, (False, wrapped)))
         completed += 1
 #
 # Class representing a process pool
 #

@@ -184,7 +186,7 @@ class Pool(object):
         self._result_handler._state = RUN
         self._result_handler.start()

-        self._terminate = Finalize(
+        self._terminate = util.Finalize(
             self, self._terminate_pool,
             args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                   self._worker_handler, self._task_handler,
@@ -201,7 +203,7 @@ class Pool(object):
             worker = self._pool[i]
             if worker.exitcode is not None:
                 # worker exited
-                debug('cleaning up worker %d' % i)
+                util.debug('cleaning up worker %d' % i)
                 worker.join()
                 cleaned = True
                 del self._pool[i]
@@ -221,7 +223,7 @@ class Pool(object):
             w.name = w.name.replace('Process', 'PoolWorker')
             w.daemon = True
             w.start()
-            debug('added worker')
+            util.debug('added worker')
     def _maintain_pool(self):
         """Clean up any exited workers and start replacements for them.
@@ -230,7 +232,6 @@ class Pool(object):
         self._repopulate_pool()

     def _setup_queues(self):
-        from .queues import SimpleQueue
         self._inqueue = SimpleQueue()
         self._outqueue = SimpleQueue()
         self._quick_put = self._inqueue._writer.send
@@ -358,7 +359,7 @@ class Pool(object):
             time.sleep(0.1)
         # send sentinel to stop workers
         pool._taskqueue.put(None)
-        debug('worker handler exiting')
+        util.debug('worker handler exiting')
     @staticmethod
     def _handle_tasks(taskqueue, put, outqueue, pool):
@@ -368,36 +369,36 @@ class Pool(object):
                 i = -1
                 for i, task in enumerate(taskseq):
                     if thread._state:
-                        debug('task handler found thread._state != RUN')
+                        util.debug('task handler found thread._state != RUN')
                         break
                     try:
                         put(task)
                     except OSError:
-                        debug('could not put task on queue')
+                        util.debug('could not put task on queue')
                         break
                 else:
                     if set_length:
-                        debug('doing set_length()')
+                        util.debug('doing set_length()')
                         set_length(i+1)
                     continue
                 break
         else:
-            debug('task handler got sentinel')
+            util.debug('task handler got sentinel')

         try:
             # tell result handler to finish when cache is empty
-            debug('task handler sending sentinel to result handler')
+            util.debug('task handler sending sentinel to result handler')
             outqueue.put(None)

             # tell workers there is no more work
-            debug('task handler sending sentinel to workers')
+            util.debug('task handler sending sentinel to workers')
             for p in pool:
                 put(None)
         except OSError:
-            debug('task handler got OSError when sending sentinels')
+            util.debug('task handler got OSError when sending sentinels')

-        debug('task handler exiting')
+        util.debug('task handler exiting')
     @staticmethod
     def _handle_results(outqueue, get, cache):
@@ -407,16 +408,16 @@ class Pool(object):
             try:
                 task = get()
             except (OSError, EOFError):
-                debug('result handler got EOFError/OSError -- exiting')
+                util.debug('result handler got EOFError/OSError -- exiting')
                 return

             if thread._state:
                 assert thread._state == TERMINATE
-                debug('result handler found thread._state=TERMINATE')
+                util.debug('result handler found thread._state=TERMINATE')
                 break

             if task is None:
-                debug('result handler got sentinel')
+                util.debug('result handler got sentinel')
                 break

             job, i, obj = task
@@ -429,11 +430,11 @@ class Pool(object):
             try:
                 task = get()
             except (OSError, EOFError):
-                debug('result handler got EOFError/OSError -- exiting')
+                util.debug('result handler got EOFError/OSError -- exiting')
                 return

             if task is None:
-                debug('result handler ignoring extra sentinel')
+                util.debug('result handler ignoring extra sentinel')
                 continue
             job, i, obj = task
             try:
@@ -442,7 +443,7 @@ class Pool(object):
                 pass

         if hasattr(outqueue, '_reader'):
-            debug('ensuring that outqueue is not full')
+            util.debug('ensuring that outqueue is not full')
             # If we don't make room available in outqueue then
             # attempts to add the sentinel (None) to outqueue may
             # block.  There is guaranteed to be no more than 2 sentinels.
@@ -454,7 +455,7 @@ class Pool(object):
         except (OSError, EOFError):
             pass

-        debug('result handler exiting: len(cache)=%s, thread._state=%s',
-              len(cache), thread._state)
+        util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
+              len(cache), thread._state)
     @staticmethod
@@ -472,19 +473,19 @@ class Pool(object):
             )

     def close(self):
-        debug('closing pool')
+        util.debug('closing pool')
         if self._state == RUN:
             self._state = CLOSE
             self._worker_handler._state = CLOSE

     def terminate(self):
-        debug('terminating pool')
+        util.debug('terminating pool')
         self._state = TERMINATE
         self._worker_handler._state = TERMINATE
         self._terminate()

     def join(self):
-        debug('joining pool')
+        util.debug('joining pool')
         assert self._state in (CLOSE, TERMINATE)
         self._worker_handler.join()
         self._task_handler.join()
@@ -495,7 +496,7 @@ class Pool(object):
     @staticmethod
     def _help_stuff_finish(inqueue, task_handler, size):
         # task_handler may be blocked trying to put items on inqueue
-        debug('removing tasks from inqueue until task handler finished')
+        util.debug('removing tasks from inqueue until task handler finished')
         inqueue._rlock.acquire()
         while task_handler.is_alive() and inqueue._reader.poll():
             inqueue._reader.recv()
@@ -505,12 +506,12 @@ class Pool(object):
     def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                         worker_handler, task_handler, result_handler, cache):
         # this is guaranteed to only be called once
-        debug('finalizing pool')
+        util.debug('finalizing pool')

         worker_handler._state = TERMINATE
         task_handler._state = TERMINATE

-        debug('helping task handler/workers to finish')
+        util.debug('helping task handler/workers to finish')
         cls._help_stuff_finish(inqueue, task_handler, len(pool))

         assert result_handler.is_alive() or len(cache) == 0
@@ -520,31 +521,31 @@ class Pool(object):
         # We must wait for the worker handler to exit before terminating
         # workers because we don't want workers to be restarted behind our back.
-        debug('joining worker handler')
+        util.debug('joining worker handler')
         if threading.current_thread() is not worker_handler:
             worker_handler.join()

         # Terminate workers which haven't already finished.
         if pool and hasattr(pool[0], 'terminate'):
-            debug('terminating workers')
+            util.debug('terminating workers')
             for p in pool:
                 if p.exitcode is None:
                     p.terminate()

-        debug('joining task handler')
+        util.debug('joining task handler')
         if threading.current_thread() is not task_handler:
             task_handler.join()

-        debug('joining result handler')
+        util.debug('joining result handler')
         if threading.current_thread() is not result_handler:
             result_handler.join()

         if pool and hasattr(pool[0], 'terminate'):
-            debug('joining pool workers')
+            util.debug('joining pool workers')
             for p in pool:
                 if p.is_alive():
                     # worker has not yet exited
-                    debug('cleaning up worker %d' % p.pid)
+                    util.debug('cleaning up worker %d' % p.pid)
                     p.join()
     def __enter__(self):
@@ -730,7 +731,10 @@ class IMapUnorderedIterator(IMapIterator

 class ThreadPool(Pool):

-    from .dummy import Process
+    @staticmethod
+    def Process(*args, **kwds):
+        from .dummy import Process
+        return Process(*args, **kwds)

     def __init__(self, processes=None, initializer=None, initargs=()):
         Pool.__init__(self, processes, initializer, initargs)
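
ThreadPool is now part of __all__; it reuses the whole Pool implementation with thread-backed workers from multiprocessing.dummy. A usage sketch:

    from multiprocessing.pool import ThreadPool

    def fetch(n):
        return n + 1                       # stand-in for blocking I/O

    if __name__ == '__main__':
        with ThreadPool(4) as pool:
            print(pool.map(fetch, range(8)))
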
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/popen.py
@@ -0,0 +1,78 @@
+import sys
+import threading
+
+__all__ = ['Popen', 'get_spawning_popen', 'set_spawning_popen',
+           'assert_spawning']
+
+#
+# Check that the current thread is spawning a child process
+#
+
+_tls = threading.local()
+
+def get_spawning_popen():
+    return getattr(_tls, 'spawning_popen', None)
+
+def set_spawning_popen(popen):
+    _tls.spawning_popen = popen
+
+def assert_spawning(obj):
+    if get_spawning_popen() is None:
+        raise RuntimeError(
+            '%s objects should only be shared between processes'
+            ' through inheritance' % type(obj).__name__
+            )
+
+#
+#
+#
+
+_Popen = None
+
+def Popen(process_obj):
+    if _Popen is None:
+        set_start_method()
+    return _Popen(process_obj)
+
+def set_start_method(meth=None, *, start_helpers=True):
+    global _Popen
+    try:
+        modname = _method_to_module[meth]
+        __import__(modname)
+    except (KeyError, ImportError):
+        raise ValueError('could not use start method %r' % meth)
+    module = sys.modules[modname]
+    if start_helpers:
+        module.Popen.ensure_helpers_running()
+    _Popen = module.Popen
+
+
+if sys.platform == 'win32':
+
+    _method_to_module = {
+        None: 'multiprocessing.popen_spawn_win32',
+        'spawn': 'multiprocessing.popen_spawn_win32',
+        }
+
+    def get_all_start_methods():
+        return ['spawn']
+
+else:
+    _method_to_module = {
+        None: 'multiprocessing.popen_fork',
+        'fork': 'multiprocessing.popen_fork',
+        'spawn': 'multiprocessing.popen_spawn_posix',
+        'forkserver': 'multiprocessing.popen_forkserver',
+        }
+
+    def get_all_start_methods():
+        from . import reduction
+        if reduction.HAVE_SEND_HANDLE:
+            return ['fork', 'spawn', 'forkserver']
+        else:
+            return ['fork', 'spawn']
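
popen.set_start_method() is the internal hook behind start-method selection; user code reaches it through the multiprocessing package. A sketch using the public names that accompany this change (assuming the final public API of the patch series):

    import multiprocessing

    def child(q):
        q.put('hello from child')

    if __name__ == '__main__':
        multiprocessing.set_start_method('spawn')   # choose before first use
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=child, args=(q,))
        p.start()
        print(q.get())
        p.join()
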
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/popen_fork.py
@@ -0,0 +1,87 @@
+import os
+import sys
+import signal
+import errno
+
+from . import util
+
+__all__ = ['Popen']
+
+#
+# Start child process using fork
+#
+
+class Popen(object):
+
+    def __init__(self, process_obj):
+        sys.stdout.flush()
+        sys.stderr.flush()
+        self.returncode = None
+        self._launch(process_obj)
+
+    def poll(self, flag=os.WNOHANG):
+        if self.returncode is None:
+            while True:
+                try:
+                    pid, sts = os.waitpid(self.pid, flag)
+                except OSError as e:
+                    if e.errno == errno.EINTR:
+                        continue
+                    # Child process not yet created. See #1731717
+                    # e.errno == errno.ECHILD == 10
+                    return None
+                else:
+                    break
+            if pid == self.pid:
+                if os.WIFSIGNALED(sts):
+                    self.returncode = -os.WTERMSIG(sts)
+                else:
+                    assert os.WIFEXITED(sts)
+                    self.returncode = os.WEXITSTATUS(sts)
+        return self.returncode
+
+    def wait(self, timeout=None):
+        if self.returncode is None:
+            if timeout is not None:
+                from .connection import wait
+                if not wait([self.sentinel], timeout):
+                    return None
+            # This shouldn't block if wait() returned successfully.
+            return self.poll(os.WNOHANG if timeout == 0.0 else 0)
+        return self.returncode
+
+    def terminate(self):
+        if self.returncode is None:
+            try:
+                os.kill(self.pid, signal.SIGTERM)
+            except ProcessLookupError:
+                pass
+            except OSError:
+                if self.wait(timeout=0.1) is None:
+                    raise
+
+    def _launch(self, process_obj):
+        code = 1
+        parent_r, child_w = util.pipe()
+        self.pid = os.fork()
+        if self.pid == 0:
+            try:
+                os.close(parent_r)
+                if 'random' in sys.modules:
+                    import random
+                    random.seed()
+                code = process_obj._bootstrap()
+            finally:
+                os._exit(code)
+        else:
+            os.close(child_w)
+            util.Finalize(self, os.close, (parent_r,))
+            self.sentinel = parent_r
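
The parent keeps parent_r as self.sentinel: the fd hits EOF when the child exits, which is what Process.sentinel and connection.wait() expose publicly. A sketch waiting on several children at once:

    import multiprocessing
    from multiprocessing.connection import wait

    def sleeper():
        import time
        time.sleep(0.2)

    if __name__ == '__main__':
        procs = [multiprocessing.Process(target=sleeper) for _ in range(3)]
        for p in procs:
            p.start()
        pending = {p.sentinel: p for p in procs}
        while pending:
            for s in wait(list(pending)):   # blocks until some child exits
                pending.pop(s).join()
        print('all children done')
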
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/popen_forkserver.py
@@ -0,0 +1,75 @@
+import io
+import os
+
+from . import reduction
+if not reduction.HAVE_SEND_HANDLE:
+    raise ImportError('No support for sending fds between processes')
+from . import forkserver
+from . import popen
+from . import popen_fork
+from . import spawn
+from . import util
+
+
+__all__ = ['Popen']
+
+#
+# Wrapper for an fd used while launching a process
+#
+
+class _DupFd(object):
+    def __init__(self, ind):
+        self.ind = ind
+    def detach(self):
+        return forkserver.get_inherited_fds()[self.ind]
+
+#
+# Start child process using a server process
+#
+
+class Popen(popen_fork.Popen):
+    DupFd = _DupFd
+
+    def __init__(self, process_obj):
+        self._fds = []
+        super().__init__(process_obj)
+
+    def duplicate_for_child(self, fd):
+        self._fds.append(fd)
+        return len(self._fds) - 1
+
+    def _launch(self, process_obj):
+        prep_data = spawn.get_preparation_data(process_obj._name)
+        buf = io.BytesIO()
+        popen.set_spawning_popen(self)
+        try:
+            reduction.dump(prep_data, buf)
+            reduction.dump(process_obj, buf)
+        finally:
+            popen.set_spawning_popen(None)
+
+        self.sentinel, w = forkserver.connect_to_new_process(self._fds)
+        util.Finalize(self, os.close, (self.sentinel,))
+        with open(w, 'wb', closefd=True) as f:
+            f.write(buf.getbuffer())
+        self.pid = forkserver.read_unsigned(self.sentinel)
+
+    def poll(self, flag=os.WNOHANG):
+        if self.returncode is None:
+            from .connection import wait
+            timeout = 0 if flag == os.WNOHANG else None
+            if not wait([self.sentinel], timeout):
+                return None
+            try:
+                self.returncode = forkserver.read_unsigned(self.sentinel)
+            except (OSError, EOFError):
+                # The process ended abnormally perhaps because of a signal
+                self.returncode = 255
+        return self.returncode
+
+    @staticmethod
+    def ensure_helpers_running():
+        from . import semaphore_tracker
+        semaphore_tracker.ensure_running()
+        forkserver.ensure_running()
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -0,0 +1,75 @@
+import fcntl
+import io
+import os
+
+from . import popen
+from . import popen_fork
+from . import reduction
+from . import spawn
+from . import util
+
+from . import current_process
+
+__all__ = ['Popen']
+
+
+#
+# Wrapper for an fd used while launching a process
+#
+
+class _DupFd(object):
+    def __init__(self, fd):
+        self.fd = fd
+    def detach(self):
+        return self.fd
+
+#
+# Start child process using a fresh interpreter
+#
+
+class Popen(popen_fork.Popen):
+    DupFd = _DupFd
+
+    def __init__(self, process_obj):
+        self._fds = []
+        super().__init__(process_obj)
+
+    def duplicate_for_child(self, fd):
+        self._fds.append(fd)
+        return fd
+
+    def _launch(self, process_obj):
+        tracker_fd = current_process()._config['semaphore_tracker_fd']
+        self._fds.append(tracker_fd)
+        prep_data = spawn.get_preparation_data(process_obj._name)
+        fp = io.BytesIO()
+        popen.set_spawning_popen(self)
+        try:
+            reduction.dump(prep_data, fp)
+            reduction.dump(process_obj, fp)
+        finally:
+            popen.set_spawning_popen(None)
+
+        parent_r = child_w = child_r = parent_w = None
+        try:
+            parent_r, child_w = util.pipe()
+            child_r, parent_w = util.pipe()
+            cmd = spawn.get_command_line() + [str(child_r)]
+            self._fds.extend([child_r, child_w])
+            self.pid = util.spawnv_passfds(spawn.get_executable(),
+                                           cmd, self._fds)
+            self.sentinel = parent_r
+            with open(parent_w, 'wb', closefd=False) as f:
+                f.write(fp.getbuffer())
+        finally:
+            if parent_r is not None:
+                util.Finalize(self, os.close, (parent_r,))
+            for fd in (child_r, child_w, parent_w):
+                if fd is not None:
+                    os.close(fd)
+
+    @staticmethod
+    def ensure_helpers_running():
+        from . import semaphore_tracker
+        semaphore_tracker.ensure_running()
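
_launch() passes the pipe fds to the fresh interpreter and appends the fd number to the command line so the child knows what to read. The same idea in miniature with the standard library (a POSIX sketch using subprocess, not the module's own helpers):

    import os, subprocess, sys

    r, w = os.pipe()
    child = subprocess.Popen(
        [sys.executable, '-c',
         'import os, sys; print(os.read(int(sys.argv[1]), 5))', str(r)],
        pass_fds=(r,))                      # keep r open in the child
    os.write(w, b'hello')
    os.close(w)
    child.wait()
    os.close(r)
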
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/popen_spawn_win32.py
@@ -0,0 +1,102 @@
+import os
+import msvcrt
+import signal
+import sys
+import _winapi
+
+from . import spawn
+from . import popen
+from . import reduction
+from . import util
+
+__all__ = ['Popen']
+
+#
+#
+#
+
+TERMINATE = 0x10000
+WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
+WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
+
+#
+# We define a Popen class similar to the one from subprocess, but
+# whose constructor takes a process object as its argument.
+#
+
+class Popen(object):
+
+    def __init__(self, process_obj):
+        prep_data = spawn.get_preparation_data(process_obj._name)
+        cmd = ' '.join('"%s"' % x for x in spawn.get_command_line())
+
+        # read end of pipe will be "stolen" by the child process
+        # -- see spawn_main() in spawn.py.
+        rhandle, whandle = _winapi.CreatePipe(None, 0)
+        wfd = msvcrt.open_osfhandle(whandle, 0)
+        cmd += ' {} {}'.format(os.getpid(), rhandle)
+
+        with open(wfd, 'wb', closefd=True) as to_child:
+            # start process
+            try:
+                hp, ht, pid, tid = _winapi.CreateProcess(
+                    spawn.get_executable(), cmd,
+                    None, None, False, 0, None, None, None)
+                _winapi.CloseHandle(ht)
+            except:
+                _winapi.CloseHandle(rhandle)
+                raise
+
+            # set attributes of self
+            self.pid = pid
+            self.returncode = None
+            self._handle = hp
+            self.sentinel = int(hp)
+            util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
+
+            # send information to child
+            popen.set_spawning_popen(self)
+            try:
+                reduction.dump(prep_data, to_child)
+                reduction.dump(process_obj, to_child)
+            finally:
+                popen.set_spawning_popen(None)
+
+    def duplicate_for_child(self, handle):
+        assert self is popen.get_spawning_popen()
+        return reduction.duplicate(handle, self.sentinel)
+
+    def wait(self, timeout=None):
+        if self.returncode is None:
+            if timeout is None:
+                msecs = _winapi.INFINITE
+            else:
+                msecs = max(0, int(timeout * 1000 + 0.5))
+
+            res = _winapi.WaitForSingleObject(int(self._handle), msecs)
+            if res == _winapi.WAIT_OBJECT_0:
+                code = _winapi.GetExitCodeProcess(self._handle)
+                if code == TERMINATE:
+                    code = -signal.SIGTERM
+                self.returncode = code
+
+        return self.returncode
+
+    def poll(self):
+        return self.wait(timeout=0)
+
+    def terminate(self):
+        if self.returncode is None:
+            try:
+                _winapi.TerminateProcess(int(self._handle), TERMINATE)
+            except OSError:
+                if self.wait(timeout=1.0) is None:
+                    raise
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -43,7 +43,7 @@ def active_children():
     Return list of process objects corresponding to live child processes
     '''
    _cleanup()
-    return list(_current_process._children)
+    return list(_children)

 #
 #
 #
@@ -51,9 +51,9 @@ def active_children():

 def _cleanup():
     # check for processes which have finished
-    for p in list(_current_process._children):
+    for p in list(_children):
         if p._popen.poll() is not None:
-            _current_process._children.discard(p)
+            _children.discard(p)

 #
 # The `Process` class
 #

@@ -63,21 +63,16 @@ class Process(object):
     '''
     Process objects represent activity that is run in a separate process

    The class is analogous to `threading.Thread`
     '''
     _Popen = None

     def __init__(self, group=None, target=None, name=None,
                  args=(), kwargs={}, *, daemon=None):
         assert group is None, 'group argument must be None for now'
-        count = next(_current_process._counter)
+        count = next(_process_counter)
         self._identity = _current_process._identity + (count,)
-        self._authkey = _current_process._authkey
-        if daemon is not None:
-            self._daemonic = daemon
-        else:
-            self._daemonic = _current_process._daemonic
-        self._tempdir = _current_process._tempdir
+        self._config = _current_process._config.copy()
         self._parent_pid = os.getpid()
         self._popen = None
         self._target = target
@@ -85,6 +80,8 @@ class Process(object):
         self._kwargs = dict(kwargs)
         self._name = name or type(self).__name__ + '-' + \
                      ':'.join(str(i) for i in self._identity)
+        if daemon is not None:
+            self.daemon = daemon
         _dangling.add(self)

     def run(self):
@@ -101,16 +98,16 @@ class Process(object):
         assert self._popen is None, 'cannot start a process twice'
         assert self._parent_pid == os.getpid(), \
                'can only start a process object created by current process'
-        assert not _current_process._daemonic, \
+        assert not _current_process._config.get('daemon'), \
               'daemonic processes are not allowed to have children'
         _cleanup()
         if self._Popen is not None:
             Popen = self._Popen
         else:
-            from .forking import Popen
+            from .popen import Popen
         self._popen = Popen(self)
         self._sentinel = self._popen.sentinel
-        _current_process._children.add(self)
+        _children.add(self)

     def terminate(self):
         '''
@@ -126,7 +123,7 @@ class Process(object):
         assert self._popen is not None, 'can only join a started process'
         res = self._popen.wait(timeout)
         if res is not None:
-            _current_process._children.discard(self)
+            _children.discard(self)

     def is_alive(self):
         '''
@@ -154,7 +151,7 @@ class Process(object):
         '''
         Return whether process is a daemon
         '''
-        return self._daemonic
+        return self._config.get('daemon', False)

     @daemon.setter
     def daemon(self, daemonic):
@@ -162,18 +159,18 @@ class Process(object):
         Set whether process is a daemon
         '''
         assert self._popen is None, 'process has already started'
-        self._daemonic = daemonic
+        self._config['daemon'] = daemonic

     @property
     def authkey(self):
-        return self._authkey
+        return self._config['authkey']

     @authkey.setter
     def authkey(self, authkey):
         '''
         Set authorization key of process
         '''
-        self._authkey = AuthenticationString(authkey)
+        self._config['authkey'] = AuthenticationString(authkey)

     @property
     def exitcode(self):
@@ -227,17 +224,17 @@ class Process(object):
             status = 'stopped[%s]' % _exitcode_to_name.get(status, status)

         return '<%s(%s, %s%s)>' % (type(self).__name__, self._name,
-                                   status, self._daemonic and ' daemon' or '')
+                                   status, self.daemon and ' daemon' or '')

     ##

     def _bootstrap(self):
         from . import util
-        global _current_process
+        global _current_process, _process_counter, _children

         try:
-            self._children = set()
-            self._counter = itertools.count(1)
+            _process_counter = itertools.count(1)
+            _children = set()
             if sys.stdin is not None:
                 try:
                     sys.stdin.close()
@@ -285,8 +282,8 @@ class Process(object):

 class AuthenticationString(bytes):
     def __reduce__(self):
-        from .forking import Popen
-        if not Popen.thread_is_spawning():
+        from .popen import get_spawning_popen
+        if get_spawning_popen() is None:
             raise TypeError(
                 'Pickling an AuthenticationString object is '
                 'disallowed for security reasons'
@@ -301,16 +298,19 @@ class _MainProcess(Process):

     def __init__(self):
         self._identity = ()
-        self._daemonic = False
         self._name = 'MainProcess'
         self._parent_pid = None
         self._popen = None
-        self._counter = itertools.count(1)
-        self._children = set()
-        self._authkey = AuthenticationString(os.urandom(32))
-        self._tempdir = None
+        self._config = {'authkey': AuthenticationString(os.urandom(32)),
+                        'semprefix': 'mp'}
+        # Note that some versions of FreeBSD only allow named
+        # semaphores to have names of up to 14 characters.  Therefore
+        # we choose a short prefix.
+

 _current_process = _MainProcess()
+_process_counter = itertools.count(1)
+_children = set()
 del _MainProcess

 #
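
Collapsing the per-process attributes into _config keeps child setup to a single dict copy; the public Process API is unchanged. For instance, the daemon flag still behaves as before:

    import multiprocessing, time

    def background():
        time.sleep(60)                      # would outlive the parent...

    if __name__ == '__main__':
        p = multiprocessing.Process(target=background, daemon=True)
        p.start()
        print(p.daemon)    # True -- the child is terminated with the parent
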
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -18,11 +18,15 @@ import weakref
 import errno

 from queue import Empty, Full
+
 import _multiprocessing
-from multiprocessing.connection import Pipe
-from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
-from multiprocessing.util import debug, info, Finalize, register_after_fork
-from multiprocessing.forking import assert_spawning, ForkingPickler
+
+from . import connection
+from . import popen
+from . import synchronize
+
+from .util import debug, info, Finalize, register_after_fork, is_exiting
+from .reduction import ForkingPickler

 #
 # Queue type using a pipe, buffer and thread
 #
@@ -34,14 +38,14 @@ class Queue(object):
         if maxsize <= 0:
             maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
         self._maxsize = maxsize
-        self._reader, self._writer = Pipe(duplex=False)
-        self._rlock = Lock()
+        self._reader, self._writer = connection.Pipe(duplex=False)
+        self._rlock = synchronize.Lock()
         self._opid = os.getpid()
         if sys.platform == 'win32':
             self._wlock = None
         else:
-            self._wlock = Lock()
-        self._sem = BoundedSemaphore(maxsize)
+            self._wlock = synchronize.Lock()
+        self._sem = synchronize.BoundedSemaphore(maxsize)
         # For use by concurrent.futures
         self._ignore_epipe = False
@@ -51,7 +55,7 @@ class Queue(object):
             register_after_fork(self, Queue._after_fork)

     def __getstate__(self):
-        assert_spawning(self)
+        popen.assert_spawning(self)
         return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                 self._rlock, self._wlock, self._sem, self._opid)
@@ -208,8 +212,6 @@ class Queue(object):
     @staticmethod
     def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
         debug('starting thread to feed data to pipe')
-        from .util import is_exiting
-
         nacquire = notempty.acquire
         nrelease = notempty.release
         nwait = notempty.wait
@@ -279,8 +281,8 @@ class JoinableQueue(Queue):

     def __init__(self, maxsize=0):
         Queue.__init__(self, maxsize)
-        self._unfinished_tasks = Semaphore(0)
-        self._cond = Condition()
+        self._unfinished_tasks = synchronize.Semaphore(0)
+        self._cond = synchronize.Condition()

     def __getstate__(self):
         return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
@@ -331,19 +333,19 @@ class JoinableQueue(Queue):
 class SimpleQueue(object):

     def __init__(self):
-        self._reader, self._writer = Pipe(duplex=False)
-        self._rlock = Lock()
+        self._reader, self._writer = connection.Pipe(duplex=False)
+        self._rlock = synchronize.Lock()
         self._poll = self._reader.poll
         if sys.platform == 'win32':
             self._wlock = None
         else:
-            self._wlock = Lock()
+            self._wlock = synchronize.Lock()

     def empty(self):
         return not self._poll()

     def __getstate__(self):
-        assert_spawning(self)
+        popen.assert_spawning(self)
         return (self._reader, self._writer, self._rlock, self._wlock)
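
Because __getstate__ calls popen.assert_spawning(), a queue can only be pickled while a child is actually being launched; handing it to Process() works, pickling it by hand does not. A sketch of both sides:

    import multiprocessing, pickle

    def worker(q):
        q.put('sent through inherited queue')

    if __name__ == '__main__':
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=worker, args=(q,))  # inherited: ok
        p.start()
        print(q.get())
        p.join()
        try:
            pickle.dumps(q)               # outside spawning context: fails
        except RuntimeError as e:
            print(e)
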
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -1,6 +1,5 @@
 #
-# Module to allow connection and socket objects to be transferred
-# between processes
+# Module which deals with pickling of objects.
 #
 # multiprocessing/reduction.py
 #
 # Copyright (c) 2006-2008, R Oudkerk
 # Licensed to PSF under a Contributor Agreement.
 #

-__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']
-
+import copyreg
+import functools
+import io
 import os
-import sys
+import pickle
 import socket
-import threading
-import struct
-import signal
+import sys

-from multiprocessing import current_process
-from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.util import is_exiting, sub_warning
+from . import popen
+from . import util
+
+__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
+
+
+HAVE_SEND_HANDLE = (sys.platform == 'win32' or
+                    (hasattr(socket, 'CMSG_LEN') and
+                     hasattr(socket, 'SCM_RIGHTS') and
+                     hasattr(socket.socket, 'sendmsg')))
+

 #
-#
+# Pickler subclass
 #
-if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
-                                   hasattr(socket, 'SCM_RIGHTS'))):
-    raise ImportError('pickling of connections not supported')

+class ForkingPickler(pickle.Pickler):
+    '''Pickler subclass used by multiprocessing.'''
+    _extra_reducers = {}
+    _copyreg_dispatch_table = copyreg.dispatch_table
+
+    def __init__(self, *args):
+        super().__init__(*args)
+        self.dispatch_table = self._copyreg_dispatch_table.copy()
+        self.dispatch_table.update(self._extra_reducers)
+
+    @classmethod
+    def register(cls, type, reduce):
+        '''Register a reduce function for a type.'''
+        cls._extra_reducers[type] = reduce
+
+    @classmethod
+    def dumps(cls, obj, protocol=None):
+        buf = io.BytesIO()
+        cls(buf, protocol).dump(obj)
+        return buf.getbuffer()
+
+register = ForkingPickler.register
+
+def dump(obj, file, protocol=None):
+    '''Replacement for pickle.dump() using ForkingPickler.'''
+    ForkingPickler(file, protocol).dump(obj)
 #
 # Platform specific definitions
 #
@@ -36,20 +65,44 @@ if not(sys.platform == 'win32' or (hasat

 if sys.platform == 'win32':
     # Windows
+    __all__ += ['DupHandle', 'duplicate', 'steal_handle']
     import _winapi

+    def duplicate(handle, target_process=None, inheritable=False):
+        '''Duplicate a handle.  (target_process is a handle not a pid!)'''
+        if target_process is None:
+            target_process = _winapi.GetCurrentProcess()
+        return _winapi.DuplicateHandle(
+            _winapi.GetCurrentProcess(), handle, target_process,
+            0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
+
+    def steal_handle(source_pid, handle):
+        '''Steal a handle from process identified by source_pid.'''
+        source_process_handle = _winapi.OpenProcess(
+            _winapi.PROCESS_DUP_HANDLE, False, source_pid)
+        try:
+            return _winapi.DuplicateHandle(
+                source_process_handle, handle,
+                _winapi.GetCurrentProcess(), 0, False,
+                _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
+        finally:
+            _winapi.CloseHandle(source_process_handle)
+
     def send_handle(conn, handle, destination_pid):
+        '''Send a handle over a local connection.'''
         dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
         conn.send(dh)

     def recv_handle(conn):
+        '''Receive a handle over a local connection.'''
         return conn.recv().detach()

     class DupHandle(object):
+        '''Picklable wrapper for a handle.'''
         def __init__(self, handle, access, pid=None):
-            # duplicate handle for process with given pid
             if pid is None:
+                # We just duplicate the handle in the current process and
+                # let the receiving process steal the handle.
                 pid = os.getpid()
             proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
             try:
@@ -62,9 +115,12 @@ if sys.platform == 'win32':
             self._pid = pid

         def detach(self):
+            '''Get the handle.  This should only be called once.'''
             # retrieve handle from process which currently owns it
             if self._pid == os.getpid():
+                # The handle has already been duplicated for this process.
                 return self._handle
+            # We must steal the handle from the process whose pid is self._pid.
             proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                        self._pid)
             try:
@@ -74,207 +130,112 @@ if sys.platform == 'win32':
             finally:
                 _winapi.CloseHandle(proc)
-    class DupSocket(object):
-        def __init__(self, sock):
-            new_sock = sock.dup()
-            def send(conn, pid):
-                share = new_sock.share(pid)
-                conn.send_bytes(share)
-            self._id = resource_sharer.register(send, new_sock.close)
-
-        def detach(self):
-            conn = resource_sharer.get_connection(self._id)
-            try:
-                share = conn.recv_bytes()
-                return socket.fromshare(share)
-            finally:
-                conn.close()
-
-    def reduce_connection(conn):
-        handle = conn.fileno()
-        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
-            ds = DupSocket(s)
-            return rebuild_connection, (ds, conn.readable, conn.writable)
-
-    def rebuild_connection(ds, readable, writable):
-        from .connection import Connection
-        sock = ds.detach()
-        return Connection(sock.detach(), readable, writable)
-
-    def reduce_pipe_connection(conn):
-        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
-                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
-        dh = DupHandle(conn.fileno(), access)
-        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
-
-    def rebuild_pipe_connection(dh, readable, writable):
-        from .connection import PipeConnection
-        handle = dh.detach()
-        return PipeConnection(handle, readable, writable)
 else:
     # Unix
+    __all__ += ['DupFd', 'sendfds', 'recvfds']
+    import array

     # On MacOSX we should acknowledge receipt of fds -- see Issue14669
     ACKNOWLEDGE = sys.platform == 'darwin'

+    def sendfds(sock, fds):
+        '''Send an array of fds over an AF_UNIX socket.'''
+        fds = array.array('i', fds)
+        msg = bytes([len(fds) % 256])
+        sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
+        if ACKNOWLEDGE and sock.recv(1) != b'A':
+            raise RuntimeError('did not receive acknowledgement of fd')
+
+    def recvfds(sock, size):
+        '''Receive an array of fds over an AF_UNIX socket.'''
+        a = array.array('i')
+        bytes_size = a.itemsize * size
+        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
+        if not msg and not ancdata:
+            raise EOFError
+        try:
+            if ACKNOWLEDGE:
+                sock.send(b'A')
+            if len(ancdata) != 1:
+                raise RuntimeError('received %d items of ancdata' %
+                                   len(ancdata))
+            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+            if (cmsg_level == socket.SOL_SOCKET and
+                cmsg_type == socket.SCM_RIGHTS):
+                if len(cmsg_data) % a.itemsize != 0:
+                    raise ValueError
+                a.frombytes(cmsg_data)
+                assert len(a) % 256 == msg[0]
+                return list(a)
+        except (ValueError, IndexError):
+            pass
+        raise RuntimeError('Invalid data received')
     def send_handle(conn, handle, destination_pid):
+        '''Send a handle over a local connection.'''
         with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
-            s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
-                                struct.pack("@i", handle))])
-            if ACKNOWLEDGE and conn.recv_bytes() != b'ACK':
-                raise RuntimeError('did not receive acknowledgement of fd')
+            sendfds(s, [handle])

     def recv_handle(conn):
-        size = struct.calcsize("@i")
+        '''Receive a handle over a local connection.'''
         with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
-            msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
-            try:
-                if ACKNOWLEDGE:
-                    conn.send_bytes(b'ACK')
-                cmsg_level, cmsg_type, cmsg_data = ancdata[0]
-                if (cmsg_level == socket.SOL_SOCKET and
-                    cmsg_type == socket.SCM_RIGHTS):
-                    return struct.unpack("@i", cmsg_data[:size])[0]
-            except (ValueError, IndexError, struct.error):
-                pass
-            raise RuntimeError('Invalid data received')
+            return recvfds(s, 1)[0]

-    class DupFd(object):
-        def __init__(self, fd):
-            new_fd = os.dup(fd)
-            def send(conn, pid):
-                send_handle(conn, new_fd, pid)
-            def close():
-                os.close(new_fd)
-            self._id = resource_sharer.register(send, close)
-
-        def detach(self):
-            conn = resource_sharer.get_connection(self._id)
-            try:
-                return recv_handle(conn)
-            finally:
-                conn.close()
-
-    def reduce_socket(s):
-        df = DupFd(s.fileno())
-        return rebuild_socket, (df, s.family, s.type, s.proto)
-
-    def rebuild_socket(df, family, type, proto):
-        fd = df.detach()
-        s = socket.fromfd(fd, family, type, proto)
-        os.close(fd)
-        return s
-
-    def reduce_connection(conn):
-        df = DupFd(conn.fileno())
-        return rebuild_connection, (df, conn.readable, conn.writable)
-
-    def rebuild_connection(df, readable, writable):
-        from .connection import Connection
-        fd = df.detach()
-        return Connection(fd, readable, writable)
+    def DupFd(fd):
+        '''Return a wrapper for an fd.'''
+        popen_obj = popen.get_spawning_popen()
+        if popen_obj is not None:
+            return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
+        elif HAVE_SEND_HANDLE:
+            from . import resource_sharer
+            return resource_sharer.DupFd(fd)
+        else:
+            raise ValueError('SCM_RIGHTS appears not to be available')
 #
-# Server which shares registered resources with clients
+# Try making some callable types picklable
 #

-class ResourceSharer(object):
-    def __init__(self):
-        self._key = 0
-        self._cache = {}
-        self._old_locks = []
-        self._lock = threading.Lock()
-        self._listener = None
-        self._address = None
-        self._thread = None
-        register_after_fork(self, ResourceSharer._afterfork)
-
-    def register(self, send, close):
-        with self._lock:
-            if self._address is None:
-                self._start()
-            self._key += 1
-            self._cache[self._key] = (send, close)
-            return (self._address, self._key)
+def _reduce_method(m):
+    if m.__self__ is None:
+        return getattr, (m.__class__, m.__func__.__name__)
+    else:
+        return getattr, (m.__self__, m.__func__.__name__)
+class _C:
+    def f(self):
+        pass
+register(type(_C().f), _reduce_method)

-    @staticmethod
-    def get_connection(ident):
-        from .connection import Client
-        address, key = ident
-        c = Client(address, authkey=current_process().authkey)
-        c.send((key, os.getpid()))
-        return c
-
-    def stop(self, timeout=None):
-        from .connection import Client
-        with self._lock:
-            if self._address is not None:
-                c = Client(self._address, authkey=current_process().authkey)
-                c.send(None)
-                c.close()
-                self._thread.join(timeout)
-                if self._thread.is_alive():
-                    sub_warning('ResourceSharer thread did not stop when asked')
-                self._listener.close()
-                self._thread = None
-                self._address = None
-                self._listener = None
-                for key, (send, close) in self._cache.items():
-                    close()
-                self._cache.clear()
+def _reduce_method_descriptor(m):
+    return getattr, (m.__objclass__, m.__name__)
+register(type(list.append), _reduce_method_descriptor)
+register(type(int.__add__), _reduce_method_descriptor)

-    def _afterfork(self):
-        for key, (send, close) in self._cache.items():
-            close()
-        self._cache.clear()
-        # If self._lock was locked at the time of the fork, it may be broken
-        # -- see issue 6721.  Replace it without letting it be gc'ed.
-        self._old_locks.append(self._lock)
-        self._lock = threading.Lock()
-        if self._listener is not None:
-            self._listener.close()
-        self._listener = None
-        self._address = None
-        self._thread = None
+def _reduce_partial(p):
+    return _rebuild_partial, (p.func, p.args, p.keywords or {})
+def _rebuild_partial(func, args, keywords):
+    return functools.partial(func, *args, **keywords)
+register(functools.partial, _reduce_partial)

+#
+# Make sockets picklable
+#

-    def _start(self):
-        from .connection import Listener
-        assert self._listener is None
-        debug('starting listener and thread for sending handles')
-        self._listener = Listener(authkey=current_process().authkey)
-        self._address = self._listener.address
-        t = threading.Thread(target=self._serve)
-        t.daemon = True
-        t.start()
-        self._thread = t
+if sys.platform == 'win32':
+    def _reduce_socket(s):
+        from .resource_sharer import DupSocket
+        return _rebuild_socket, (DupSocket(s),)
+    def _rebuild_socket(ds):
+        return ds.detach()
+    register(socket.socket, _reduce_socket)

-    def _serve(self):
-        if hasattr(signal, 'pthread_sigmask'):
-            signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
-        while 1:
-            try:
-                conn = self._listener.accept()
-                msg = conn.recv()
-                if msg is None:
-                    break
-                key, destination_pid = msg
-                send, close = self._cache.pop(key)
-                send(conn, destination_pid)
-                close()
-                conn.close()
-            except:
-                if not is_exiting():
-                    import traceback
-                    sub_warning(
-                        'thread for sharing handles raised exception :\n' +
-                        '-'*79 + '\n' + traceback.format_exc() + '-'*79
-                        )
-
-resource_sharer = ResourceSharer()
+else:
+    def _reduce_socket(s):
+        df = DupFd(s.fileno())
+        return _rebuild_socket, (df, s.family, s.type, s.proto)
+    def _rebuild_socket(df, family, type, proto):
+        fd = df.detach()
+        return socket.socket(family, type, proto, fileno=fd)
+    register(socket.socket, _reduce_socket)
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/resource_sharer.py
@@ -0,0 +1,158 @@
+#
+# We use a background thread for sharing fds on Unix, and for sharing sockets on
+# Windows.
+#
+# A client which wants to pickle a resource registers it with the resource
+# sharer and gets an identifier in return.  The unpickling process will
+# connect to the resource sharer, send the identifier and its pid, and then
+# receive the resource.
+#
+
+import os
+import signal
+import socket
+import sys
+import threading
+
+from . import process
+from . import reduction
+from . import util
+
+__all__ = ['stop']
+
+
+if sys.platform == 'win32':
- class DupSocket(object):
'''Picklable wrapper for a socket.'''[](#l25.32)
def __init__(self, sock):[](#l25.33)
new_sock = sock.dup()[](#l25.34)
def send(conn, pid):[](#l25.35)
share = new_sock.share(pid)[](#l25.36)
conn.send_bytes(share)[](#l25.37)
self._id = _resource_sharer.register(send, new_sock.close)[](#l25.38)
def detach(self):[](#l25.40)
'''Get the socket. This should only be called once.'''[](#l25.41)
with _resource_sharer.get_connection(self._id) as conn:[](#l25.42)
share = conn.recv_bytes()[](#l25.43)
return socket.fromshare(share)[](#l25.44)
- class DupFd(object):
'''Wrapper for fd which can be used at any time.'''[](#l25.50)
def __init__(self, fd):[](#l25.51)
new_fd = os.dup(fd)[](#l25.52)
def send(conn, pid):[](#l25.53)
reduction.send_handle(conn, new_fd, pid)[](#l25.54)
def close():[](#l25.55)
os.close(new_fd)[](#l25.56)
self._id = _resource_sharer.register(send, close)[](#l25.57)
def detach(self):[](#l25.59)
'''Get the fd. This should only be called once.'''[](#l25.60)
with _resource_sharer.get_connection(self._id) as conn:[](#l25.61)
return reduction.recv_handle(conn)[](#l25.62)
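DupFd works because os.dup() creates an independent descriptor for the same open file: closing the original does not invalidate the duplicate, so the registered send/close pair stays usable until detach(). A small self-contained demonstration of that property (the data is invented):

import os

r, w = os.pipe()
dup_w = os.dup(w)
os.close(w)                     # the original writer is gone ...
os.write(dup_w, b'still open')  # ... but the duplicate still works
print(os.read(r, 16))           # b'still open'
os.close(dup_w)
os.close(r)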
+
+
+class _ResourceSharer(object):
    '''Manager for resources using background thread.'''

    def __init__(self):
self._key = 0[](#l25.68)
self._cache = {}[](#l25.69)
self._old_locks = [][](#l25.70)
self._lock = threading.Lock()[](#l25.71)
self._listener = None[](#l25.72)
self._address = None[](#l25.73)
self._thread = None[](#l25.74)
util.register_after_fork(self, _ResourceSharer._afterfork)[](#l25.75)
- def register(self, send, close):
'''Register resource, returning an identifier.'''[](#l25.78)
with self._lock:[](#l25.79)
if self._address is None:[](#l25.80)
self._start()[](#l25.81)
self._key += 1[](#l25.82)
self._cache[self._key] = (send, close)[](#l25.83)
return (self._address, self._key)[](#l25.84)
- @staticmethod
- def get_connection(ident):
'''Return connection from which to receive identified resource.'''[](#l25.88)
from .connection import Client[](#l25.89)
address, key = ident[](#l25.90)
c = Client(address, authkey=process.current_process().authkey)[](#l25.91)
c.send((key, os.getpid()))[](#l25.92)
return c[](#l25.93)
- def stop(self, timeout=None):
'''Stop the background thread and clear registered resources.'''[](#l25.96)
from .connection import Client[](#l25.97)
with self._lock:[](#l25.98)
if self._address is not None:[](#l25.99)
c = Client(self._address,[](#l25.100)
authkey=process.current_process().authkey)[](#l25.101)
c.send(None)[](#l25.102)
c.close()[](#l25.103)
self._thread.join(timeout)[](#l25.104)
if self._thread.is_alive():[](#l25.105)
util.sub_warning('_ResourceSharer thread did '[](#l25.106)
'not stop when asked')[](#l25.107)
self._listener.close()[](#l25.108)
self._thread = None[](#l25.109)
self._address = None[](#l25.110)
self._listener = None[](#l25.111)
for key, (send, close) in self._cache.items():[](#l25.112)
close()[](#l25.113)
self._cache.clear()[](#l25.114)
- def _afterfork(self):
for key, (send, close) in self._cache.items():[](#l25.117)
close()[](#l25.118)
self._cache.clear()[](#l25.119)
# If self._lock was locked at the time of the fork, it may be broken[](#l25.120)
# -- see issue 6721. Replace it without letting it be gc'ed.[](#l25.121)
self._old_locks.append(self._lock)[](#l25.122)
self._lock = threading.Lock()[](#l25.123)
if self._listener is not None:[](#l25.124)
self._listener.close()[](#l25.125)
self._listener = None[](#l25.126)
self._address = None[](#l25.127)
self._thread = None[](#l25.128)
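The same after-fork repair is available to user code through util.register_after_fork(); a rough sketch of the pattern (Registry is an invented class):

import threading
from multiprocessing import util

class Registry(object):
    def __init__(self):
        self._lock = threading.Lock()
        # re-arm the lock in every forked child, mirroring _afterfork() above
        util.register_after_fork(self, Registry._reset_lock)

    def _reset_lock(self):
        # the parent's lock may have been held at fork time, so replace it
        self._lock = threading.Lock()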
- def _start(self):
from .connection import Listener[](#l25.131)
assert self._listener is None[](#l25.132)
util.debug('starting listener and thread for sending handles')[](#l25.133)
self._listener = Listener(authkey=process.current_process().authkey)[](#l25.134)
self._address = self._listener.address[](#l25.135)
t = threading.Thread(target=self._serve)[](#l25.136)
t.daemon = True[](#l25.137)
t.start()[](#l25.138)
self._thread = t[](#l25.139)
- def _serve(self):
if hasattr(signal, 'pthread_sigmask'):[](#l25.142)
signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))[](#l25.143)
while 1:[](#l25.144)
try:[](#l25.145)
with self._listener.accept() as conn:[](#l25.146)
msg = conn.recv()[](#l25.147)
if msg is None:[](#l25.148)
break[](#l25.149)
key, destination_pid = msg[](#l25.150)
send, close = self._cache.pop(key)[](#l25.151)
try:[](#l25.152)
send(conn, destination_pid)[](#l25.153)
finally:[](#l25.154)
close()[](#l25.155)
except:[](#l25.156)
if not util.is_exiting():[](#l25.157)
sys.excepthook(*sys.exc_info())[](#l25.158)
+
+
+_resource_sharer = _ResourceSharer()
+stop = _resource_sharer.stop
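With these reducers in place, an open socket can be passed to a child process like any other argument, even under the spawn start method; a sketch assuming Python 3.4+:

import multiprocessing as mp
import socket

def serve(srv):
    conn, addr = srv.accept()
    conn.sendall(b'hello from child')
    conn.close()

if __name__ == '__main__':
    mp.set_start_method('spawn')         # forces the socket to be pickled
    srv = socket.socket()
    srv.bind(('127.0.0.1', 0))
    srv.listen(1)
    p = mp.Process(target=serve, args=(srv,))
    p.start()
    c = socket.create_connection(srv.getsockname())
    print(c.recv(32))                    # b'hello from child'
    p.join()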
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/semaphore_tracker.py
@@ -0,0 +1,135 @@
+#
+# On Unix we run a server process which keeps track of unlinked
+# semaphores. The server ignores SIGINT and SIGTERM and reads from a
+# pipe.  Every other process of the program has a copy of the writable
+# end of the pipe, so we get EOF when all other processes have exited.
+# Then the server process unlinks any remaining semaphore names.
+#
+# This is important because the system only supports a limited number
+# of named semaphores, and they will not be automatically removed till
+# the next reboot.  Without this semaphore tracker process, "killall
+# python" would probably leave unlinked semaphores.
+#
+
+import errno
+import os
+import signal
+import sys
+import threading
+import warnings
+import _multiprocessing
+
+from . import spawn
+from . import util
+from . import current_process
+
+__all__ = ['ensure_running', 'register', 'unregister']
+
+
+_lock = threading.Lock()
+
+
+def ensure_running():
    '''Make sure that semaphore tracker process is running.

    This can be run from any process.  Usually a child process will use
    the semaphore created by its parent.'''
- with _lock:
config = current_process()._config[](#l26.42)
if config.get('semaphore_tracker_fd') is not None:[](#l26.43)
return[](#l26.44)
fds_to_pass = [][](#l26.45)
try:[](#l26.46)
fds_to_pass.append(sys.stderr.fileno())[](#l26.47)
except Exception:[](#l26.48)
pass[](#l26.49)
cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)'[](#l26.50)
r, semaphore_tracker_fd = util.pipe()[](#l26.51)
try:[](#l26.52)
fds_to_pass.append(r)[](#l26.53)
            # process will outlive us, so no need to wait on pid
exe = spawn.get_executable()[](#l26.55)
args = [exe] + util._args_from_interpreter_flags()[](#l26.56)
args += ['-c', cmd % r][](#l26.57)
util.spawnv_passfds(exe, args, fds_to_pass)[](#l26.58)
except:[](#l26.59)
os.close(semaphore_tracker_fd)[](#l26.60)
raise[](#l26.61)
else:[](#l26.62)
config['semaphore_tracker_fd'] = semaphore_tracker_fd[](#l26.63)
finally:[](#l26.64)
os.close(r)[](#l26.65)
def _send(cmd, name):
    msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
- if len(name) > 512:
# posix guarantees that writes to a pipe of less than PIPE_BUF[](#l26.81)
# bytes are atomic, and that PIPE_BUF >= 512[](#l26.82)
raise ValueError('name too long')[](#l26.83)
- fd = current_process()._config['semaphore_tracker_fd']
- nbytes = os.write(fd, msg)
- assert nbytes == len(msg)
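The wire format is one newline-terminated ASCII record per request, and the length check leans on POSIX's guarantee that writes of up to PIPE_BUF (>= 512) bytes are atomic, so records from different processes cannot interleave. A toy reenactment of the framing (the semaphore name is invented):

import os

r, w = os.pipe()

def send(cmd, name):
    msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
    assert len(msg) <= 512      # stays within the atomic-write limit
    os.write(w, msg)

send('REGISTER', '/mp-demo')
send('UNREGISTER', '/mp-demo')
os.close(w)
with open(r, 'rb') as f:        # the reader sees whole lines, never torn ones
    for line in f:
        cmd, name = line.strip().split(b':')
        print(cmd, name)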
def main(fd):
    '''Run semaphore tracker.'''
    # protect the process from ^C and "killall python" etc
- signal.signal(signal.SIGINT, signal.SIG_IGN)
- signal.signal(signal.SIGTERM, signal.SIG_IGN)
- for f in (sys.stdin, sys.stdout):
try:[](#l26.96)
f.close()[](#l26.97)
except Exception:[](#l26.98)
pass[](#l26.99)
- cache = set()
- try:
# keep track of registered/unregistered semaphores[](#l26.103)
with open(fd, 'rb') as f:[](#l26.104)
for line in f:[](#l26.105)
try:[](#l26.106)
cmd, name = line.strip().split(b':')[](#l26.107)
if cmd == b'REGISTER':[](#l26.108)
cache.add(name)[](#l26.109)
elif cmd == b'UNREGISTER':[](#l26.110)
cache.remove(name)[](#l26.111)
else:[](#l26.112)
raise RuntimeError('unrecognized command %r' % cmd)[](#l26.113)
except Exception:[](#l26.114)
try:[](#l26.115)
sys.excepthook(*sys.exc_info())[](#l26.116)
except:[](#l26.117)
pass[](#l26.118)
- finally:
# all processes have terminated; cleanup any remaining semaphores[](#l26.120)
if cache:[](#l26.121)
try:[](#l26.122)
warnings.warn('semaphore_tracker: There appear to be %d '[](#l26.123)
'leaked semaphores to clean up at shutdown' %[](#l26.124)
len(cache))[](#l26.125)
except Exception:[](#l26.126)
pass[](#l26.127)
for name in cache:[](#l26.128)
# For some reason the process which created and registered this[](#l26.129)
# semaphore has failed to unregister it. Presumably it has died.[](#l26.130)
# We therefore unlink it.[](#l26.131)
try:[](#l26.132)
name = name.decode('ascii')[](#l26.133)
try:[](#l26.134)
_multiprocessing.sem_unlink(name)[](#l26.135)
except Exception as e:[](#l26.136)
warnings.warn('semaphore_tracker: %r: %s' % (name, e))[](#l26.137)
finally:[](#l26.138)
pass[](#l26.139)
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -10,8 +10,11 @@
 import ctypes
 import weakref

-from multiprocessing import heap, RLock
-from multiprocessing.forking import assert_spawning, ForkingPickler
+from . import heap
+
+from .synchronize import RLock
+from .reduction import ForkingPickler
+from .popen import assert_spawning

 __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/spawn.py
@@ -0,0 +1,258 @@
+#
+# Code used to start processes when using the spawn or forkserver
+# start methods.
+#
+# multiprocessing/spawn.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+import os
+import pickle
+import sys
+
+from . import process
+from . import util
+from . import popen
+
+__all__ = ['_main', 'freeze_support', 'set_executable', 'get_executable',
+           'get_preparation_data', 'get_command_line', 'import_main_path']
+
+#
+# _python_exe is the assumed path to the python executable.
+# People embedding Python want to modify it.
+#
+
+if sys.platform != 'win32':
- WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
- WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
+
+#
+#
+#
+
+def is_forking(argv):
- '''
- Return whether commandline indicates we are forking
- '''
- if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
return True[](#l28.59)
- else:
return False[](#l28.61)
+def freeze_support():
+    '''
+    Run code for process object if this is not the main process
+    '''
- if is_forking(sys.argv):
main()[](#l28.69)
sys.exit()[](#l28.70)
+def get_command_line():
+    '''
+    Returns prefix of command line used for spawning a child process
+    '''
- if getattr(sys, 'frozen', False):
return [sys.executable, '--multiprocessing-fork'][](#l28.78)
- else:
prog = 'from multiprocessing.spawn import spawn_main; spawn_main()'[](#l28.80)
opts = util._args_from_interpreter_flags()[](#l28.81)
return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork'][](#l28.82)
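For a normal (non-frozen) interpreter the result is a -c one-liner plus the sentinel flag. A sketch of inspecting it (output shown is illustrative; the exact executable path and opts depend on how the interpreter was invoked):

from multiprocessing import spawn

print(spawn.get_command_line())
# e.g. ['/usr/bin/python3', '-c',
#       'from multiprocessing.spawn import spawn_main; spawn_main()',
#       '--multiprocessing-fork']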
+def spawn_main():
+    '''
+    Run code specified by data received over pipe
+    '''
- assert is_forking(sys.argv)
- handle = int(sys.argv[-1])
- if sys.platform == 'win32':
import msvcrt[](#l28.92)
from .reduction import steal_handle[](#l28.93)
pid = int(sys.argv[-2])[](#l28.94)
new_handle = steal_handle(pid, handle)[](#l28.95)
fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)[](#l28.96)
- else:
fd = handle[](#l28.98)
- exitcode = _main(fd)
- sys.exit(exitcode)
+def _main(fd):
+    with os.fdopen(fd, 'rb', closefd=True) as from_parent:
process.current_process()._inheriting = True[](#l28.105)
try:[](#l28.106)
preparation_data = pickle.load(from_parent)[](#l28.107)
prepare(preparation_data)[](#l28.108)
self = pickle.load(from_parent)[](#l28.109)
finally:[](#l28.110)
del process.current_process()._inheriting[](#l28.111)
- return self._bootstrap()
+
+
+def _check_not_importing_main():
- if getattr(process.current_process(), '_inheriting', False):
raise RuntimeError('''[](#l28.117)
An attempt has been made to start a new process before the[](#l28.118)
current process has finished its bootstrapping phase.[](#l28.119)
This probably means that you are not using fork to start your[](#l28.121)
child processes and you have forgotten to use the proper idiom[](#l28.122)
in the main module:[](#l28.123)
if __name__ == '__main__':[](#l28.125)
freeze_support()[](#l28.126)
...[](#l28.127)
The "freeze_support()" line can be omitted if the program[](#l28.129)
is not going to be frozen to produce an executable.''')[](#l28.130)
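The idiom the error message asks for looks like this; a minimal sketch:

import multiprocessing as mp

def worker():
    print('hello')

if __name__ == '__main__':          # not re-executed when children import us
    mp.freeze_support()             # a no-op unless frozen into an executable
    p = mp.Process(target=worker)
    p.start()
    p.join()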
+
+
+def get_preparation_data(name):
- '''
- Return info about parent needed by child to unpickle process object
- '''
- _check_not_importing_main()
- d = dict(
log_to_stderr=util._log_to_stderr,[](#l28.139)
authkey=process.current_process().authkey,[](#l28.140)
)[](#l28.141)
    sys_path = sys.path.copy()
- try:
i = sys_path.index('')[](#l28.148)
- except ValueError:
pass[](#l28.150)
- else:
sys_path[i] = process.ORIGINAL_DIR[](#l28.152)
- d.update(
name=name,[](#l28.155)
sys_path=sys_path,[](#l28.156)
sys_argv=sys.argv,[](#l28.157)
orig_dir=process.ORIGINAL_DIR,[](#l28.158)
dir=os.getcwd(),[](#l28.159)
start_method=popen.get_start_method(),[](#l28.160)
)[](#l28.161)
- if sys.platform != 'win32' or (not WINEXE and not WINSERVICE):
main_path = getattr(sys.modules['__main__'], '__file__', None)[](#l28.164)
if not main_path and sys.argv[0] not in ('', '-c'):[](#l28.165)
main_path = sys.argv[0][](#l28.166)
if main_path is not None:[](#l28.167)
if (not os.path.isabs(main_path) and[](#l28.168)
process.ORIGINAL_DIR is not None):[](#l28.169)
main_path = os.path.join(process.ORIGINAL_DIR, main_path)[](#l28.170)
d['main_path'] = os.path.normpath(main_path)[](#l28.171)
+
+#
+# Prepare current process
+#
+
+old_main_modules = []
+
+def prepare(data):
- '''
- Try to get current process ready to unpickle process object
- '''
- if 'name' in data:
process.current_process().name = data['name'][](#l28.186)
- if 'start_method' in data:
popen.set_start_method(data['start_method'], start_helpers=False)[](#l28.210)
+
+
+def import_main_path(main_path):
    '''
    Set sys.modules['__main__'] to module at main_path
    '''
    # XXX (ncoghlan): The following code makes several bogus
    # assumptions regarding the relationship between __file__
    # and a module's real name. See PEP 302 and issue #10845
    if getattr(sys.modules['__main__'], '__file__', None) == main_path:
return[](#l28.224)
- main_name = os.path.splitext(os.path.basename(main_path))[0]
    if main_name == '__init__':
main_name = os.path.basename(os.path.dirname(main_path))[](#l28.228)
    if main_name == '__main__':
main_module = sys.modules['__main__'][](#l28.231)
main_module.__file__ = main_path[](#l28.232)
- elif main_name != 'ipython':
# Main modules not actually called __main__.py may[](#l28.234)
# contain additional code that should still be executed[](#l28.235)
import importlib[](#l28.236)
import types[](#l28.237)
if main_path is None:[](#l28.239)
dirs = None[](#l28.240)
elif os.path.basename(main_path).startswith('__init__.py'):[](#l28.241)
dirs = [os.path.dirname(os.path.dirname(main_path))][](#l28.242)
else:[](#l28.243)
dirs = [os.path.dirname(main_path)][](#l28.244)
assert main_name not in sys.modules, main_name[](#l28.246)
sys.modules.pop('__mp_main__', None)[](#l28.247)
# We should not try to load __main__[](#l28.248)
# since that would execute 'if __name__ == "__main__"'[](#l28.249)
            # clauses, potentially causing a pseudo fork bomb.
loader = importlib.find_loader(main_name, path=dirs)[](#l28.251)
main_module = types.ModuleType(main_name)[](#l28.252)
try:[](#l28.253)
loader.init_module_attrs(main_module)[](#l28.254)
except AttributeError: # init_module_attrs is optional[](#l28.255)
pass[](#l28.256)
main_module.__name__ = '__mp_main__'[](#l28.257)
code = loader.get_code(main_name)[](#l28.258)
exec(code, main_module.__dict__)[](#l28.259)
old_main_modules.append(sys.modules['__main__'])[](#l28.261)
sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module[](#l28.262)
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -11,20 +11,24 @@
     'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
     ]

+import os
 import threading
 import sys
-
+import itertools
+import tempfile
 import _multiprocessing
-from multiprocessing.process import current_process
-from multiprocessing.util import register_after_fork, debug
-from multiprocessing.forking import assert_spawning, Popen
+
 from time import time as _time

+from . import popen
+from . import process
+from . import util
+
 # Try to import the mp.synchronize module cleanly, if it fails
 # raise ImportError for platforms lacking a working sem_open implementation.
 # See issue 3770
 except (ImportError):
     raise ImportError("This platform lacks a functioning sem_open" +
                       " implementation, therefore, the required" +

@@ -44,15 +48,45 @@
 SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX

 class SemLock(object):
+    def __init__(self, kind, value, maxvalue):
sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)[](#l29.40)
debug('created semlock with handle %s' % sl.handle)[](#l29.41)
unlink_immediately = (sys.platform == 'win32' or[](#l29.42)
popen.get_start_method() == 'fork')[](#l29.43)
for i in range(100):[](#l29.44)
try:[](#l29.45)
sl = self._semlock = _multiprocessing.SemLock([](#l29.46)
kind, value, maxvalue, self._make_name(),[](#l29.47)
unlink_immediately)[](#l29.48)
except FileExistsError:[](#l29.49)
pass[](#l29.50)
else:[](#l29.51)
break[](#l29.52)
else:[](#l29.53)
raise FileExistsError('cannot find name for semaphore')[](#l29.54)
+        util.debug('created semlock with handle %s' % sl.handle)
         self._make_methods()

         if sys.platform != 'win32':
             def _after_fork(obj):
                 obj._semlock._after_fork()
-            register_after_fork(self, _after_fork)
+            util.register_after_fork(self, _after_fork)
if self._semlock.name is not None:[](#l29.65)
# We only get here if we are on Unix with forking[](#l29.66)
# disabled. When the object is garbage collected or the[](#l29.67)
# process shuts down we unlink the semaphore name[](#l29.68)
from .semaphore_tracker import register[](#l29.69)
register(self._semlock.name)[](#l29.70)
util.Finalize(self, SemLock._cleanup, (self._semlock.name,),[](#l29.71)
exitpriority=0)[](#l29.72)
- @staticmethod
- def _cleanup(name):
from .semaphore_tracker import unregister[](#l29.76)
sem_unlink(name)[](#l29.77)
unregister(name)[](#l29.78)
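The create-or-retry loop in __init__ above is the standard recipe for claiming a fresh name in a shared namespace: attempt an exclusive create, and on FileExistsError draw another random name. The same pattern with O_EXCL files instead of named semaphores, an invented demo reusing the tempfile name generator the module itself relies on:

import os, tempfile

_rand = tempfile._RandomNameSequence()

def create_uniquely(dirname):
    for i in range(100):
        path = os.path.join(dirname, 'demo-' + next(_rand))
        try:
            # exclusive create fails atomically if the name is already taken
            return os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY), path
        except FileExistsError:
            continue                # raced with another process; retry
    raise FileExistsError('cannot find unused name')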
     def _make_methods(self):
         self.acquire = self._semlock.acquire
@@ -65,15 +99,24 @@ class SemLock(object):
         return self._semlock.__exit__(*args)

     def __getstate__(self):
-        assert_spawning(self)
+        popen.assert_spawning(self)
         sl = self._semlock
-        return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
if sys.platform == 'win32':[](#l29.90)
h = popen.get_spawning_popen().duplicate_for_child(sl.handle)[](#l29.91)
else:[](#l29.92)
h = sl.handle[](#l29.93)
return (h, sl.kind, sl.maxvalue, sl.name)[](#l29.94)
     def __setstate__(self, state):
         self._semlock = _multiprocessing.SemLock._rebuild(*state)
-        debug('recreated blocker with handle %r' % state[0])
+        util.debug('recreated blocker with handle %r' % state[0])
         self._make_methods()
- @staticmethod
- def _make_name():
return '/%s-%s' % (process.current_process()._config['semprefix'],[](#l29.104)
next(SemLock._rand))[](#l29.105)
#
# Semaphore
#

@@ -122,7 +165,7 @@ class Lock(SemLock):
     def __repr__(self):
         try:
             if self._semlock._is_mine():
-                name = current_process().name
+                name = process.current_process().name
                 if threading.current_thread().name != 'MainThread':
                     name += '|' + threading.current_thread().name
             elif self._semlock._get_value() == 1:
@@ -147,7 +190,7 @@ class RLock(SemLock):
     def __repr__(self):
         try:
             if self._semlock._is_mine():
-                name = current_process().name
+                name = process.current_process().name
                 if threading.current_thread().name != 'MainThread':
                     name += '|' + threading.current_thread().name
                 count = self._semlock._count()
@@ -175,7 +218,7 @@ class Condition(object):
         self._make_methods()

     def __getstate__(self):
-        assert_spawning(self)
+        popen.assert_spawning(self)
         return (self._lock, self._sleeping_count,
                 self._woken_count, self._wait_semaphore)
@@ -342,7 +385,7 @@ class Barrier(threading.Barrier):
     def __init__(self, parties, action=None, timeout=None):
         import struct
-        from multiprocessing.heap import BufferWrapper
+        from .heap import BufferWrapper
         wrapper = BufferWrapper(struct.calcsize('i') * 2)
         cond = Condition()
         self.__setstate__((parties, action, timeout, cond, wrapper))
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -17,13 +17,13 @@
 import threading        # we want threading to install its
                         # cleanup function before multiprocessing does
 from subprocess import _args_from_interpreter_flags

-from multiprocessing.process import current_process, active_children
+from . import process

 __all__ = [
     'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
     'log_to_stderr', 'get_temp_dir', 'register_after_fork',
     'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
 #
@@ -71,8 +71,6 @@ def get_logger():
         _logger = logging.getLogger(LOGGER_NAME)
         _logger.propagate = 0
logging.addLevelName(SUBDEBUG, 'SUBDEBUG')[](#l30.23)
logging.addLevelName(SUBWARNING, 'SUBWARNING')[](#l30.24)
         # XXX multiprocessing should cleanup before logging
         if hasattr(atexit, 'unregister'):
@@ -111,13 +109,14 @@ def log_to_stderr(level=None):

 def get_temp_dir():
     # get name of a temp directory which will be automatically cleaned up
+    tempdir = process.current_process()._config.get('tempdir')
+    if tempdir is None:
         import shutil, tempfile
         tempdir = tempfile.mkdtemp(prefix='pymp-')
         info('created temp directory %s', tempdir)
         Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
# Support for reinitialization of objects when bootstrapping a child process
@@ -273,8 +272,8 @@ def is_exiting(): _exiting = False def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
active_children=active_children,[](#l30.50)
current_process=current_process):[](#l30.51)
active_children=process.active_children,[](#l30.52)
current_process=process.current_process):[](#l30.53)
    # We hold on to references to functions in the arglist due to the
    # situation described below, where this function is called after this
    # module's globals are destroyed.
@@ -303,7 +302,7 @@ def _exit_function(info=info, debug=debu
                 # #9207.
if p._daemonic:[](#l30.61)
if p.daemon:[](#l30.62) info('calling terminate() for daemon %s', p.name)[](#l30.63) p._popen.terminate()[](#l30.64)
@@ -335,3 +334,54 @@ class ForkAwareLocal(threading.local):
         register_after_fork(self, lambda obj : obj.__dict__.clear())
     def __reduce__(self):
         return type(self), ()
+
+#
+# Close fds except those specified
+#
+
+try:
+    MAXFD = os.sysconf("SC_OPEN_MAX")
+except Exception:
+    MAXFD = 256
+
+def close_all_fds_except(fds):
- fds = list(fds) + [-1, MAXFD]
- fds.sort()
- assert fds[-1] == MAXFD, 'fd too large'
- for i in range(len(fds) - 1):
os.closerange(fds[i]+1, fds[i+1])[](#l30.85)
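With the -1 and MAXFD sentinels appended, each closerange() call spans the gap between two consecutive kept fds. A dry run of the arithmetic, printing instead of closing, with MAXFD assumed to be 256:

fds = sorted([0, 1, 2, 7] + [-1, 256])
for lo, hi in zip(fds, fds[1:]):
    print('os.closerange(%d, %d)' % (lo + 1, hi))
# only 3-6 and 8-255 would be closed, keeping 0, 1, 2 and 7 open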
+
+#
+# Start a program with only specified fds kept open
+#
+
+def spawnv_passfds(path, args, passfds):
- import _posixsubprocess, fcntl
- passfds = sorted(passfds)
- tmp = []
    # temporarily unset CLOEXEC on passed fds
- for fd in passfds:
flag = fcntl.fcntl(fd, fcntl.F_GETFD)[](#l30.97)
if flag & fcntl.FD_CLOEXEC:[](#l30.98)
fcntl.fcntl(fd, fcntl.F_SETFD, flag & ~fcntl.FD_CLOEXEC)[](#l30.99)
tmp.append((fd, flag))[](#l30.100)
- errpipe_read, errpipe_write = _posixsubprocess.cloexec_pipe()
- try:
return _posixsubprocess.fork_exec([](#l30.103)
args, [os.fsencode(path)], True, passfds, None, None,[](#l30.104)
-1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,[](#l30.105)
False, False, None)[](#l30.106)
- finally:
os.close(errpipe_read)[](#l30.108)
os.close(errpipe_write)[](#l30.109)
# reset CLOEXEC where necessary[](#l30.110)
for fd, flag in tmp:[](#l30.111)
fcntl.fcntl(fd, fcntl.F_SETFD, flag)[](#l30.112)
+
+#
+# Return pipe with CLOEXEC set on fds
+#
+
+def pipe():
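The helper's body is not reproduced here; one plausible shape for it sets FD_CLOEXEC on both ends with fcntl (os.pipe2 with os.O_CLOEXEC would be the atomic variant where available). A sketch under those assumptions:

import fcntl, os

def cloexec_pipe():
    r, w = os.pipe()
    for fd in (r, w):
        # mark both ends close-on-exec so exec'd children don't inherit them
        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
    return r, w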
rename from Lib/test/test_multiprocessing.py
rename to Lib/test/_test_multiprocessing.py
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -43,7 +43,7 @@ from multiprocessing import util

 try:
     from multiprocessing import reduction
 except ImportError:
     HAS_REDUCTION = False
@@ -99,6 +99,9 @@ try:
 except:
     MAXFD = 256

+# To speed up tests when using the forkserver, we can preload these:
+PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
+
 #
# Some tests require ctypes
#

@@ -330,7 +333,6 @@ class _TestProcess(BaseTestCase):
     @classmethod
     def _test_recursion(cls, wconn, id):
from multiprocessing import forking[](#l31.28) wconn.send(id)[](#l31.29) if len(id) < 2:[](#l31.30) for i in range(2):[](#l31.31)
@@ -378,7 +380,7 @@ class _TestProcess(BaseTestCase):
         self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
         event.set()
         p.join()
self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))[](#l31.36)
self.assertTrue(wait_for_handle(sentinel, timeout=1))[](#l31.37)
#
#

@@ -2493,7 +2495,7 @@ class _TestPicklingConnections(BaseTestC
     @classmethod
     def tearDownClass(cls):
from multiprocessing.reduction import resource_sharer[](#l31.45)
from multiprocessing import resource_sharer[](#l31.46) resource_sharer.stop(timeout=5)[](#l31.47)
     @classmethod

@@ -2807,30 +2809,40 @@ class _TestFinalize(BaseTestCase):
# Test that from ... import * works for each module
#

-class _TestImportStar(BaseTestCase):
-
+class _TestImportStar(unittest.TestCase):
+
- def get_module_names(self):
import glob[](#l31.60)
folder = os.path.dirname(multiprocessing.__file__)[](#l31.61)
pattern = os.path.join(folder, '*.py')[](#l31.62)
files = glob.glob(pattern)[](#l31.63)
modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files][](#l31.64)
modules = ['multiprocessing.' + m for m in modules][](#l31.65)
modules.remove('multiprocessing.__init__')[](#l31.66)
modules.append('multiprocessing')[](#l31.67)
return modules[](#l31.68)
modules = [[](#l31.71)
'multiprocessing', 'multiprocessing.connection',[](#l31.72)
'multiprocessing.heap', 'multiprocessing.managers',[](#l31.73)
'multiprocessing.pool', 'multiprocessing.process',[](#l31.74)
'multiprocessing.synchronize', 'multiprocessing.util'[](#l31.75)
][](#l31.76)
if HAS_REDUCTION:[](#l31.78)
modules.append('multiprocessing.reduction')[](#l31.79)
if c_int is not None:[](#l31.81)
modules = self.get_module_names()[](#l31.82)
if sys.platform == 'win32':[](#l31.83)
modules.remove('multiprocessing.popen_fork')[](#l31.84)
modules.remove('multiprocessing.popen_forkserver')[](#l31.85)
modules.remove('multiprocessing.popen_spawn_posix')[](#l31.86)
else:[](#l31.87)
modules.remove('multiprocessing.popen_spawn_win32')[](#l31.88)
if not HAS_REDUCTION:[](#l31.89)
modules.remove('multiprocessing.popen_forkserver')[](#l31.90)
if c_int is None:[](#l31.92) # This module requires _ctypes[](#l31.93)
modules.append('multiprocessing.sharedctypes')[](#l31.94)
modules.remove('multiprocessing.sharedctypes')[](#l31.95)
         for name in modules:
             __import__(name)
             mod = sys.modules[name]
-
for attr in getattr(mod, '__all__', ()):[](#l31.101)
self.assertTrue(hasattr(mod, '__all__'), name)[](#l31.102)
+            for attr in mod.__all__:
                 self.assertTrue(
                     hasattr(mod, attr),
                     '%r does not have attribute %r' % (mod, attr)
@@ -2953,131 +2965,6 @@ class TestInvalidHandle(unittest.TestCas
         self.assertRaises((ValueError, OSError),
                           multiprocessing.connection.Connection, -1)

-#
-# Functions used to create test cases from the base ones in this module
-#
-
-def create_test_cases(Mixin, type):
- result = {}
- glob = globals()
- Type = type.capitalize()
- ALL_TYPES = {'processes', 'threads', 'manager'}
- for name in list(glob.keys()):
if name.startswith('_Test'):[](#l31.123)
base = glob[name][](#l31.124)
assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)[](#l31.125)
if type in base.ALLOWED_TYPES:[](#l31.126)
newname = 'With' + Type + name[1:][](#l31.127)
class Temp(base, Mixin, unittest.TestCase):[](#l31.128)
pass[](#l31.129)
result[newname] = Temp[](#l31.130)
Temp.__name__ = Temp.__qualname__ = newname[](#l31.131)
Temp.__module__ = Mixin.__module__[](#l31.132)
-    return result
-
-#
-# Create test cases
-#
-
-class ProcessesMixin(object):
- TYPE = 'processes'
- Process = multiprocessing.Process
- connection = multiprocessing.connection
- current_process = staticmethod(multiprocessing.current_process)
- active_children = staticmethod(multiprocessing.active_children)
- Pool = staticmethod(multiprocessing.Pool)
- Pipe = staticmethod(multiprocessing.Pipe)
- Queue = staticmethod(multiprocessing.Queue)
- JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
- Lock = staticmethod(multiprocessing.Lock)
- RLock = staticmethod(multiprocessing.RLock)
- Semaphore = staticmethod(multiprocessing.Semaphore)
- BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
- Condition = staticmethod(multiprocessing.Condition)
- Event = staticmethod(multiprocessing.Event)
- Barrier = staticmethod(multiprocessing.Barrier)
- Value = staticmethod(multiprocessing.Value)
- Array = staticmethod(multiprocessing.Array)
- RawValue = staticmethod(multiprocessing.RawValue)
- RawArray = staticmethod(multiprocessing.RawArray)
-
-testcases_processes = create_test_cases(ProcessesMixin, type='processes')
-globals().update(testcases_processes)
-
-
-class ManagerMixin(object):
- TYPE = 'manager'
- Process = multiprocessing.Process
- Queue = property(operator.attrgetter('manager.Queue'))
- JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
- Lock = property(operator.attrgetter('manager.Lock'))
- RLock = property(operator.attrgetter('manager.RLock'))
- Semaphore = property(operator.attrgetter('manager.Semaphore'))
- BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
- Condition = property(operator.attrgetter('manager.Condition'))
- Event = property(operator.attrgetter('manager.Event'))
- Barrier = property(operator.attrgetter('manager.Barrier'))
- Value = property(operator.attrgetter('manager.Value'))
- Array = property(operator.attrgetter('manager.Array'))
- list = property(operator.attrgetter('manager.list'))
- dict = property(operator.attrgetter('manager.dict'))
- Namespace = property(operator.attrgetter('manager.Namespace'))
- @classmethod
- def tearDownClass(cls):
# only the manager process should be returned by active_children()[](#l31.193)
# but this can take a bit on slow machines, so wait a few seconds[](#l31.194)
# if there are other children too (see #17395)[](#l31.195)
t = 0.01[](#l31.196)
while len(multiprocessing.active_children()) > 1 and t < 5:[](#l31.197)
time.sleep(t)[](#l31.198)
t *= 2[](#l31.199)
gc.collect() # do garbage collection[](#l31.200)
if cls.manager._number_of_objects() != 0:[](#l31.201)
# This is not really an error since some tests do not[](#l31.202)
# ensure that all processes which hold a reference to a[](#l31.203)
# managed object have been joined.[](#l31.204)
print('Shared objects which still exist at manager shutdown:')[](#l31.205)
print(cls.manager._debug_info())[](#l31.206)
cls.manager.shutdown()[](#l31.207)
cls.manager.join()[](#l31.208)
cls.manager = None[](#l31.209)
-
-testcases_manager = create_test_cases(ManagerMixin, type='manager')
-globals().update(testcases_manager)
-
-
-class ThreadsMixin(object):
- TYPE = 'threads'
- Process = multiprocessing.dummy.Process
- connection = multiprocessing.dummy.connection
- current_process = staticmethod(multiprocessing.dummy.current_process)
- active_children = staticmethod(multiprocessing.dummy.active_children)
- Pool = staticmethod(multiprocessing.Pool)
- Pipe = staticmethod(multiprocessing.dummy.Pipe)
- Queue = staticmethod(multiprocessing.dummy.Queue)
- JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
- Lock = staticmethod(multiprocessing.dummy.Lock)
- RLock = staticmethod(multiprocessing.dummy.RLock)
- Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
- BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
- Condition = staticmethod(multiprocessing.dummy.Condition)
- Event = staticmethod(multiprocessing.dummy.Event)
- Barrier = staticmethod(multiprocessing.dummy.Barrier)
- Value = staticmethod(multiprocessing.dummy.Value)
- Array = staticmethod(multiprocessing.dummy.Array)
-
-testcases_threads = create_test_cases(ThreadsMixin, type='threads')
-globals().update(testcases_threads)

 class OtherTest(unittest.TestCase):
@@ -3427,7 +3314,7 @@ class TestFlags(unittest.TestCase):
     def test_flags(self):
         import json, subprocess
         # start child process using unusual flags
prog = ('from test.test_multiprocessing import TestFlags; ' +[](#l31.244)
prog = ('from test._test_multiprocessing import TestFlags; ' +[](#l31.245) 'TestFlags.run_in_child()')[](#l31.246) data = subprocess.check_output([](#l31.247) [sys.executable, '-E', '-S', '-O', '-c', prog])[](#l31.248)
@@ -3474,13 +3361,14 @@ class TestTimeouts(unittest.TestCase):

 class TestNoForkBomb(unittest.TestCase):

     def test_noforkbomb(self):
sm = multiprocessing.get_start_method()[](#l31.253) name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')[](#l31.254)
if WIN32:[](#l31.255)
rc, out, err = test.script_helper.assert_python_failure(name)[](#l31.256)
if sm != 'fork':[](#l31.257)
rc, out, err = test.script_helper.assert_python_failure(name, sm)[](#l31.258) self.assertEqual('', out.decode('ascii'))[](#l31.259) self.assertIn('RuntimeError', err.decode('ascii'))[](#l31.260) else:[](#l31.261)
rc, out, err = test.script_helper.assert_python_ok(name)[](#l31.262)
rc, out, err = test.script_helper.assert_python_ok(name, sm)[](#l31.263) self.assertEqual('123', out.decode('ascii').rstrip())[](#l31.264) self.assertEqual('', err.decode('ascii'))[](#l31.265)
@@ -3514,6 +3402,72 @@ class TestForkAwareThreadLock(unittest.T
         self.assertLessEqual(new_size, old_size)

 #
+# Check that non-forked child processes do not inherit unneeded fds/handles
+#
+
+class TestCloseFds(unittest.TestCase):
+
- def get_high_socket_fd(self):
if WIN32:[](#l31.277)
# The child process will not have any socket handles, so[](#l31.278)
# calling socket.fromfd() should produce WSAENOTSOCK even[](#l31.279)
# if there is a handle of the same number.[](#l31.280)
return socket.socket().detach()[](#l31.281)
else:[](#l31.282)
# We want to produce a socket with an fd high enough that a[](#l31.283)
# freshly created child process will not have any fds as high.[](#l31.284)
fd = socket.socket().detach()[](#l31.285)
to_close = [][](#l31.286)
while fd < 50:[](#l31.287)
to_close.append(fd)[](#l31.288)
fd = os.dup(fd)[](#l31.289)
for x in to_close:[](#l31.290)
os.close(x)[](#l31.291)
return fd[](#l31.292)
- def close(self, fd):
if WIN32:[](#l31.295)
socket.socket(fileno=fd).close()[](#l31.296)
else:[](#l31.297)
os.close(fd)[](#l31.298)
- @classmethod
- def _test_closefds(cls, conn, fd):
try:[](#l31.302)
s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)[](#l31.303)
except Exception as e:[](#l31.304)
conn.send(e)[](#l31.305)
else:[](#l31.306)
s.close()[](#l31.307)
conn.send(None)[](#l31.308)
- def test_closefd(self):
if not HAS_REDUCTION:[](#l31.311)
raise unittest.SkipTest('requires fd pickling')[](#l31.312)
reader, writer = multiprocessing.Pipe()[](#l31.314)
fd = self.get_high_socket_fd()[](#l31.315)
try:[](#l31.316)
p = multiprocessing.Process(target=self._test_closefds,[](#l31.317)
args=(writer, fd))[](#l31.318)
p.start()[](#l31.319)
writer.close()[](#l31.320)
e = reader.recv()[](#l31.321)
p.join(timeout=5)[](#l31.322)
finally:[](#l31.323)
self.close(fd)[](#l31.324)
writer.close()[](#l31.325)
reader.close()[](#l31.326)
if multiprocessing.get_start_method() == 'fork':[](#l31.328)
self.assertIs(e, None)[](#l31.329)
else:[](#l31.330)
WSAENOTSOCK = 10038[](#l31.331)
self.assertIsInstance(e, OSError)[](#l31.332)
self.assertTrue(e.errno == errno.EBADF or[](#l31.333)
e.winerror == WSAENOTSOCK, e)[](#l31.334)
# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
#

@@ -3557,10 +3511,10 @@ class TestIgnoreEINTR(unittest.TestCase)
             def handler(signum, frame):
                 pass
             signal.signal(signal.SIGUSR1, handler)
l = multiprocessing.connection.Listener()[](#l31.344)
conn.send(l.address)[](#l31.345)
a = l.accept()[](#l31.346)
a.send('welcome')[](#l31.347)
with multiprocessing.connection.Listener() as l:[](#l31.348)
conn.send(l.address)[](#l31.349)
a = l.accept()[](#l31.350)
a.send('welcome')[](#l31.351)
     @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
     def test_ignore_listener(self):
@@ -3581,26 +3535,221 @@ class TestIgnoreEINTR(unittest.TestCase)
         finally:
             conn.close()

-#
-#
-#
-
-def setUpModule():
+class TestStartMethod(unittest.TestCase):
- def test_set_get(self):
multiprocessing.set_forkserver_preload(PRELOAD)[](#l31.367)
count = 0[](#l31.368)
        old_method = multiprocessing.get_start_method()
        try:
lock = multiprocessing.RLock()[](#l31.371)
except OSError:[](#l31.372)
raise unittest.SkipTest("OSError raises on RLock creation, "[](#l31.373)
"see issue 3111!")[](#l31.374)
- check_enough_semaphores()
- util.get_temp_dir() # creates temp directory for use by all processes
- multiprocessing.get_logger().setLevel(LOG_LEVEL)
for method in ('fork', 'spawn', 'forkserver'):[](#l31.387)
try:[](#l31.388)
multiprocessing.set_start_method(method)[](#l31.389)
except ValueError:[](#l31.390)
continue[](#l31.391)
self.assertEqual(multiprocessing.get_start_method(), method)[](#l31.392)
count += 1[](#l31.393)
finally:[](#l31.394)
multiprocessing.set_start_method(old_method)[](#l31.395)
self.assertGreaterEqual(count, 1)[](#l31.396)
- def test_get_all(self):
methods = multiprocessing.get_all_start_methods()[](#l31.399)
if sys.platform == 'win32':[](#l31.400)
self.assertEqual(methods, ['spawn'])[](#l31.401)
else:[](#l31.402)
self.assertTrue(methods == ['fork', 'spawn'] or[](#l31.403)
methods == ['fork', 'spawn', 'forkserver'])[](#l31.404)
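From user code the new API reads as follows; a sketch, noting that the available methods depend on the platform:

import multiprocessing as mp

def worker(q):
    q.put(42)

if __name__ == '__main__':
    print(mp.get_all_start_methods())  # e.g. ['fork', 'spawn', 'forkserver']
    mp.set_start_method('spawn')       # choose once, before creating children
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    print(q.get())                     # 42
    p.join()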
+
+#
+# Check that killing process does not leak named semaphores
+#
+
+@unittest.skipIf(sys.platform == "win32",
+                 "test semantics don't make sense on Windows")
+class TestSemaphoreTracker(unittest.TestCase):
- def test_semaphore_tracker(self):
import subprocess[](#l31.414)
cmd = '''if 1:[](#l31.415)
import multiprocessing as mp, time, os[](#l31.416)
mp.set_start_method("spawn")[](#l31.417)
lock1 = mp.Lock()[](#l31.418)
lock2 = mp.Lock()[](#l31.419)
os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")[](#l31.420)
os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")[](#l31.421)
time.sleep(10)[](#l31.422)
'''[](#l31.423)
print("\nTestSemaphoreTracker will output warnings a bit like:\n"[](#l31.424)
" ... There appear to be 2 leaked semaphores"[](#l31.425)
" to clean up at shutdown\n"[](#l31.426)
" ... '/mp-03jgqz': [Errno 2] No such file or directory",[](#l31.427)
file=sys.stderr)[](#l31.428)
r, w = os.pipe()[](#l31.429)
p = subprocess.Popen([sys.executable,[](#l31.430)
#'-W', 'ignore:semaphore_tracker',[](#l31.431)
'-c', cmd % (w, w)],[](#l31.432)
pass_fds=[w])[](#l31.433)
os.close(w)[](#l31.434)
with open(r, 'rb', closefd=True) as f:[](#l31.435)
name1 = f.readline().rstrip().decode('ascii')[](#l31.436)
name2 = f.readline().rstrip().decode('ascii')[](#l31.437)
_multiprocessing.sem_unlink(name1)[](#l31.438)
p.terminate()[](#l31.439)
p.wait()[](#l31.440)
time.sleep(1.0)[](#l31.441)
with self.assertRaises(OSError) as ctx:[](#l31.442)
_multiprocessing.sem_unlink(name2)[](#l31.443)
# docs say it should be ENOENT, but OSX seems to give EINVAL[](#l31.444)
self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))[](#l31.445)
+
+#
+# Mixins
+#
+
+class ProcessesMixin(object):
- TYPE = 'processes'
- Process = multiprocessing.Process
- connection = multiprocessing.connection
- current_process = staticmethod(multiprocessing.current_process)
- active_children = staticmethod(multiprocessing.active_children)
- Pool = staticmethod(multiprocessing.Pool)
- Pipe = staticmethod(multiprocessing.Pipe)
- Queue = staticmethod(multiprocessing.Queue)
- JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
- Lock = staticmethod(multiprocessing.Lock)
- RLock = staticmethod(multiprocessing.RLock)
- Semaphore = staticmethod(multiprocessing.Semaphore)
- BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
- Condition = staticmethod(multiprocessing.Condition)
- Event = staticmethod(multiprocessing.Event)
- Barrier = staticmethod(multiprocessing.Barrier)
- Value = staticmethod(multiprocessing.Value)
- Array = staticmethod(multiprocessing.Array)
- RawValue = staticmethod(multiprocessing.RawValue)
- RawArray = staticmethod(multiprocessing.RawArray)
+
+
+class ManagerMixin(object):
- TYPE = 'manager'
- Process = multiprocessing.Process
- Queue = property(operator.attrgetter('manager.Queue'))
- JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
- Lock = property(operator.attrgetter('manager.Lock'))
- RLock = property(operator.attrgetter('manager.RLock'))
- Semaphore = property(operator.attrgetter('manager.Semaphore'))
- BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
- Condition = property(operator.attrgetter('manager.Condition'))
- Event = property(operator.attrgetter('manager.Event'))
- Barrier = property(operator.attrgetter('manager.Barrier'))
- Value = property(operator.attrgetter('manager.Value'))
- Array = property(operator.attrgetter('manager.Array'))
- list = property(operator.attrgetter('manager.list'))
- dict = property(operator.attrgetter('manager.dict'))
- Namespace = property(operator.attrgetter('manager.Namespace'))
- @classmethod
- def tearDownClass(cls):
# only the manager process should be returned by active_children()[](#l31.502)
# but this can take a bit on slow machines, so wait a few seconds[](#l31.503)
# if there are other children too (see #17395)[](#l31.504)
t = 0.01[](#l31.505)
while len(multiprocessing.active_children()) > 1 and t < 5:[](#l31.506)
time.sleep(t)[](#l31.507)
t *= 2[](#l31.508)
gc.collect() # do garbage collection[](#l31.509)
if cls.manager._number_of_objects() != 0:[](#l31.510)
# This is not really an error since some tests do not[](#l31.511)
# ensure that all processes which hold a reference to a[](#l31.512)
# managed object have been joined.[](#l31.513)
print('Shared objects which still exist at manager shutdown:')[](#l31.514)
print(cls.manager._debug_info())[](#l31.515)
cls.manager.shutdown()[](#l31.516)
cls.manager.join()[](#l31.517)
cls.manager = None[](#l31.518)
+
+
+class ThreadsMixin(object):
- TYPE = 'threads'
- Process = multiprocessing.dummy.Process
- connection = multiprocessing.dummy.connection
- current_process = staticmethod(multiprocessing.dummy.current_process)
- active_children = staticmethod(multiprocessing.dummy.active_children)
- Pool = staticmethod(multiprocessing.Pool)
- Pipe = staticmethod(multiprocessing.dummy.Pipe)
- Queue = staticmethod(multiprocessing.dummy.Queue)
- JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
- Lock = staticmethod(multiprocessing.dummy.Lock)
- RLock = staticmethod(multiprocessing.dummy.RLock)
- Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
- BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
- Condition = staticmethod(multiprocessing.dummy.Condition)
- Event = staticmethod(multiprocessing.dummy.Event)
- Barrier = staticmethod(multiprocessing.dummy.Barrier)
- Value = staticmethod(multiprocessing.dummy.Value)
- Array = staticmethod(multiprocessing.dummy.Array)
+
+#
+# Functions used to create test cases from the base ones in this module
+#
+
+def install_tests_in_module_dict(remote_globs, start_method):
    __module__ = remote_globs['__name__']
- local_globs = globals()
- ALL_TYPES = {'processes', 'threads', 'manager'}
- for name, base in local_globs.items():
if not isinstance(base, type):[](#l31.551)
continue[](#l31.552)
if issubclass(base, BaseTestCase):[](#l31.553)
if base is BaseTestCase:[](#l31.554)
continue[](#l31.555)
assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES[](#l31.556)
for type_ in base.ALLOWED_TYPES:[](#l31.557)
newname = 'With' + type_.capitalize() + name[1:][](#l31.558)
Mixin = local_globs[type_.capitalize() + 'Mixin'][](#l31.559)
class Temp(base, Mixin, unittest.TestCase):[](#l31.560)
pass[](#l31.561)
Temp.__name__ = Temp.__qualname__ = newname[](#l31.562)
Temp.__module__ = __module__[](#l31.563)
remote_globs[newname] = Temp[](#l31.564)
elif issubclass(base, unittest.TestCase):[](#l31.565)
class Temp(base, object):[](#l31.566)
pass[](#l31.567)
Temp.__name__ = Temp.__qualname__ = name[](#l31.568)
Temp.__module__ = __module__[](#l31.569)
remote_globs[name] = Temp[](#l31.570)
- def setUpModule():
multiprocessing.set_forkserver_preload(PRELOAD)[](#l31.573)
remote_globs['old_start_method'] = multiprocessing.get_start_method()[](#l31.574)
try:[](#l31.575)
multiprocessing.set_start_method(start_method)[](#l31.576)
except ValueError:[](#l31.577)
raise unittest.SkipTest(start_method +[](#l31.578)
' start method not supported')[](#l31.579)
print('Using start method %r' % multiprocessing.get_start_method())[](#l31.580)
if sys.platform.startswith("linux"):[](#l31.582)
try:[](#l31.583)
lock = multiprocessing.RLock()[](#l31.584)
except OSError:[](#l31.585)
raise unittest.SkipTest("OSError raises on RLock creation, "[](#l31.586)
"see issue 3111!")[](#l31.587)
check_enough_semaphores()[](#l31.588)
util.get_temp_dir() # creates temp directory[](#l31.589)
multiprocessing.get_logger().setLevel(LOG_LEVEL)[](#l31.590)
- def tearDownModule():
multiprocessing.set_start_method(remote_globs['old_start_method'])[](#l31.593)
# pause a bit so we don't get warning about dangling threads/processes[](#l31.594)
time.sleep(0.5)[](#l31.595)
--- a/Lib/test/mp_fork_bomb.py
+++ b/Lib/test/mp_fork_bomb.py
@@ -7,6 +7,11 @@ def foo():
 # correctly on Windows.  However, we should get a RuntimeError rather
 # than the Windows equivalent of a fork bomb.
+
 p = multiprocessing.Process(target=foo)
 p.start()
 p.join()
--- a/Lib/test/regrtest.py
+++ b/Lib/test/regrtest.py
@@ -149,7 +149,7 @@ try:
 except ImportError:
     threading = None
 try:
 except ImportError:
     multiprocessing = None
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_multiprocessing_fork.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'fork')
+
+if __name__ == '__main__':
+    unittest.main()
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_multiprocessing_forkserver.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'forkserver')
+
+if __name__ == '__main__':
+    unittest.main()
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_multiprocessing_spawn.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'spawn')
+
+if __name__ == '__main__':
+    unittest.main()
--- a/Makefile.pre.in
+++ b/Makefile.pre.in
@@ -938,7 +938,9 @@ buildbottest: all platform

 QUICKTESTOPTS= $(TESTOPTS) -x test_subprocess test_io test_lib2to3 \
                test_multibytecodec test_urllib2_localnet test_itertools \
-               test_multiprocessing test_mailbox test_socket test_poll \
+               test_multiprocessing_fork test_multiprocessing_spawn \
+               test_multiprocessing_forkserver \
+               test_mailbox test_socket test_poll \
                test_select test_zipfile test_concurrent_futures

 quicktest:     all platform
                $(TESTRUNNER) $(QUICKTESTOPTS)
--- a/Modules/_multiprocessing/multiprocessing.c
+++ b/Modules/_multiprocessing/multiprocessing.c
@@ -126,6 +126,7 @@ static PyMethodDef module_methods[] = {
     {"recv", multiprocessing_recv, METH_VARARGS, ""},
     {"send", multiprocessing_send, METH_VARARGS, ""},
 #endif
+    {"sem_unlink", _PyMp_sem_unlink, METH_VARARGS, ""},
--- a/Modules/_multiprocessing/multiprocessing.h
+++ b/Modules/_multiprocessing/multiprocessing.h
@@ -98,5 +98,6 @@ PyObject *_PyMp_SetError(PyObject *Type,
  */

 extern PyTypeObject _PyMp_SemLockType;
+extern PyObject *_PyMp_sem_unlink(PyObject *ignore, PyObject *args);

 #endif /* MULTIPROCESSING_H */
--- a/Modules/_multiprocessing/semaphore.c
+++ b/Modules/_multiprocessing/semaphore.c
@@ -18,6 +18,7 @@ typedef struct {
     int count;
     int maxvalue;
     int kind;
+    char *name;
 } SemLockObject;

 #define ISMINE(o) (o->count > 0 && PyThread_get_thread_ident() == o->last_tid)

@@ -397,7 +398,8 @@ semlock_release(SemLockObject *self, PyO
  */

 static PyObject *
-newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue)
+newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue,
+                 char *name)
 {
     SemLockObject *self;
@@ -409,21 +411,22 @@ newsemlockobject(PyTypeObject *type, SEM
     self->count = 0;
     self->last_tid = 0;
     self->maxvalue = maxvalue;
 static PyObject *
 semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
 {
- char *name, *name_copy = NULL;
- static char *kwlist[] = {"kind", "value", "maxvalue", "name", "unlink",
NULL};[](#l40.41)
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "iiisi", kwlist,
&kind, &value, &maxvalue, &name, &unlink))[](#l40.46) return NULL;[](#l40.47)
     if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) {
@@ -431,18 +434,23 @@ semlock_new(PyTypeObject *type, PyObject
         return NULL;
     }
- if (!unlink) {
name_copy = PyMem_Malloc(strlen(name) + 1);[](#l40.56)
if (name_copy == NULL)[](#l40.57)
goto failure;[](#l40.58)
strcpy(name_copy, name);[](#l40.59)
- }
     handle = SEM_CREATE(name, value, maxvalue);
     /* On Windows we should fail if GetLastError()==ERROR_ALREADY_EXISTS */
     if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0)
         goto failure;
@@ -451,6 +459,7 @@ semlock_new(PyTypeObject *type, PyObject
   failure:
     if (handle != SEM_FAILED)
         SEM_CLOSE(handle);
@@ -460,12 +469,30 @@ semlock_rebuild(PyTypeObject *type, PyOb
 {
     SEM_HANDLE handle;
     int kind, maxvalue;
- if (!PyArg_ParseTuple(args, F_SEM_HANDLE "iiz",
&handle, &kind, &maxvalue, &name))[](#l40.95) return NULL;[](#l40.96)
- if (name != NULL) {
name_copy = PyMem_Malloc(strlen(name) + 1);[](#l40.100)
if (name_copy == NULL)[](#l40.101)
return PyErr_NoMemory();[](#l40.102)
strcpy(name_copy, name);[](#l40.103)
- }
- if (name != NULL) {
handle = sem_open(name, 0);[](#l40.108)
if (handle == SEM_FAILED) {[](#l40.109)
PyMem_Free(name_copy);[](#l40.110)
return PyErr_SetFromErrno(PyExc_OSError);[](#l40.111)
}[](#l40.112)
- }
 }

 static void
@@ -473,6 +500,7 @@ semlock_dealloc(SemLockObject* self)
 {
     if (self->handle != SEM_FAILED)
         SEM_CLOSE(self->handle);
@@ -574,6 +602,8 @@ static PyMemberDef semlock_members[] = {
      ""},
     {"maxvalue", T_INT, offsetof(SemLockObject, maxvalue), READONLY,
      ""},
+    {"name", T_STRING, offsetof(SemLockObject, name), READONLY,
+     ""},
     {NULL}
 };

@@ -621,3 +651,23 @@ PyTypeObject _PyMp_SemLockType = {
     /* tp_alloc          */ 0,
     /* tp_new            */ semlock_new,
 };
+
+/*
+ * Function to unlink semaphore names
+ */
+
+PyObject *
+_PyMp_sem_unlink(PyObject *ignore, PyObject *args)
+{
+    char *name;