cpython: 08d4c2fe51ea
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -832,6 +832,10 @@ Connection objects are usually created u
        raised and the complete message is available as ``e.args[0]`` where ``e``
        is the exception instance.
 
+   .. versionchanged:: 3.3
+      Connection objects themselves can now be transferred between processes
+      using :meth:`Connection.send` and :meth:`Connection.recv`.
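
For readers skimming the patch, a minimal sketch of what the new behaviour allows (assuming Python 3.3+ with this change applied): one end of a pipe is itself sent through another pipe.

    import multiprocessing as mp

    def worker(conn):
        inner = conn.recv()             # a Connection object arrives over another Connection
        inner.send('hello from the child')
        inner.close()
        conn.close()

    if __name__ == '__main__':
        outer_parent, outer_child = mp.Pipe()
        p = mp.Process(target=worker, args=(outer_child,))
        p.start()
        outer_child.close()

        inner_parent, inner_child = mp.Pipe()
        outer_parent.send(inner_child)  # the Connection itself is pickled and transferred
        inner_child.close()
        print(inner_parent.recv())      # -> 'hello from the child'
        p.join()
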
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -161,7 +161,9 @@ def allow_connection_pickling():
     '''
     Install support for sending connections and sockets between processes
     '''
-    from multiprocessing import reduction
+    # This is undocumented.  In previous versions of multiprocessing
+    # its only effect was to make socket objects inheritable on Windows.
+    import multiprocessing.connection
 
 #
 # Definitions depending on native semaphores
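
For orientation only (not part of the patch): after this change the helper merely imports multiprocessing.connection, whose module-level registrations below do the real work, so a hedged minimal use looks like this.

    import multiprocessing

    multiprocessing.allow_connection_pickling()   # kept for backward compatibility;
                                                  # the import side effect registers the reducers
    a, b = multiprocessing.Pipe()
    # a and b can now themselves be sent through other Connections
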
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -50,6 +50,7 @@
 import _multiprocessing
 from multiprocessing import current_process, AuthenticationError, BufferTooShort
 from multiprocessing.util import (
     get_temp_dir, Finalize, sub_debug, debug, _eintr_retry)
+from multiprocessing.forking import ForkingPickler
 try:
     import _winapi
     from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
@@ -227,8 +228,9 @@ class _ConnectionBase:
         """Send a (picklable) object"""
         self._check_closed()
         self._check_writable()
-        buf = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
-        self._send_bytes(memoryview(buf))
+        buf = io.BytesIO()
+        ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
+        self._send_bytes(buf.getbuffer())
 
     def recv_bytes(self, maxlength=None):
         """
@@ -880,3 +882,21 @@ else:
                 raise
             if timeout is not None:
                 timeout = deadline - time.time()
+
+#
+# Make connection and socket objects sharable if possible
+#
+
+if sys.platform == 'win32':
+    from . import reduction
+    ForkingPickler.register(socket.socket, reduction.reduce_socket)
+    ForkingPickler.register(Connection, reduction.reduce_connection)
+    ForkingPickler.register(PipeConnection, reduction.reduce_pipe_connection)
+else:
+    try:
+        from . import reduction
+    except ImportError:
+        pass
+    else:
+        ForkingPickler.register(socket.socket, reduction.reduce_socket)
+        ForkingPickler.register(Connection, reduction.reduce_connection)
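
Because send() now drives a ForkingPickler rather than pickle.dumps(), anything registered via ForkingPickler.register() becomes picklable through a Connection. A hedged sketch of that registration hook, using a hypothetical MyResource type invented for illustration (Python 3.3 module layout assumed):

    from multiprocessing.forking import ForkingPickler

    class MyResource(object):
        """Hypothetical wrapper around some process-local state."""
        def __init__(self, value):
            self.value = value

    def rebuild_myresource(value):
        # runs in the receiving process to reconstruct the object
        return MyResource(value)

    def reduce_myresource(obj):
        # returns (callable, args), the same shape as the reduce_* helpers in reduction.py
        return rebuild_myresource, (obj.value,)

    ForkingPickler.register(MyResource, reduce_myresource)
    # from now on Connection.send(MyResource(...)) uses reduce_myresource
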
--- a/Lib/multiprocessing/forking.py
+++ b/Lib/multiprocessing/forking.py
@@ -407,25 +407,6 @@ else:
         return d
 
-    def reduce_connection(conn):
-        if not Popen.thread_is_spawning():
-            raise RuntimeError(
-                'By default %s objects can only be shared between processes\n'
-                'using inheritance' % type(conn).__name__
-                )
-        return type(conn), (Popen.duplicate_for_child(conn.fileno()),
-                            conn.readable, conn.writable)
-
-    ForkingPickler.register(Connection, reduce_connection)
-    ForkingPickler.register(PipeConnection, reduce_connection)
-
 #
 # Prepare current process
 #
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -33,7 +33,7 @@
 # SUCH DAMAGE.
 #
 
-__all__ = []
+__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']
 
 import os
 import sys
@@ -42,9 +42,8 @@
 import threading
 import struct
 
 from multiprocessing import current_process
-from multiprocessing.forking import Popen, duplicate, close, ForkingPickler
 from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.connection import Client, Listener, Connection
+from multiprocessing.util import is_exiting, sub_warning
 
 #
@@ -60,22 +59,91 @@ if not(sys.platform == 'win32' or (hasat
 #
 
 if sys.platform == 'win32':
     # Windows
+    __all__ += ['reduce_pipe_connection']
+
     import _winapi
 
     def send_handle(conn, handle, destination_pid):
-        process_handle = _winapi.OpenProcess(
-            _winapi.PROCESS_ALL_ACCESS, False, destination_pid
-            )
-        try:
-            new_handle = duplicate(handle, process_handle)
-            conn.send(new_handle)
-        finally:
-            close(process_handle)
+        dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
+        conn.send(dh)
 
     def recv_handle(conn):
-        return conn.recv()
+        return conn.recv().detach()
 
+    class DupHandle(object):
+        def __init__(self, handle, access, pid=None):
+            # duplicate handle for process with given pid
+            if pid is None:
+                pid = os.getpid()
+            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
+            try:
+                self._handle = _winapi.DuplicateHandle(
+                    _winapi.GetCurrentProcess(),
+                    handle, proc, access, False, 0)
+            finally:
+                _winapi.CloseHandle(proc)
+            self._access = access
+            self._pid = pid
+
+        def detach(self):
+            # retrieve handle from process which currently owns it
+            if self._pid == os.getpid():
+                return self._handle
+            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
+                                       self._pid)
+            try:
+                return _winapi.DuplicateHandle(
+                    proc, self._handle, _winapi.GetCurrentProcess(),
+                    self._access, False, _winapi.DUPLICATE_CLOSE_SOURCE)
+            finally:
+                _winapi.CloseHandle(proc)
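
For orientation (not part of the patch), a hedged sketch of the two ways DupHandle is used by the code above; handle, receiver_pid and conn are placeholders for illustration. With an explicit destination pid the duplicate lands in the target process immediately; with pid=None it stays with the sender and detach() pulls it across later, which is why the access flags matter (see the test added below).

    # eager: duplicate straight into the receiver (what send_handle() does above)
    dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, receiver_pid)
    conn.send(dh)

    # lazy: keep the duplicate in this process; the receiver's detach() call
    # re-duplicates it with DUPLICATE_CLOSE_SOURCE (what reduce_pipe_connection() does)
    dh = DupHandle(handle, _winapi.FILE_GENERIC_READ)   # access must match the intended use
    conn.send(dh)
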
 
+    class DupSocket(object):
+        def __init__(self, sock):
+            new_sock = sock.dup()
+            def send(conn, pid):
+                share = new_sock.share(pid)
+                conn.send_bytes(share)
+            self._id = resource_sharer.register(send, new_sock.close)
+
+        def detach(self):
+            conn = resource_sharer.get_connection(self._id)
+            try:
+                share = conn.recv_bytes()
+                return socket.fromshare(share)
+            finally:
+                conn.close()
+
+    def reduce_connection(conn):
+        handle = conn.fileno()
+        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
+            ds = DupSocket(s)
+            return rebuild_connection, (ds, conn.readable, conn.writable)
+
+    def rebuild_connection(ds, readable, writable):
+        from .connection import Connection
+        sock = ds.detach()
+        return Connection(sock.detach(), readable, writable)
+
+    def reduce_pipe_connection(conn):
+        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
+                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
+        dh = DupHandle(conn.fileno(), access)
+        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
+
+    def rebuild_pipe_connection(dh, readable, writable):
+        from .connection import PipeConnection
+        handle = dh.detach()
+        return PipeConnection(handle, readable, writable)
 
 else:
     # Unix
 
     def send_handle(conn, handle, destination_pid):
         with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
             s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
@@ -94,136 +162,109 @@ else:
                 pass
             raise RuntimeError('Invalid data received')
 
+    class DupFd(object):
+        def __init__(self, fd):
+            new_fd = os.dup(fd)
+            def send(conn, pid):
+                send_handle(conn, new_fd, pid)
+            def close():
+                os.close(new_fd)
+            self._id = resource_sharer.register(send, close)
+
+        def detach(self):
+            conn = resource_sharer.get_connection(self._id)
+            try:
+                return recv_handle(conn)
+            finally:
+                conn.close()
+
+    def reduce_socket(s):
+        df = DupFd(s.fileno())
+        return rebuild_socket, (df, s.family, s.type, s.proto)
+
+    def rebuild_socket(df, family, type, proto):
+        fd = df.detach()
+        s = socket.fromfd(fd, family, type, proto)
+        os.close(fd)
+        return s
+
+    def reduce_connection(conn):
+        df = DupFd(conn.fileno())
+        return rebuild_connection, (df, conn.readable, conn.writable)
+
+    def rebuild_connection(df, readable, writable):
+        from .connection import Connection
+        fd = df.detach()
+        return Connection(fd, readable, writable)
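
For illustration only (not part of the patch), a hedged sketch of the round trip these Unix reducers implement; conn is a placeholder Connection, and in real use ForkingPickler drives both halves from Connection.send() and the receiving unpickler.

    # sending side: the reducer returns a callable plus arguments; the DupFd inside
    # 'args' has already registered the duplicated fd with the resource_sharer thread
    rebuild, args = reduce_connection(conn)     # args == (DupFd(...), readable, writable)

    # (rebuild, args) is what actually travels in the pickle stream; the receiving
    # process calls it back, and DupFd.detach() fetches the fd over a one-shot
    # connection to the sharer thread in the sending process
    new_conn = rebuild(*args)
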
 #
-# Support for a per-process server thread which caches pickled handles
-#
-
-_cache = set()
-
-def _reset(obj):
-    global _lock, _listener, _cache
-    for h in _cache:
-        close(h)
-    _cache.clear()
-    _lock = threading.Lock()
-    _listener = None
-
-_reset(None)
-register_after_fork(_reset, _reset)
-
-def _get_listener():
-    if _listener is None:
-        _lock.acquire()
-        try:
-            if _listener is None:
-                debug('starting listener and thread for sending handles')
-                _listener = Listener(authkey=current_process().authkey)
-                t = threading.Thread(target=_serve)
-                t.daemon = True
-                t.start()
-        finally:
-            _lock.release()
-    return _listener
-
-def _serve():
-    while 1:
-        try:
-            conn = _listener.accept()
-            handle_wanted, destination_pid = conn.recv()
-            _cache.remove(handle_wanted)
-            send_handle(conn, handle_wanted, destination_pid)
-            close(handle_wanted)
-            conn.close()
-        except:
-            if not is_exiting():
-                import traceback
-                sub_warning(
-                    'thread for sharing handles raised exception :\n' +
-                    '-'*79 + '\n' + traceback.format_exc() + '-'*79
-                    )
-
-#
-# Functions to be used for pickling/unpickling objects with handles
+# Server which shares registered resources with clients
 #
 
-def reduce_handle(handle):
-    if Popen.thread_is_spawning():
-        return (None, Popen.duplicate_for_child(handle), True)
-    dup_handle = duplicate(handle)
-    _cache.add(dup_handle)
-    sub_debug('reducing handle %d', handle)
-    return (_get_listener().address, dup_handle, False)
+class ResourceSharer(object):
+    def __init__(self):
+        self._key = 0
+        self._cache = {}
+        self._old_locks = []
+        self._lock = threading.Lock()
+        self._listener = None
+        self._address = None
+        register_after_fork(self, ResourceSharer._afterfork)
-def rebuild_handle(pickled_data):
-    address, handle, inherited = pickled_data
-    if inherited:
-        return handle
-    sub_debug('rebuilding handle %d', handle)
-    conn = Client(address, authkey=current_process().authkey)
-    conn.send((handle, os.getpid()))
-    new_handle = recv_handle(conn)
-    conn.close()
-    return new_handle
+    def register(self, send, close):
+        with self._lock:
+            if self._address is None:
+                self._start()
+            self._key += 1
+            self._cache[self._key] = (send, close)
+            return (self._address, self._key)
-#
-# Register `Connection` with `ForkingPickler`
-#
-
-def reduce_connection(conn):
-
-def rebuild_connection(reduced_handle, readable, writable):
-    handle = rebuild_handle(reduced_handle)
-    return Connection(
-        handle, readable=readable, writable=writable
-        )
-
-ForkingPickler.register(Connection, reduce_connection)
+    @staticmethod
+    def get_connection(ident):
+        from .connection import Client
+        address, key = ident
+        c = Client(address, authkey=current_process().authkey)
+        c.send((key, os.getpid()))
+        return c
-#
-# Register `socket.socket` with `ForkingPickler`
-#
-
-def fromfd(fd, family, type_, proto=0):
-    s = socket.fromfd(fd, family, type_, proto)
-    if s.__class__ is not socket.socket:
-        s = socket.socket(_sock=s)
-    return s
-
-def reduce_socket(s):
-    reduced_handle = reduce_handle(s.fileno())
-    return rebuild_socket, (reduced_handle, s.family, s.type, s.proto)
-
-def rebuild_socket(reduced_handle, family, type_, proto):
+    def _afterfork(self):
+        for key, (send, close) in self._cache.items():
+            close()
+        self._cache.clear()
+        # If self._lock was locked at the time of the fork, it may be broken
+        # -- see issue 6721.  Replace it without letting it be gc'ed.
+        self._old_locks.append(self._lock)
+        self._lock = threading.Lock()
+        if self._listener is not None:
+            self._listener.close()
+        self._listener = None
+        self._address = None
-ForkingPickler.register(socket.socket, reduce_socket)
-
-#
-# Register `_multiprocessing.PipeConnection` with `ForkingPickler`
-#
-
-if sys.platform == 'win32':
+    def _start(self):
+        from .connection import Listener
+        assert self._listener is None
+        debug('starting listener and thread for sending handles')
+        self._listener = Listener(authkey=current_process().authkey)
+        self._address = self._listener.address
+        t = threading.Thread(target=self._serve)
+        t.daemon = True
+        t.start()
-    def reduce_pipe_connection(conn):
-        rh = reduce_handle(conn.fileno())
-        return rebuild_pipe_connection, (rh, conn.readable, conn.writable)
+    def _serve(self):
+        while 1:
+            try:
+                conn = self._listener.accept()
+                key, destination_pid = conn.recv()
+                send, close = self._cache.pop(key)
+                send(conn, destination_pid)
+                close()
+                conn.close()
+            except:
+                if not is_exiting():
+                    import traceback
+                    sub_warning(
+                        'thread for sharing handles raised exception :\n' +
+                        '-'*79 + '\n' + traceback.format_exc() + '-'*79
+                        )
-    def rebuild_pipe_connection(reduced_handle, readable, writable):
-        handle = rebuild_handle(reduced_handle)
-        return PipeConnection(
-            handle, readable=readable, writable=writable
-            )
+resource_sharer = ResourceSharer()
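
A hedged sketch (for orientation, not part of the patch) of the protocol ResourceSharer implements, assuming the Unix helpers above: register() hands back an (address, key) ticket that can travel inside a pickle, and the receiving process trades the ticket for the resource over a fresh authenticated connection served by _serve().

    import os

    fd = os.dup(0)                      # placeholder resource: a duplicated file descriptor

    def send(conn, pid):
        # runs later in the owner's _serve() thread, once a peer presents the key
        send_handle(conn, fd, pid)

    def close():
        os.close(fd)

    ident = resource_sharer.register(send, close)   # -> (listener address, key)

    # 'ident' is what DupFd/DupSocket embed in the pickle stream; the receiving
    # process trades it back for the resource:
    c = resource_sharer.get_connection(ident)       # connects and sends (key, os.getpid())
    fd2 = recv_handle(c)                            # answered by the send() callback above
    c.close()
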
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1959,49 +1959,49 @@ class _TestPoll(unittest.TestCase):
 #
 # Test of sending connection and socket objects between processes
 #
-"""
+
+@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
 class _TestPicklingConnections(BaseTestCase):
 
     ALLOWED_TYPES = ('processes',)
-            l = self.connection.Listener(family=fam)
+            l = cls.connection.Listener(family=fam)
             conn.send(l.address)
             new_conn = l.accept()
             conn.send(new_conn)
 
-        if self.TYPE == 'processes':
-            l = socket.socket()
-            l.bind(('localhost', 0))
-            conn.send(l.getsockname())
-            l.listen(1)
-            new_conn, addr = l.accept()
-            conn.send(new_conn)
-            new_conn.close()
-            l.close()
+        l = socket.socket()
+        l.bind(('localhost', 0))
+        conn.send(l.getsockname())
+        l.listen(1)
+        new_conn, addr = l.accept()
+        conn.send(new_conn)
+        new_conn.close()
+        l.close()
-            client = self.connection.Client(address)
+            client = cls.connection.Client(address)
             client.send(msg.upper())
             client.close()
 
-        if self.TYPE == 'processes':
-            address, msg = conn.recv()
-            client = socket.socket()
-            client.connect(address)
-            client.sendall(msg.upper())
-            client.close()
+        address, msg = conn.recv()
+        client = socket.socket()
+        client.connect(address)
+        client.sendall(msg.upper())
+        client.close()
         conn.close()
 
     def test_pickling(self):
-        try:
-            multiprocessing.allow_connection_pickling()
-        except ImportError:
-            return
-
         families = self.connection.families
 
         lconn, lconn0 = self.Pipe()
@@ -2025,16 +2025,12 @@ class _TestPicklingConnections(BaseTestC
         rconn.send(None)
-        if self.TYPE == 'processes':
-            msg = latin('This connection uses a normal socket')
-            address = lconn.recv()
-            rconn.send((address, msg))
-            if hasattr(socket, 'fromfd'):
-                new_conn = lconn.recv()
-                self.assertEqual(new_conn.recv(100), msg.upper())
-            else:
-                # XXX On Windows with Py2.6 need to backport fromfd()
-                discard = lconn.recv_bytes()
+        msg = latin('This connection uses a normal socket')
+        address = lconn.recv()
+        rconn.send((address, msg))
+        new_conn = lconn.recv()
+        self.assertEqual(new_conn.recv(100), msg.upper())
+        new_conn.close()
 
         lconn.send(None)
@@ -2043,7 +2039,46 @@ class _TestPicklingConnections(BaseTestC
         lp.join()
         rp.join()
 
-"""
+
+    @classmethod
+    def child_access(cls, conn):
+        w = conn.recv()
+        w.send('all is well')
+        w.close()
+
+        r = conn.recv()
+        msg = r.recv()
+        conn.send(msg*2)
+
+        conn.close()
+    def test_access(self):
+        # On Windows, if we do not specify a destination pid when
+        # using DupHandle then we need to be careful to use the
+        # correct access flags for DuplicateHandle(), or else
+        # DupHandle.detach() will raise PermissionError.  For example,
+        # for a read only pipe handle we should use
+        # access=FILE_GENERIC_READ.  (Unfortunately
+        # DUPLICATE_SAME_ACCESS does not work.)
+        conn, child_conn = self.Pipe()
+        p = self.Process(target=self.child_access, args=(child_conn,))
+        p.daemon = True
+        p.start()
+        child_conn.close()
+
+        r, w = self.Pipe(duplex=False)
+        conn.send(w)
+        w.close()
+        self.assertEqual(r.recv(), 'all is well')
+        r.close()
+
+        r, w = self.Pipe(duplex=False)
+        conn.send(r)
+        r.close()
+        w.send('foobar')
+        w.close()
+        self.assertEqual(conn.recv(), 'foobar'*2)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -71,6 +71,9 @@ Core and Builtins
 Library
 -------
 
+- Issue #4892: multiprocessing Connections can now be transferred over
+  multiprocessing Connections.
+
- Issue #14160: TarFile.extractfile() failed to resolve symbolic links when the links were not located in an archive subdirectory.
--- a/Modules/_winapi.c
+++ b/Modules/_winapi.c
@@ -1280,6 +1280,7 @@ PyInit__winapi(void)
     WINAPI_CONSTANT(F_DWORD, CREATE_NEW_CONSOLE);
     WINAPI_CONSTANT(F_DWORD, CREATE_NEW_PROCESS_GROUP);
     WINAPI_CONSTANT(F_DWORD, DUPLICATE_SAME_ACCESS);
+    WINAPI_CONSTANT(F_DWORD, DUPLICATE_CLOSE_SOURCE);
     WINAPI_CONSTANT(F_DWORD, ERROR_ALREADY_EXISTS);
     WINAPI_CONSTANT(F_DWORD, ERROR_BROKEN_PIPE);
     WINAPI_CONSTANT(F_DWORD, ERROR_IO_PENDING);
@@ -1298,6 +1299,8 @@ PyInit__winapi(void)
     WINAPI_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT);
     WINAPI_CONSTANT(F_DWORD, FILE_FLAG_FIRST_PIPE_INSTANCE);
    WINAPI_CONSTANT(F_DWORD, FILE_FLAG_OVERLAPPED);
+    WINAPI_CONSTANT(F_DWORD, FILE_GENERIC_READ);
+    WINAPI_CONSTANT(F_DWORD, FILE_GENERIC_WRITE);
     WINAPI_CONSTANT(F_DWORD, GENERIC_READ);
     WINAPI_CONSTANT(F_DWORD, GENERIC_WRITE);
     WINAPI_CONSTANT(F_DWORD, INFINITE);
@@ -1310,6 +1313,7 @@ PyInit__winapi(void)
     WINAPI_CONSTANT(F_DWORD, PIPE_UNLIMITED_INSTANCES);
     WINAPI_CONSTANT(F_DWORD, PIPE_WAIT);
     WINAPI_CONSTANT(F_DWORD, PROCESS_ALL_ACCESS);
+    WINAPI_CONSTANT(F_DWORD, PROCESS_DUP_HANDLE);
     WINAPI_CONSTANT(F_DWORD, STARTF_USESHOWWINDOW);
     WINAPI_CONSTANT(F_DWORD, STARTF_USESTDHANDLES);
     WINAPI_CONSTANT(F_DWORD, STD_INPUT_HANDLE);