cpython: c0f1f882737c (original) (raw)
Mercurial > cpython
changeset 98541:c0f1f882737c
Issue #23972: updates to asyncio datagram API. By Chris Laws. (Merge 3.5->3.6.) [#23972]
Guido van Rossum guido@dropbox.com | |
---|---|
date | Mon, 05 Oct 2015 09:29:32 -0700 |
parents | 5b9ffea7e7c3(current diff)ba956289fe66(diff) |
children | 07161dd8a078 |
files | Lib/test/test_asyncio/test_events.py Misc/ACKS Misc/NEWS |
diffstat | 7 files changed, 380 insertions(+), 69 deletions(-)[+] [-] Doc/library/asyncio-eventloop.rst 46 Lib/asyncio/base_events.py 164 Lib/asyncio/events.py 40 Lib/test/test_asyncio/test_base_events.py 140 Lib/test/test_asyncio/test_events.py 52 Misc/ACKS 1 Misc/NEWS 6 |
line wrap: on
line diff
--- a/Doc/library/asyncio-eventloop.rst
+++ b/Doc/library/asyncio-eventloop.rst
@@ -285,17 +285,50 @@ Creating connections
(:class:StreamReader
, :class:StreamWriter
) instead of a protocol.
-.. coroutinemethod:: BaseEventLoop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0)
+.. coroutinemethod:: BaseEventLoop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_address=None, reuse_port=None, allow_broadcast=None, sock=None)
Create datagram connection: socket family :py:data:~socket.AF_INET
or
:py:data:~socket.AF_INET6
depending on host (or family if specified),
- socket type :py:data:
~socket.SOCK_DGRAM
. protocol_factory must be a - callable returning a :ref:
protocol <asyncio-protocol>
instance. This method is a :ref:coroutine <coroutine>
which will try to establish the connection in the background. When successful, the coroutine returns a(transport, protocol)
pair.
- Options changing how the connection is created: +
to bind the socket to locally. The *local_host* and *local_port*[](#l1.24)
are looked up using :meth:`getaddrinfo`.[](#l1.25)
to connect the socket to a remote address. The *remote_host* and[](#l1.28)
*remote_port* are looked up using :meth:`getaddrinfo`.[](#l1.29)
and flags to be passed through to :meth:`getaddrinfo` for *host*[](#l1.32)
resolution. If given, these should all be integers from the[](#l1.33)
corresponding :mod:`socket` module constants.[](#l1.34)
TIME_WAIT state, without waiting for its natural timeout to[](#l1.37)
expire. If not specified will automatically be set to True on[](#l1.38)
UNIX.[](#l1.39)
same port as other existing endpoints are bound to, so long as they all[](#l1.42)
set this flag when being created. This option is not supported on Windows[](#l1.43)
and some UNIX's. If the :py:data:`~socket.SO_REUSEPORT` constant is not[](#l1.44)
defined then this capability is unsupported.[](#l1.45)
messages to the broadcast address.[](#l1.48)
already connected, :class:`socket.socket` object to be used by the[](#l1.51)
transport. If specified, *local_addr* and *remote_addr* should be omitted[](#l1.52)
(must be :const:`None`).[](#l1.53)
On Windows with :class:ProactorEventLoop
, this method is not supported.
@@ -322,7 +355,7 @@ Creating connections
Creating listening connections
------------------------------
-.. coroutinemethod:: BaseEventLoop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None)
+.. coroutinemethod:: BaseEventLoop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None)
Create a TCP server (socket type :data:~socket.SOCK_STREAM
) bound to
host and port.
@@ -361,6 +394,11 @@ Creating listening connections
expire. If not specified will automatically be set to True on
UNIX.
same port as other existing endpoints are bound to, so long as they all[](#l1.71)
set this flag when being created. This option is not supported on[](#l1.72)
Windows.[](#l1.73)
+
This method is a :ref:coroutine <coroutine>
.
.. versionchanged:: 3.5
--- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -700,75 +700,109 @@ class BaseEventLoop(events.AbstractEvent @coroutine def create_datagram_endpoint(self, protocol_factory, local_addr=None, remote_addr=None, *,
family=0, proto=0, flags=0):[](#l2.7)
family=0, proto=0, flags=0,[](#l2.8)
reuse_address=None, reuse_port=None,[](#l2.9)
allow_broadcast=None, sock=None):[](#l2.10) """Create datagram connection."""[](#l2.11)
if not (local_addr or remote_addr):[](#l2.12)
if family == 0:[](#l2.13)
raise ValueError('unexpected address family')[](#l2.14)
addr_pairs_info = (((family, proto), (None, None)),)[](#l2.15)
if sock is not None:[](#l2.16)
if (local_addr or remote_addr or[](#l2.17)
family or proto or flags or[](#l2.18)
reuse_address or reuse_port or allow_broadcast):[](#l2.19)
# show the problematic kwargs in exception msg[](#l2.20)
opts = dict(local_addr=local_addr, remote_addr=remote_addr,[](#l2.21)
family=family, proto=proto, flags=flags,[](#l2.22)
reuse_address=reuse_address, reuse_port=reuse_port,[](#l2.23)
allow_broadcast=allow_broadcast)[](#l2.24)
problems = ', '.join([](#l2.25)
'{}={}'.format(k, v) for k, v in opts.items() if v)[](#l2.26)
raise ValueError([](#l2.27)
'socket modifier keyword arguments can not be used '[](#l2.28)
'when sock is specified. ({})'.format(problems))[](#l2.29)
sock.setblocking(False)[](#l2.30)
r_addr = None[](#l2.31) else:[](#l2.32)
# join address by (family, protocol)[](#l2.33)
addr_infos = collections.OrderedDict()[](#l2.34)
for idx, addr in ((0, local_addr), (1, remote_addr)):[](#l2.35)
if addr is not None:[](#l2.36)
assert isinstance(addr, tuple) and len(addr) == 2, ([](#l2.37)
'2-tuple is expected')[](#l2.38)
if not (local_addr or remote_addr):[](#l2.39)
if family == 0:[](#l2.40)
raise ValueError('unexpected address family')[](#l2.41)
addr_pairs_info = (((family, proto), (None, None)),)[](#l2.42)
else:[](#l2.43)
# join address by (family, protocol)[](#l2.44)
addr_infos = collections.OrderedDict()[](#l2.45)
for idx, addr in ((0, local_addr), (1, remote_addr)):[](#l2.46)
if addr is not None:[](#l2.47)
assert isinstance(addr, tuple) and len(addr) == 2, ([](#l2.48)
'2-tuple is expected')[](#l2.49)
infos = yield from self.getaddrinfo([](#l2.51)
*addr, family=family, type=socket.SOCK_DGRAM,[](#l2.52)
proto=proto, flags=flags)[](#l2.53)
if not infos:[](#l2.54)
raise OSError('getaddrinfo() returned empty list')[](#l2.55)
infos = yield from self.getaddrinfo([](#l2.56)
*addr, family=family, type=socket.SOCK_DGRAM,[](#l2.57)
proto=proto, flags=flags)[](#l2.58)
if not infos:[](#l2.59)
raise OSError('getaddrinfo() returned empty list')[](#l2.60)
for fam, _, pro, _, address in infos:[](#l2.62)
key = (fam, pro)[](#l2.63)
if key not in addr_infos:[](#l2.64)
addr_infos[key] = [None, None][](#l2.65)
addr_infos[key][idx] = address[](#l2.66)
# each addr has to have info for each (family, proto) pair[](#l2.68)
addr_pairs_info = [[](#l2.69)
(key, addr_pair) for key, addr_pair in addr_infos.items()[](#l2.70)
if not ((local_addr and addr_pair[0] is None) or[](#l2.71)
(remote_addr and addr_pair[1] is None))][](#l2.72)
for fam, _, pro, _, address in infos:[](#l2.73)
key = (fam, pro)[](#l2.74)
if key not in addr_infos:[](#l2.75)
addr_infos[key] = [None, None][](#l2.76)
addr_infos[key][idx] = address[](#l2.77)
if not addr_pairs_info:[](#l2.79)
raise ValueError('can not get address information')[](#l2.80)
exceptions = [][](#l2.82)
# each addr has to have info for each (family, proto) pair[](#l2.83)
addr_pairs_info = [[](#l2.84)
(key, addr_pair) for key, addr_pair in addr_infos.items()[](#l2.85)
if not ((local_addr and addr_pair[0] is None) or[](#l2.86)
(remote_addr and addr_pair[1] is None))][](#l2.87)
for ((family, proto),[](#l2.89)
(local_address, remote_address)) in addr_pairs_info:[](#l2.90)
sock = None[](#l2.91)
r_addr = None[](#l2.92)
try:[](#l2.93)
sock = socket.socket([](#l2.94)
family=family, type=socket.SOCK_DGRAM, proto=proto)[](#l2.95)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)[](#l2.96)
sock.setblocking(False)[](#l2.97)
if not addr_pairs_info:[](#l2.98)
raise ValueError('can not get address information')[](#l2.99)
exceptions = [][](#l2.101)
if reuse_address is None:[](#l2.103)
reuse_address = os.name == 'posix' and sys.platform != 'cygwin'[](#l2.104)
if local_addr:[](#l2.106)
sock.bind(local_address)[](#l2.107)
if remote_addr:[](#l2.108)
yield from self.sock_connect(sock, remote_address)[](#l2.109)
r_addr = remote_address[](#l2.110)
except OSError as exc:[](#l2.111)
if sock is not None:[](#l2.112)
sock.close()[](#l2.113)
exceptions.append(exc)[](#l2.114)
except:[](#l2.115)
if sock is not None:[](#l2.116)
sock.close()[](#l2.117)
raise[](#l2.118)
for ((family, proto),[](#l2.119)
(local_address, remote_address)) in addr_pairs_info:[](#l2.120)
sock = None[](#l2.121)
r_addr = None[](#l2.122)
try:[](#l2.123)
sock = socket.socket([](#l2.124)
family=family, type=socket.SOCK_DGRAM, proto=proto)[](#l2.125)
if reuse_address:[](#l2.126)
sock.setsockopt([](#l2.127)
socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)[](#l2.128)
if reuse_port:[](#l2.129)
if not hasattr(socket, 'SO_REUSEPORT'):[](#l2.130)
raise ValueError([](#l2.131)
'reuse_port not supported by socket module')[](#l2.132)
else:[](#l2.133)
sock.setsockopt([](#l2.134)
socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)[](#l2.135)
if allow_broadcast:[](#l2.136)
sock.setsockopt([](#l2.137)
socket.SOL_SOCKET, socket.SO_BROADCAST, 1)[](#l2.138)
sock.setblocking(False)[](#l2.139)
if local_addr:[](#l2.141)
sock.bind(local_address)[](#l2.142)
if remote_addr:[](#l2.143)
yield from self.sock_connect(sock, remote_address)[](#l2.144)
r_addr = remote_address[](#l2.145)
except OSError as exc:[](#l2.146)
if sock is not None:[](#l2.147)
sock.close()[](#l2.148)
exceptions.append(exc)[](#l2.149)
except:[](#l2.150)
if sock is not None:[](#l2.151)
sock.close()[](#l2.152)
raise[](#l2.153)
else:[](#l2.154)
break[](#l2.155) else:[](#l2.156)
break[](#l2.157)
else:[](#l2.158)
raise exceptions[0][](#l2.159)
raise exceptions[0][](#l2.160)
protocol = protocol_factory() waiter = futures.Future(loop=self)
transport = self._make_datagram_transport(sock, protocol, r_addr,[](#l2.164)
waiter)[](#l2.165)
transport = self._make_datagram_transport([](#l2.166)
sock, protocol, r_addr, waiter)[](#l2.167) if self._debug:[](#l2.168) if local_addr:[](#l2.169) logger.info("Datagram endpoint local_addr=%r remote_addr=%r "[](#l2.170)
@@ -804,7 +838,8 @@ class BaseEventLoop(events.AbstractEvent sock=None, backlog=100, ssl=None,
reuse_address=None):[](#l2.175)
reuse_address=None,[](#l2.176)
reuse_port=None):[](#l2.177) """Create a TCP server.[](#l2.178)
The host parameter can be a string, in that case the TCP server is bound @@ -857,8 +892,15 @@ class BaseEventLoop(events.AbstractEvent continue sockets.append(sock) if reuse_address:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,[](#l2.185)
True)[](#l2.186)
sock.setsockopt([](#l2.187)
socket.SOL_SOCKET, socket.SO_REUSEADDR, True)[](#l2.188)
if reuse_port:[](#l2.189)
if not hasattr(socket, 'SO_REUSEPORT'):[](#l2.190)
raise ValueError([](#l2.191)
'reuse_port not supported by socket module')[](#l2.192)
else:[](#l2.193)
sock.setsockopt([](#l2.194)
socket.SOL_SOCKET, socket.SO_REUSEPORT, True)[](#l2.195) # Disable IPv4/IPv6 dual stack support (enabled by[](#l2.196) # default on Linux) which makes a single socket[](#l2.197) # listen on both address families.[](#l2.198)
--- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -297,7 +297,8 @@ class AbstractEventLoop: def create_server(self, protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
sock=None, backlog=100, ssl=None, reuse_address=None):[](#l3.7)
sock=None, backlog=100, ssl=None, reuse_address=None,[](#l3.8)
reuse_port=None):[](#l3.9) """A coroutine which creates a TCP server bound to host and port.[](#l3.10)
The return value is a Server object which can be used to stop @@ -327,6 +328,11 @@ class AbstractEventLoop: TIME_WAIT state, without waiting for its natural timeout to expire. If not specified will automatically be set to True on UNIX. +
reuse_port tells the kernel to allow this endpoint to be bound to[](#l3.18)
the same port as other existing endpoints are bound to, so long as[](#l3.19)
they all set this flag when being created. This option is not[](#l3.20)
supported on Windows.[](#l3.21) """[](#l3.22) raise NotImplementedError[](#l3.23)
@@ -358,7 +364,37 @@ class AbstractEventLoop: def create_datagram_endpoint(self, protocol_factory, local_addr=None, remote_addr=None, *,
family=0, proto=0, flags=0):[](#l3.29)
family=0, proto=0, flags=0,[](#l3.30)
reuse_address=None, reuse_port=None,[](#l3.31)
allow_broadcast=None, sock=None):[](#l3.32)
"""A coroutine which creates a datagram endpoint.[](#l3.33)
This method will try to establish the endpoint in the background.[](#l3.35)
When successful, the coroutine returns a (transport, protocol) pair.[](#l3.36)
protocol_factory must be a callable returning a protocol instance.[](#l3.38)
socket family AF_INET or socket.AF_INET6 depending on host (or[](#l3.40)
family if specified), socket type SOCK_DGRAM.[](#l3.41)
reuse_address tells the kernel to reuse a local socket in[](#l3.43)
TIME_WAIT state, without waiting for its natural timeout to[](#l3.44)
expire. If not specified it will automatically be set to True on[](#l3.45)
UNIX.[](#l3.46)
reuse_port tells the kernel to allow this endpoint to be bound to[](#l3.48)
the same port as other existing endpoints are bound to, so long as[](#l3.49)
they all set this flag when being created. This option is not[](#l3.50)
supported on Windows and some UNIX's. If the[](#l3.51)
:py:data:`~socket.SO_REUSEPORT` constant is not defined then this[](#l3.52)
capability is unsupported.[](#l3.53)
allow_broadcast tells the kernel to allow this endpoint to send[](#l3.55)
messages to the broadcast address.[](#l3.56)
sock can optionally be specified in order to use a preexisting[](#l3.58)
socket object.[](#l3.59)
"""[](#l3.60) raise NotImplementedError[](#l3.61)
--- a/Lib/test/test_asyncio/test_base_events.py +++ b/Lib/test/test_asyncio/test_base_events.py @@ -3,6 +3,7 @@ import errno import logging import math +import os import socket import sys import threading @@ -790,11 +791,11 @@ class MyProto(asyncio.Protocol): class MyDatagramProto(asyncio.DatagramProtocol): done = None
- def init(self, create_future=False, loop=None): self.state = 'INITIAL' self.nbytes = 0 if create_future:
self.done = asyncio.Future()[](#l4.20)
self.done = asyncio.Future(loop=loop)[](#l4.21)
def connection_made(self, transport): self.transport = transport @@ -1100,6 +1101,19 @@ class BaseEventLoopWithSelectorTests(tes self.assertRaises(OSError, self.loop.run_until_complete, f) @mock.patch('asyncio.base_events.socket')
- def test_create_server_nosoreuseport(self, m_socket):
m_socket.getaddrinfo = socket.getaddrinfo[](#l4.30)
m_socket.SOCK_STREAM = socket.SOCK_STREAM[](#l4.31)
m_socket.SOL_SOCKET = socket.SOL_SOCKET[](#l4.32)
del m_socket.SO_REUSEPORT[](#l4.33)
m_socket.socket.return_value = mock.Mock()[](#l4.34)
f = self.loop.create_server([](#l4.36)
MyProto, '0.0.0.0', 0, reuse_port=True)[](#l4.37)
self.assertRaises(ValueError, self.loop.run_until_complete, f)[](#l4.39)
- @mock.patch('asyncio.base_events.socket') def test_create_server_cant_bind(self, m_socket): class Err(OSError):
@@ -1199,6 +1213,128 @@ class BaseEventLoopWithSelectorTests(tes self.assertRaises(Err, self.loop.run_until_complete, fut) self.assertTrue(m_sock.close.called)
- def test_create_datagram_endpoint_sock(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)[](#l4.50)
fut = self.loop.create_datagram_endpoint([](#l4.51)
lambda: MyDatagramProto(create_future=True, loop=self.loop),[](#l4.52)
sock=sock)[](#l4.53)
transport, protocol = self.loop.run_until_complete(fut)[](#l4.54)
transport.close()[](#l4.55)
self.loop.run_until_complete(protocol.done)[](#l4.56)
self.assertEqual('CLOSED', protocol.state)[](#l4.57)
- def test_create_datagram_endpoint_sock_sockopts(self):
fut = self.loop.create_datagram_endpoint([](#l4.60)
MyDatagramProto, local_addr=('127.0.0.1', 0), sock=object())[](#l4.61)
self.assertRaises(ValueError, self.loop.run_until_complete, fut)[](#l4.62)
fut = self.loop.create_datagram_endpoint([](#l4.64)
MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=object())[](#l4.65)
self.assertRaises(ValueError, self.loop.run_until_complete, fut)[](#l4.66)
fut = self.loop.create_datagram_endpoint([](#l4.68)
MyDatagramProto, family=1, sock=object())[](#l4.69)
self.assertRaises(ValueError, self.loop.run_until_complete, fut)[](#l4.70)
fut = self.loop.create_datagram_endpoint([](#l4.72)
MyDatagramProto, proto=1, sock=object())[](#l4.73)
self.assertRaises(ValueError, self.loop.run_until_complete, fut)[](#l4.74)
fut = self.loop.create_datagram_endpoint([](#l4.76)
MyDatagramProto, flags=1, sock=object())[](#l4.77)
self.assertRaises(ValueError, self.loop.run_until_complete, fut)[](#l4.78)
fut = self.loop.create_datagram_endpoint([](#l4.80)
MyDatagramProto, reuse_address=True, sock=object())[](#l4.81)
self.assertRaises(ValueError, self.loop.run_until_complete, fut)[](#l4.82)
fut = self.loop.create_datagram_endpoint([](#l4.84)
MyDatagramProto, reuse_port=True, sock=object())[](#l4.85)
self.assertRaises(ValueError, self.loop.run_until_complete, fut)[](#l4.86)
fut = self.loop.create_datagram_endpoint([](#l4.88)
MyDatagramProto, allow_broadcast=True, sock=object())[](#l4.89)
self.assertRaises(ValueError, self.loop.run_until_complete, fut)[](#l4.90)
- def test_create_datagram_endpoint_sockopts(self):
# Socket options should not be applied unless asked for.[](#l4.93)
# SO_REUSEADDR defaults to on for UNIX.[](#l4.94)
# SO_REUSEPORT is not available on all platforms.[](#l4.95)
coro = self.loop.create_datagram_endpoint([](#l4.97)
lambda: MyDatagramProto(create_future=True, loop=self.loop),[](#l4.98)
local_addr=('127.0.0.1', 0))[](#l4.99)
transport, protocol = self.loop.run_until_complete(coro)[](#l4.100)
sock = transport.get_extra_info('socket')[](#l4.101)
reuse_address_default_on = ([](#l4.103)
os.name == 'posix' and sys.platform != 'cygwin')[](#l4.104)
reuseport_supported = hasattr(socket, 'SO_REUSEPORT')[](#l4.105)
if reuse_address_default_on:[](#l4.107)
self.assertTrue([](#l4.108)
sock.getsockopt([](#l4.109)
socket.SOL_SOCKET, socket.SO_REUSEADDR))[](#l4.110)
else:[](#l4.111)
self.assertFalse([](#l4.112)
sock.getsockopt([](#l4.113)
socket.SOL_SOCKET, socket.SO_REUSEADDR))[](#l4.114)
if reuseport_supported:[](#l4.115)
self.assertFalse([](#l4.116)
sock.getsockopt([](#l4.117)
socket.SOL_SOCKET, socket.SO_REUSEPORT))[](#l4.118)
self.assertFalse([](#l4.119)
sock.getsockopt([](#l4.120)
socket.SOL_SOCKET, socket.SO_BROADCAST))[](#l4.121)
transport.close()[](#l4.123)
self.loop.run_until_complete(protocol.done)[](#l4.124)
self.assertEqual('CLOSED', protocol.state)[](#l4.125)
coro = self.loop.create_datagram_endpoint([](#l4.127)
lambda: MyDatagramProto(create_future=True, loop=self.loop),[](#l4.128)
local_addr=('127.0.0.1', 0),[](#l4.129)
reuse_address=True,[](#l4.130)
reuse_port=reuseport_supported,[](#l4.131)
allow_broadcast=True)[](#l4.132)
transport, protocol = self.loop.run_until_complete(coro)[](#l4.133)
sock = transport.get_extra_info('socket')[](#l4.134)
self.assertTrue([](#l4.136)
sock.getsockopt([](#l4.137)
socket.SOL_SOCKET, socket.SO_REUSEADDR))[](#l4.138)
if reuseport_supported:[](#l4.139)
self.assertTrue([](#l4.140)
sock.getsockopt([](#l4.141)
socket.SOL_SOCKET, socket.SO_REUSEPORT))[](#l4.142)
else:[](#l4.143)
self.assertFalse([](#l4.144)
sock.getsockopt([](#l4.145)
socket.SOL_SOCKET, socket.SO_REUSEPORT))[](#l4.146)
self.assertTrue([](#l4.147)
sock.getsockopt([](#l4.148)
socket.SOL_SOCKET, socket.SO_BROADCAST))[](#l4.149)
transport.close()[](#l4.151)
self.loop.run_until_complete(protocol.done)[](#l4.152)
self.assertEqual('CLOSED', protocol.state)[](#l4.153)
- @mock.patch('asyncio.base_events.socket')
- def test_create_datagram_endpoint_nosoreuseport(self, m_socket):
m_socket.getaddrinfo = socket.getaddrinfo[](#l4.157)
m_socket.SOCK_DGRAM = socket.SOCK_DGRAM[](#l4.158)
m_socket.SOL_SOCKET = socket.SOL_SOCKET[](#l4.159)
del m_socket.SO_REUSEPORT[](#l4.160)
m_socket.socket.return_value = mock.Mock()[](#l4.161)
coro = self.loop.create_datagram_endpoint([](#l4.163)
lambda: MyDatagramProto(loop=self.loop),[](#l4.164)
local_addr=('127.0.0.1', 0),[](#l4.165)
reuse_address=False,[](#l4.166)
reuse_port=True)[](#l4.167)
self.assertRaises(ValueError, self.loop.run_until_complete, coro)[](#l4.169)
+ def test_accept_connection_retry(self): sock = mock.Mock() sock.accept.side_effect = BlockingIOError()
--- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -814,6 +814,32 @@ class EventLoopTestsMixin: # close server server.close()
- @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT')
- def test_create_server_reuse_port(self):
proto = MyProto(self.loop)[](#l5.9)
f = self.loop.create_server([](#l5.10)
lambda: proto, '0.0.0.0', 0)[](#l5.11)
server = self.loop.run_until_complete(f)[](#l5.12)
self.assertEqual(len(server.sockets), 1)[](#l5.13)
sock = server.sockets[0][](#l5.14)
self.assertFalse([](#l5.15)
sock.getsockopt([](#l5.16)
socket.SOL_SOCKET, socket.SO_REUSEPORT))[](#l5.17)
server.close()[](#l5.18)
test_utils.run_briefly(self.loop)[](#l5.20)
proto = MyProto(self.loop)[](#l5.22)
f = self.loop.create_server([](#l5.23)
lambda: proto, '0.0.0.0', 0, reuse_port=True)[](#l5.24)
server = self.loop.run_until_complete(f)[](#l5.25)
self.assertEqual(len(server.sockets), 1)[](#l5.26)
sock = server.sockets[0][](#l5.27)
self.assertTrue([](#l5.28)
sock.getsockopt([](#l5.29)
socket.SOL_SOCKET, socket.SO_REUSEPORT))[](#l5.30)
server.close()[](#l5.31)
+ def _make_unix_server(self, factory, **kwargs): path = test_utils.gen_unix_socket_path() self.addCleanup(lambda: os.path.exists(path) and os.unlink(path)) @@ -1264,6 +1290,32 @@ class EventLoopTestsMixin: self.assertEqual('CLOSED', client.state) server.transport.close()
- def test_create_datagram_endpoint_sock(self):
sock = None[](#l5.41)
local_address = ('127.0.0.1', 0)[](#l5.42)
infos = self.loop.run_until_complete([](#l5.43)
self.loop.getaddrinfo([](#l5.44)
*local_address, type=socket.SOCK_DGRAM))[](#l5.45)
for family, type, proto, cname, address in infos:[](#l5.46)
try:[](#l5.47)
sock = socket.socket(family=family, type=type, proto=proto)[](#l5.48)
sock.setblocking(False)[](#l5.49)
sock.bind(address)[](#l5.50)
except:[](#l5.51)
pass[](#l5.52)
else:[](#l5.53)
break[](#l5.54)
else:[](#l5.55)
assert False, 'Can not create socket.'[](#l5.56)
f = self.loop.create_connection([](#l5.58)
lambda: MyDatagramProto(loop=self.loop), sock=sock)[](#l5.59)
tr, pr = self.loop.run_until_complete(f)[](#l5.60)
self.assertIsInstance(tr, asyncio.Transport)[](#l5.61)
self.assertIsInstance(pr, MyDatagramProto)[](#l5.62)
tr.close()[](#l5.63)
self.loop.run_until_complete(pr.done)[](#l5.64)
+ def test_internal_fds(self): loop = self.create_event_loop() if not isinstance(loop, selector_events.BaseSelectorEventLoop):
--- a/Misc/ACKS +++ b/Misc/ACKS @@ -813,6 +813,7 @@ Simon Law Julia Lawall Chris Lawrence Mark Lawrence +Chris Laws Brian Leair Mathieu Leduc-Hamel Amandine Lee
--- a/Misc/NEWS +++ b/Misc/NEWS @@ -43,6 +43,12 @@ Core and Builtins Library ------- +- Issue #23972: Updates asyncio datagram create method allowing reuseport