cpython: 80e0040d910c (original) (raw)
--- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -340,6 +340,8 @@ class _SelectorTransport(transports.Tran max_size = 256 * 1024 # Buffer size passed to recv().
+ def init(self, loop, sock, protocol, extra, server=None): super().init(extra) self._extra['socket'] = sock @@ -354,7 +356,7 @@ class _SelectorTransport(transports.Tran self._sock_fd = sock.fileno() self._protocol = protocol self._server = server
self._buffer = collections.deque()[](#l1.16)
self._buffer = self._buffer_factory()[](#l1.17) self._conn_lost = 0 # Set when call to connection_lost scheduled.[](#l1.18) self._closing = False # Set when close() called.[](#l1.19) self._protocol_paused = False[](#l1.20)
@@ -433,12 +435,14 @@ class _SelectorTransport(transports.Tran high = 4*low if low is None: low = high // 4
assert 0 <= low <= high, repr((low, high))[](#l1.25)
if not high >= low >= 0:[](#l1.26)
raise ValueError('high (%r) must be >= low (%r) must be >= 0' %[](#l1.27)
(high, low))[](#l1.28) self._high_water = high[](#l1.29) self._low_water = low[](#l1.30)
def get_write_buffer_size(self):
return sum(len(data) for data in self._buffer)[](#l1.33)
return len(self._buffer)[](#l1.34)
class _SelectorSocketTransport(_SelectorTransport): @@ -455,13 +459,16 @@ class _SelectorSocketTransport(_Selector self._loop.call_soon(waiter.set_result, None) def pause_reading(self):
assert not self._closing, 'Cannot pause_reading() when closing'[](#l1.42)
assert not self._paused, 'Already paused'[](#l1.43)
if self._closing:[](#l1.44)
raise RuntimeError('Cannot pause_reading() when closing')[](#l1.45)
if self._paused:[](#l1.46)
raise RuntimeError('Already paused')[](#l1.47) self._paused = True[](#l1.48) self._loop.remove_reader(self._sock_fd)[](#l1.49)
assert self._paused, 'Not paused'[](#l1.52)
if not self._paused:[](#l1.53)
raise RuntimeError('Not paused')[](#l1.54) self._paused = False[](#l1.55) if self._closing:[](#l1.56) return[](#l1.57)
@@ -488,8 +495,11 @@ class _SelectorSocketTransport(_Selector self.close() def write(self, data):
assert isinstance(data, bytes), repr(type(data))[](#l1.62)
assert not self._eof, 'Cannot call write() after write_eof()'[](#l1.63)
if not isinstance(data, (bytes, bytearray, memoryview)):[](#l1.64)
raise TypeError('data argument must be byte-ish (%r)',[](#l1.65)
type(data))[](#l1.66)
if self._eof:[](#l1.67)
raise RuntimeError('Cannot call write() after write_eof()')[](#l1.68) if not data:[](#l1.69) return[](#l1.70)
@@ -516,25 +526,23 @@ class _SelectorSocketTransport(_Selector self._loop.add_writer(self._sock_fd, self._write_ready) # Add it to the buffer.
self._buffer.append(data)[](#l1.76)
self._buffer.extend(data)[](#l1.77) self._maybe_pause_protocol()[](#l1.78)
data = b''.join(self._buffer)[](#l1.81)
assert data, 'Data should not be empty'[](#l1.82)
assert self._buffer, 'Data should not be empty'[](#l1.83)
self._buffer.clear() # Optimistically; may have to put it back later.[](#l1.85) try:[](#l1.86)
n = self._sock.send(data)[](#l1.87)
n = self._sock.send(self._buffer)[](#l1.88) except (BlockingIOError, InterruptedError):[](#l1.89)
self._buffer.append(data) # Still need to write this.[](#l1.90)
pass[](#l1.91) except Exception as exc:[](#l1.92) self._loop.remove_writer(self._sock_fd)[](#l1.93)
self._buffer.clear()[](#l1.94) self._fatal_error(exc)[](#l1.95) else:[](#l1.96)
data = data[n:][](#l1.97)
if data:[](#l1.98)
self._buffer.append(data) # Still need to write this.[](#l1.99)
if n:[](#l1.100)
del self._buffer[:n][](#l1.101) self._maybe_resume_protocol() # May append to buffer.[](#l1.102) if not self._buffer:[](#l1.103) self._loop.remove_writer(self._sock_fd)[](#l1.104)
@@ -556,6 +564,8 @@ class _SelectorSocketTransport(_Selector class _SelectorSslTransport(_SelectorTransport):
+ def init(self, loop, rawsock, protocol, sslcontext, waiter=None, server_side=False, server_hostname=None, extra=None, server=None): @@ -661,13 +671,16 @@ class _SelectorSslTransport(_SelectorTra # accept more data for the buffer and eventually the app will # call resume_reading() again, and things will flow again.
assert not self._closing, 'Cannot pause_reading() when closing'[](#l1.118)
assert not self._paused, 'Already paused'[](#l1.119)
if self._closing:[](#l1.120)
raise RuntimeError('Cannot pause_reading() when closing')[](#l1.121)
if self._paused:[](#l1.122)
raise RuntimeError('Already paused')[](#l1.123) self._paused = True[](#l1.124) self._loop.remove_reader(self._sock_fd)[](#l1.125)
assert self._paused, 'Not paused'[](#l1.128)
if not self._paused:[](#l1.129)
raise ('Not paused')[](#l1.130) self._paused = False[](#l1.131) if self._closing:[](#l1.132) return[](#l1.133)
@@ -712,10 +725,8 @@ class _SelectorSslTransport(_SelectorTra self._loop.add_reader(self._sock_fd, self._read_ready) if self._buffer:
data = b''.join(self._buffer)[](#l1.138)
self._buffer.clear()[](#l1.139) try:[](#l1.140)
n = self._sock.send(data)[](#l1.141)
n = self._sock.send(self._buffer)[](#l1.142) except (BlockingIOError, InterruptedError,[](#l1.143) ssl.SSLWantWriteError):[](#l1.144) n = 0[](#l1.145)
@@ -725,11 +736,12 @@ class _SelectorSslTransport(_SelectorTra self._write_wants_read = True except Exception as exc: self._loop.remove_writer(self._sock_fd)
self._buffer.clear()[](#l1.150) self._fatal_error(exc)[](#l1.151) return[](#l1.152)
if n < len(data):[](#l1.154)
self._buffer.append(data[n:])[](#l1.155)
if n:[](#l1.156)
del self._buffer[:n][](#l1.157)
self._maybe_resume_protocol() # May append to buffer. @@ -739,7 +751,9 @@ class _SelectorSslTransport(_SelectorTra self._call_connection_lost(None) def write(self, data):
assert isinstance(data, bytes), repr(type(data))[](#l1.165)
if not isinstance(data, (bytes, bytearray, memoryview)):[](#l1.166)
raise TypeError('data argument must be byte-ish (%r)',[](#l1.167)
type(data))[](#l1.168) if not data:[](#l1.169) return[](#l1.170)
@@ -753,7 +767,7 @@ class _SelectorSslTransport(_SelectorTra self._loop.add_writer(self._sock_fd, self._write_ready) # Add it to the buffer.
self._buffer.append(data)[](#l1.176)
self._buffer.extend(data)[](#l1.177) self._maybe_pause_protocol()[](#l1.178)
def can_write_eof(self): @@ -762,6 +776,8 @@ class _SelectorSslTransport(_SelectorTra class _SelectorDatagramTransport(_SelectorTransport):
+ def init(self, loop, sock, protocol, address=None, extra=None): super().init(loop, sock, protocol, extra) self._address = address @@ -784,12 +800,15 @@ class _SelectorDatagramTransport(_Select self._protocol.datagram_received(data, addr) def sendto(self, data, addr=None):
assert isinstance(data, bytes), repr(type(data))[](#l1.194)
if not isinstance(data, (bytes, bytearray, memoryview)):[](#l1.195)
raise TypeError('data argument must be byte-ish (%r)',[](#l1.196)
type(data))[](#l1.197) if not data:[](#l1.198) return[](#l1.199)
if self._address:[](#l1.201)
assert addr in (None, self._address)[](#l1.202)
if self._address and addr not in (None, self._address):[](#l1.203)
raise ValueError('Invalid address: must be None or %s' %[](#l1.204)
(self._address,))[](#l1.205)
if self._conn_lost and self._address: if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES: @@ -814,7 +833,8 @@ class _SelectorDatagramTransport(_Select self._fatal_error(exc) return
self._buffer.append((data, addr))[](#l1.213)
# Ensure that what we buffer is immutable.[](#l1.214)
self._buffer.append((bytes(data), addr))[](#l1.215) self._maybe_pause_protocol()[](#l1.216)
--- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -32,6 +32,10 @@ class TestBaseSelectorEventLoop(BaseSele self._internal_fds += 1 +def list_to_buffer(l=()):
+ + class BaseSelectorEventLoopTests(unittest.TestCase): def setUp(self): @@ -613,7 +617,7 @@ class SelectorTransportTests(unittest.Te def test_close_write_buffer(self): tr = _SelectorTransport(self.loop, self.sock, self.protocol, None)
tr._buffer.append(b'data')[](#l2.18)
tr._buffer.extend(b'data')[](#l2.19) tr.close()[](#l2.20)
self.assertFalse(self.loop.readers) @@ -622,13 +626,13 @@ class SelectorTransportTests(unittest.Te def test_force_close(self): tr = _SelectorTransport(self.loop, self.sock, self.protocol, None)
tr._buffer.append(b'1')[](#l2.27)
tr._buffer.extend(b'1')[](#l2.28) self.loop.add_reader(7, unittest.mock.sentinel)[](#l2.29) self.loop.add_writer(7, unittest.mock.sentinel)[](#l2.30) tr._force_close(None)[](#l2.31)
self.assertEqual(tr._buffer, collections.deque())[](#l2.34)
self.assertEqual(tr._buffer, list_to_buffer())[](#l2.35) self.assertFalse(self.loop.readers)[](#l2.36) self.assertFalse(self.loop.writers)[](#l2.37)
@@ -783,21 +787,40 @@ class SelectorSocketTransportTests(unitt transport.write(data) self.sock.send.assert_called_with(data)
- def test_write_bytearray(self):
data = bytearray(b'data')[](#l2.44)
self.sock.send.return_value = len(data)[](#l2.45)
transport = _SelectorSocketTransport([](#l2.47)
self.loop, self.sock, self.protocol)[](#l2.48)
transport.write(data)[](#l2.49)
self.sock.send.assert_called_with(data)[](#l2.50)
self.assertEqual(data, bytearray(b'data')) # Hasn't been mutated.[](#l2.51)
- def test_write_memoryview(self):
data = memoryview(b'data')[](#l2.54)
self.sock.send.return_value = len(data)[](#l2.55)
transport = _SelectorSocketTransport([](#l2.57)
self.loop, self.sock, self.protocol)[](#l2.58)
transport.write(data)[](#l2.59)
self.sock.send.assert_called_with(data)[](#l2.60)
+ def test_write_no_data(self): transport = _SelectorSocketTransport( self.loop, self.sock, self.protocol)
transport._buffer.append(b'data')[](#l2.65)
transport._buffer.extend(b'data')[](#l2.66) transport.write(b'')[](#l2.67) self.assertFalse(self.sock.send.called)[](#l2.68)
self.assertEqual(collections.deque([b'data']), transport._buffer)[](#l2.69)
self.assertEqual(list_to_buffer([b'data']), transport._buffer)[](#l2.70)
def test_write_buffer(self): transport = _SelectorSocketTransport( self.loop, self.sock, self.protocol)
transport._buffer.append(b'data1')[](#l2.75)
transport._buffer.extend(b'data1')[](#l2.76) transport.write(b'data2')[](#l2.77) self.assertFalse(self.sock.send.called)[](#l2.78)
self.assertEqual(collections.deque([b'data1', b'data2']),[](#l2.79)
self.assertEqual(list_to_buffer([b'data1', b'data2']),[](#l2.80) transport._buffer)[](#l2.81)
def test_write_partial(self): @@ -809,7 +832,30 @@ class SelectorSocketTransportTests(unitt transport.write(data) self.loop.assert_writer(7, transport._write_ready)
self.assertEqual(collections.deque([b'ta']), transport._buffer)[](#l2.88)
self.assertEqual(list_to_buffer([b'ta']), transport._buffer)[](#l2.89)
- def test_write_partial_bytearray(self):
data = bytearray(b'data')[](#l2.92)
self.sock.send.return_value = 2[](#l2.93)
transport = _SelectorSocketTransport([](#l2.95)
self.loop, self.sock, self.protocol)[](#l2.96)
transport.write(data)[](#l2.97)
self.loop.assert_writer(7, transport._write_ready)[](#l2.99)
self.assertEqual(list_to_buffer([b'ta']), transport._buffer)[](#l2.100)
self.assertEqual(data, bytearray(b'data')) # Hasn't been mutated.[](#l2.101)
- def test_write_partial_memoryview(self):
data = memoryview(b'data')[](#l2.104)
self.sock.send.return_value = 2[](#l2.105)
transport = _SelectorSocketTransport([](#l2.107)
self.loop, self.sock, self.protocol)[](#l2.108)
transport.write(data)[](#l2.109)
self.loop.assert_writer(7, transport._write_ready)[](#l2.111)
self.assertEqual(list_to_buffer([b'ta']), transport._buffer)[](#l2.112)
def test_write_partial_none(self): data = b'data' @@ -821,7 +867,7 @@ class SelectorSocketTransportTests(unitt transport.write(data) self.loop.assert_writer(7, transport._write_ready)
self.assertEqual(collections.deque([b'data']), transport._buffer)[](#l2.120)
self.assertEqual(list_to_buffer([b'data']), transport._buffer)[](#l2.121)
def test_write_tryagain(self): self.sock.send.side_effect = BlockingIOError @@ -832,7 +878,7 @@ class SelectorSocketTransportTests(unitt transport.write(data) self.loop.assert_writer(7, transport._write_ready)
self.assertEqual(collections.deque([b'data']), transport._buffer)[](#l2.129)
self.assertEqual(list_to_buffer([b'data']), transport._buffer)[](#l2.130)
@unittest.mock.patch('asyncio.selector_events.logger') def test_write_exception(self, m_log): @@ -859,7 +905,7 @@ class SelectorSocketTransportTests(unitt def test_write_str(self): transport = _SelectorSocketTransport( self.loop, self.sock, self.protocol)
self.assertRaises(AssertionError, transport.write, 'str')[](#l2.138)
self.assertRaises(TypeError, transport.write, 'str')[](#l2.139)
def test_write_closing(self): transport = _SelectorSocketTransport( @@ -875,11 +921,10 @@ class SelectorSocketTransportTests(unitt transport = _SelectorSocketTransport( self.loop, self.sock, self.protocol)
transport._buffer.append(data)[](#l2.147)
transport._buffer.extend(data)[](#l2.148) self.loop.add_writer(7, transport._write_ready)[](#l2.149) transport._write_ready()[](#l2.150) self.assertTrue(self.sock.send.called)[](#l2.151)
self.assertEqual(self.sock.send.call_args[0], (data,))[](#l2.152) self.assertFalse(self.loop.writers)[](#l2.153)
def test_write_ready_closing(self): @@ -889,10 +934,10 @@ class SelectorSocketTransportTests(unitt transport = _SelectorSocketTransport( self.loop, self.sock, self.protocol) transport._closing = True
transport._buffer.append(data)[](#l2.160)
transport._buffer.extend(data)[](#l2.161) self.loop.add_writer(7, transport._write_ready)[](#l2.162) transport._write_ready()[](#l2.163)
self.sock.send.assert_called_with(data)[](#l2.164)
self.assertTrue(self.sock.send.called)[](#l2.165) self.assertFalse(self.loop.writers)[](#l2.166) self.sock.close.assert_called_with()[](#l2.167) self.protocol.connection_lost.assert_called_with(None)[](#l2.168)
@@ -900,6 +945,7 @@ class SelectorSocketTransportTests(unitt def test_write_ready_no_data(self): transport = _SelectorSocketTransport( self.loop, self.sock, self.protocol)
# This is an internal error.[](#l2.173) self.assertRaises(AssertionError, transport._write_ready)[](#l2.174)
def test_write_ready_partial(self): @@ -908,11 +954,11 @@ class SelectorSocketTransportTests(unitt transport = _SelectorSocketTransport( self.loop, self.sock, self.protocol)
transport._buffer.append(data)[](#l2.181)
transport._buffer.extend(data)[](#l2.182) self.loop.add_writer(7, transport._write_ready)[](#l2.183) transport._write_ready()[](#l2.184) self.loop.assert_writer(7, transport._write_ready)[](#l2.185)
self.assertEqual(collections.deque([b'ta']), transport._buffer)[](#l2.186)
self.assertEqual(list_to_buffer([b'ta']), transport._buffer)[](#l2.187)
def test_write_ready_partial_none(self): data = b'data' @@ -920,23 +966,23 @@ class SelectorSocketTransportTests(unitt transport = _SelectorSocketTransport( self.loop, self.sock, self.protocol)
transport._buffer.append(data)[](#l2.195)
transport._buffer.extend(data)[](#l2.196) self.loop.add_writer(7, transport._write_ready)[](#l2.197) transport._write_ready()[](#l2.198) self.loop.assert_writer(7, transport._write_ready)[](#l2.199)
self.assertEqual(collections.deque([b'data']), transport._buffer)[](#l2.200)
self.assertEqual(list_to_buffer([b'data']), transport._buffer)[](#l2.201)
def test_write_ready_tryagain(self): self.sock.send.side_effect = BlockingIOError transport = _SelectorSocketTransport( self.loop, self.sock, self.protocol)
transport._buffer = collections.deque([b'data1', b'data2'])[](#l2.208)
transport._buffer = list_to_buffer([b'data1', b'data2'])[](#l2.209) self.loop.add_writer(7, transport._write_ready)[](#l2.210) transport._write_ready()[](#l2.211)
self.loop.assert_writer(7, transport._write_ready)
self.assertEqual(collections.deque([b'data1data2']), transport._buffer)[](#l2.214)
self.assertEqual(list_to_buffer([b'data1data2']), transport._buffer)[](#l2.215)
def test_write_ready_exception(self): err = self.sock.send.side_effect = OSError() @@ -944,7 +990,7 @@ class SelectorSocketTransportTests(unitt transport = _SelectorSocketTransport( self.loop, self.sock, self.protocol) transport._fatal_error = unittest.mock.Mock()
transport._buffer.append(b'data')[](#l2.223)
transport._buffer.extend(b'data')[](#l2.224) transport._write_ready()[](#l2.225) transport._fatal_error.assert_called_with(err)[](#l2.226)
@@ -956,7 +1002,7 @@ class SelectorSocketTransportTests(unitt transport = _SelectorSocketTransport( self.loop, self.sock, self.protocol) transport.close()
transport._buffer.append(b'data')[](#l2.232)
transport._buffer.extend(b'data')[](#l2.233) transport._write_ready()[](#l2.234) remove_writer.assert_called_with(self.sock_fd)[](#l2.235)
@@ -976,12 +1022,12 @@ class SelectorSocketTransportTests(unitt self.sock.send.side_effect = BlockingIOError tr.write(b'data') tr.write_eof()
self.assertEqual(tr._buffer, collections.deque([b'data']))[](#l2.241)
self.assertEqual(tr._buffer, list_to_buffer([b'data']))[](#l2.242) self.assertTrue(tr._eof)[](#l2.243) self.assertFalse(self.sock.shutdown.called)[](#l2.244) self.sock.send.side_effect = lambda _: 4[](#l2.245) tr._write_ready()[](#l2.246)
self.sock.send.assert_called_with(b'data')[](#l2.247)
self.assertTrue(self.sock.send.called)[](#l2.248) self.sock.shutdown.assert_called_with(socket.SHUT_WR)[](#l2.249) tr.close()[](#l2.250)
@@ -1065,15 +1111,34 @@ class SelectorSslTransportTests(unittest self.assertFalse(tr._paused) self.loop.assert_reader(1, tr._read_ready)
- def test_write(self):
transport = self._make_one()[](#l2.257)
transport.write(b'data')[](#l2.258)
self.assertEqual(list_to_buffer([b'data']), transport._buffer)[](#l2.259)
- def test_write_bytearray(self):
transport = self._make_one()[](#l2.262)
data = bytearray(b'data')[](#l2.263)
transport.write(data)[](#l2.264)
self.assertEqual(list_to_buffer([b'data']), transport._buffer)[](#l2.265)
self.assertEqual(data, bytearray(b'data')) # Hasn't been mutated.[](#l2.266)
self.assertIsNot(data, transport._buffer) # Hasn't been incorporated.[](#l2.267)
- def test_write_memoryview(self):
transport = self._make_one()[](#l2.270)
data = memoryview(b'data')[](#l2.271)
transport.write(data)[](#l2.272)
self.assertEqual(list_to_buffer([b'data']), transport._buffer)[](#l2.273)
+ def test_write_no_data(self): transport = self._make_one()
transport._buffer.append(b'data')[](#l2.277)
transport._buffer.extend(b'data')[](#l2.278) transport.write(b'')[](#l2.279)
self.assertEqual(collections.deque([b'data']), transport._buffer)[](#l2.280)
self.assertEqual(list_to_buffer([b'data']), transport._buffer)[](#l2.281)
def test_write_str(self): transport = self._make_one()
self.assertRaises(AssertionError, transport.write, 'str')[](#l2.285)
self.assertRaises(TypeError, transport.write, 'str')[](#l2.286)
def test_write_closing(self): transport = self._make_one() @@ -1087,7 +1152,7 @@ class SelectorSslTransportTests(unittest transport = self._make_one() transport._conn_lost = 1 transport.write(b'data')
self.assertEqual(transport._buffer, collections.deque())[](#l2.294)
self.assertEqual(transport._buffer, list_to_buffer())[](#l2.295) transport.write(b'data')[](#l2.296) transport.write(b'data')[](#l2.297) transport.write(b'data')[](#l2.298)
@@ -1107,7 +1172,7 @@ class SelectorSslTransportTests(unittest transport = self._make_one() transport._write_wants_read = True transport._write_ready = unittest.mock.Mock()
transport._buffer.append(b'data')[](#l2.303)
transport._buffer.extend(b'data')[](#l2.304) transport._read_ready()[](#l2.305)
self.assertFalse(transport._write_wants_read) @@ -1168,31 +1233,31 @@ class SelectorSslTransportTests(unittest def test_write_ready_send(self): self.sslsock.send.return_value = 4 transport = self._make_one()
transport._buffer = collections.deque([b'data'])[](#l2.312)
transport._buffer = list_to_buffer([b'data'])[](#l2.313) transport._write_ready()[](#l2.314)
self.assertEqual(collections.deque(), transport._buffer)[](#l2.315)
self.assertEqual(list_to_buffer(), transport._buffer)[](#l2.316) self.assertTrue(self.sslsock.send.called)[](#l2.317)
def test_write_ready_send_none(self): self.sslsock.send.return_value = 0 transport = self._make_one()
transport._buffer = collections.deque([b'data1', b'data2'])[](#l2.322)
transport._buffer = list_to_buffer([b'data1', b'data2'])[](#l2.323) transport._write_ready()[](#l2.324) self.assertTrue(self.sslsock.send.called)[](#l2.325)
self.assertEqual(collections.deque([b'data1data2']), transport._buffer)[](#l2.326)
self.assertEqual(list_to_buffer([b'data1data2']), transport._buffer)[](#l2.327)
def test_write_ready_send_partial(self): self.sslsock.send.return_value = 2 transport = self._make_one()
transport._buffer = collections.deque([b'data1', b'data2'])[](#l2.332)
transport._buffer = list_to_buffer([b'data1', b'data2'])[](#l2.333) transport._write_ready()[](#l2.334) self.assertTrue(self.sslsock.send.called)[](#l2.335)
self.assertEqual(collections.deque([b'ta1data2']), transport._buffer)[](#l2.336)
self.assertEqual(list_to_buffer([b'ta1data2']), transport._buffer)[](#l2.337)
def test_write_ready_send_closing_partial(self): self.sslsock.send.return_value = 2 transport = self._make_one()
transport._buffer = collections.deque([b'data1', b'data2'])[](#l2.342)
transport._buffer = list_to_buffer([b'data1', b'data2'])[](#l2.343) transport._write_ready()[](#l2.344) self.assertTrue(self.sslsock.send.called)[](#l2.345) self.assertFalse(self.sslsock.close.called)[](#l2.346)
@@ -1201,7 +1266,7 @@ class SelectorSslTransportTests(unittest self.sslsock.send.return_value = 4 transport = self._make_one() transport.close()
transport._buffer = collections.deque([b'data'])[](#l2.351)
transport._buffer = list_to_buffer([b'data'])[](#l2.352) transport._write_ready()[](#l2.353) self.assertFalse(self.loop.writers)[](#l2.354) self.protocol.connection_lost.assert_called_with(None)[](#l2.355)
@@ -1210,26 +1275,26 @@ class SelectorSslTransportTests(unittest self.sslsock.send.return_value = 4 transport = self._make_one() transport.close()
transport._buffer = collections.deque()[](#l2.360)
transport._buffer = list_to_buffer()[](#l2.361) transport._write_ready()[](#l2.362) self.assertFalse(self.loop.writers)[](#l2.363) self.protocol.connection_lost.assert_called_with(None)[](#l2.364)
def test_write_ready_send_retry(self): transport = self._make_one()
transport._buffer = collections.deque([b'data'])[](#l2.368)
transport._buffer = list_to_buffer([b'data'])[](#l2.369)
self.sslsock.send.side_effect = ssl.SSLWantWriteError transport._write_ready()
self.assertEqual(collections.deque([b'data']), transport._buffer)[](#l2.373)
self.assertEqual(list_to_buffer([b'data']), transport._buffer)[](#l2.374)
self.sslsock.send.side_effect = BlockingIOError() transport._write_ready()
self.assertEqual(collections.deque([b'data']), transport._buffer)[](#l2.378)
self.assertEqual(list_to_buffer([b'data']), transport._buffer)[](#l2.379)
def test_write_ready_send_read(self): transport = self._make_one()
transport._buffer = collections.deque([b'data'])[](#l2.383)
transport._buffer = list_to_buffer([b'data'])[](#l2.384)
self.loop.remove_writer = unittest.mock.Mock() self.sslsock.send.side_effect = ssl.SSLWantReadError @@ -1242,11 +1307,11 @@ class SelectorSslTransportTests(unittest err = self.sslsock.send.side_effect = OSError() transport = self._make_one()
transport._buffer = collections.deque([b'data'])[](#l2.392)
transport._buffer = list_to_buffer([b'data'])[](#l2.393) transport._fatal_error = unittest.mock.Mock()[](#l2.394) transport._write_ready()[](#l2.395) transport._fatal_error.assert_called_with(err)[](#l2.396)
self.assertEqual(collections.deque(), transport._buffer)[](#l2.397)
self.assertEqual(list_to_buffer(), transport._buffer)[](#l2.398)
def test_write_ready_read_wants_write(self): self.loop.add_reader = unittest.mock.Mock() @@ -1355,6 +1420,24 @@ class SelectorDatagramTransportTests(uni self.assertEqual( self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234)))
- def test_sendto_bytearray(self):
data = bytearray(b'data')[](#l2.407)
transport = _SelectorDatagramTransport([](#l2.408)
self.loop, self.sock, self.protocol)[](#l2.409)
transport.sendto(data, ('0.0.0.0', 1234))[](#l2.410)
self.assertTrue(self.sock.sendto.called)[](#l2.411)
self.assertEqual([](#l2.412)
self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234)))[](#l2.413)
- def test_sendto_memoryview(self):
data = memoryview(b'data')[](#l2.416)
transport = _SelectorDatagramTransport([](#l2.417)
self.loop, self.sock, self.protocol)[](#l2.418)
transport.sendto(data, ('0.0.0.0', 1234))[](#l2.419)
self.assertTrue(self.sock.sendto.called)[](#l2.420)
self.assertEqual([](#l2.421)
self.sock.sendto.call_args[0], (data, ('0.0.0.0', 1234)))[](#l2.422)
+ def test_sendto_no_data(self): transport = _SelectorDatagramTransport( self.loop, self.sock, self.protocol) @@ -1375,6 +1458,32 @@ class SelectorDatagramTransportTests(uni (b'data2', ('0.0.0.0', 12345))], list(transport._buffer))
- def test_sendto_buffer_bytearray(self):
data2 = bytearray(b'data2')[](#l2.432)
transport = _SelectorDatagramTransport([](#l2.433)
self.loop, self.sock, self.protocol)[](#l2.434)
transport._buffer.append((b'data1', ('0.0.0.0', 12345)))[](#l2.435)
transport.sendto(data2, ('0.0.0.0', 12345))[](#l2.436)
self.assertFalse(self.sock.sendto.called)[](#l2.437)
self.assertEqual([](#l2.438)
[(b'data1', ('0.0.0.0', 12345)),[](#l2.439)
(b'data2', ('0.0.0.0', 12345))],[](#l2.440)
list(transport._buffer))[](#l2.441)
self.assertIsInstance(transport._buffer[1][0], bytes)[](#l2.442)
- def test_sendto_buffer_memoryview(self):
data2 = memoryview(b'data2')[](#l2.445)
transport = _SelectorDatagramTransport([](#l2.446)
self.loop, self.sock, self.protocol)[](#l2.447)
transport._buffer.append((b'data1', ('0.0.0.0', 12345)))[](#l2.448)
transport.sendto(data2, ('0.0.0.0', 12345))[](#l2.449)
self.assertFalse(self.sock.sendto.called)[](#l2.450)
self.assertEqual([](#l2.451)
[(b'data1', ('0.0.0.0', 12345)),[](#l2.452)
(b'data2', ('0.0.0.0', 12345))],[](#l2.453)
list(transport._buffer))[](#l2.454)
self.assertIsInstance(transport._buffer[1][0], bytes)[](#l2.455)
+ def test_sendto_tryagain(self): data = b'data' @@ -1439,13 +1548,13 @@ class SelectorDatagramTransportTests(uni def test_sendto_str(self): transport = _SelectorDatagramTransport( self.loop, self.sock, self.protocol)
self.assertRaises(AssertionError, transport.sendto, 'str', ())[](#l2.464)
self.assertRaises(TypeError, transport.sendto, 'str', ())[](#l2.465)
def test_sendto_connected_addr(self): transport = _SelectorDatagramTransport( self.loop, self.sock, self.protocol, ('0.0.0.0', 1)) self.assertRaises(
AssertionError, transport.sendto, b'str', ('0.0.0.0', 2))[](#l2.471)
ValueError, transport.sendto, b'str', ('0.0.0.0', 2))[](#l2.472)
def test_sendto_closing(self): transport = _SelectorDatagramTransport(