High-performance Asyncio networking: sockets vs streams vs protocols (original) (raw)
December 5, 2024, 8:38pm 1
I’ve working on writing high-performance asyncio network code and after having tried all three of the available APIs, I’m wondering if I’m writing my code incorrectly.
The official docs suggest that working with socket objects directly is more convenient at the expense of performance, but my testing shows the opposite.
A simple benchmark using this server:
import socket
def socket_server():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('127.0.0.1', 1234))
s.listen()
while True:
conn, _= s.accept()
bytes_read = 0
mv = memoryview(bytearray(MESSAGE_SIZE))
while bytes_read < MESSAGE_SIZE:
read = conn.recv_into(mv[bytes_read:])
if read == 0:
raise OSError("Closed by peer")
bytes_read += read
conn.sendall(MESSAGE)
conn.close()
socket_server()
And these clients:
import socket, asyncio
def socket_client():
async def inner():
loop = asyncio.get_running_loop()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1', 1234))
s.setblocking(False)
await loop.sock_sendall(s, MESSAGE)
bytes_read = 0
mv = memoryview(bytearray(MESSAGE_SIZE))
while bytes_read < MESSAGE_SIZE:
read = await loop.sock_recv_into(s, mv[bytes_read:])
if read == 0:
raise OSError("Broken!")
bytes_read += read
s.close()
asyncio.run(inner())
def streams_client():
async def inner():
reader, writer = await asyncio.open_connection(
'127.0.0.1', 1234)
writer.write(MESSAGE)
await writer.drain()
data = await reader.readexactly(MESSAGE_SIZE)
assert len(data) == MESSAGE_SIZE
writer.close()
await writer.wait_closed()
asyncio.run(inner())
Shows that the streams-based client is much simpler than the socket approach, but performs worse for larger messages:
Message size 5MB:
Sockets: 0.25
Streams: 0.37
Message size 50MB:
Sockets: 2.12
Streams: 3.09
Message size 100MB:
Sockets: 4.41
Streams: 6.89
Am I misunderstanding the use case of streams or using them in an poorly-performing manner?
I’ve also written a similar Protocol-based test client that slightly outperforms sockets as data size increases, but the lack of an awaitable write() method makes them a non-starter for my use case. Is there a way to ensure that a Transport completes sending its message over the wire before continuing execution?
NoahStapp (Noah Stapp) December 12, 2024, 2:36pm 2
@asvetlov @graingert you’ve both contributed to Motor in the past, do you have any insights into asyncio network performance?
graingert (Thomas Grainger) December 12, 2024, 2:51pm 3
How does trio and anyio streams compare?
asvetlov (Andrew Svetlov) December 13, 2024, 10:32am 4
asyncio streams are the slowest.loop.sock_*()
functions are not bad but imply many epoll register/unregister calls for processed sockets.
The best performant and the hardest to implement approach is using asyncio protocols. I wrap all underlying logic into a class like Connection to provide high-level API with await conn.read()
and await conn.write()
interface.
The protocol-based way consists of two parts: reading and writing.
The writing requires correct handling of pause_reading()
/ resume_reading()
callbacks; you can look at ./Lib/asyncio/streams.py:FlowControlMixin
for inspiration. The flow control allows the implementation of await conn.write()
on top of this concept. From the performance perspective, the crucial thing is to use zero-copy where possible. For example, asyncio transport has tr.writelines()
method; the method calls sock.send_msg()
for Python 3.12+ to avoid extra memcpy()
.
Efficient reading could be achieved by utilizing zero-copy BufferedProtocol
as well. The idea is to allocate a buffer (bytearray) first and read into the buffer instead of alloc → read → process → free cycle provided by Protocol.data_received()
. BufferedProtocol.get_buffer()
returns a memoryview(self._buf)[self._offset:]
, BufferedProtocol.buffer_updated(n)
does self._offset += n
until the buffer has enough free space, self._offset = 0
otherwise.BufferedProtocol
is more effective if the parser of incoming data also works with memory views without actual data copying and constructs python objects just before emitting them.
For example, HTTP headers parser that converts a buffer into a python string, splits the string into lines by s.split('\r\n')
, splits each line into a header name and value, and emits corresponding name: value
pairs is not very fast: each str.split()
creates a bunch of new python strings (alloc+memcpy+free). The fast parser could first calculate offsets for each header name and value and create emitted Python strings only once. It would be the best solution if a non-copy parser already exists as a third-party C library.
Sorry for the long text; I hope you’ll get my point. Please feel free to ask if you have any questions.
NoahStapp (Noah Stapp) December 16, 2024, 7:17pm 5
Thanks for the detailed breakdown, Andrew! It sounds like the approach of essentially cloning FlowControlMixin
for write handling and implementing a simple BufferedProtocol
protocol should be highly performant. Am I misunderstanding the complexity of doing this? Here’s a small test example I wrote up:
class FastProtocol(asyncio.BufferedProtocol):
def __init__(self):
super().__init__()
self._buffer = memoryview(bytearray(MESSAGE_SIZE))
self._offset = 0
self._done = None
self._connection_lost = False
self._paused = False
self._drain_waiters = collections.deque()
self._loop = asyncio.get_running_loop()
def connection_made(self, transport):
self.transport = transport
async def write(self, message: bytes):
self.transport.write(message)
await self._drain_helper()
async def read(self):
self._done = self._loop.create_future()
await self._done
def get_buffer(self, sizehint: int):
return self._buffer[self._offset:]
def buffer_updated(self, nbytes: int):
if not self._done.done():
self._offset += nbytes
if self._offset == MESSAGE_SIZE:
self._done.set_result(True)
def pause_writing(self):
assert not self._paused
self._paused = True
def resume_writing(self):
assert self._paused
self._paused = False
for waiter in self._drain_waiters:
if not waiter.done():
waiter.set_result(None)
def connection_lost(self, exc):
self._connection_lost = True
# Wake up the writer(s) if currently paused.
if not self._paused:
return
for waiter in self._drain_waiters:
if not waiter.done():
if exc is None:
waiter.set_result(None)
else:
waiter.set_exception(exc)
async def _drain_helper(self):
if self._connection_lost:
raise ConnectionResetError('Connection lost')
if not self._paused:
return
waiter = self._loop.create_future()
self._drain_waiters.append(waiter)
try:
await waiter
finally:
self._drain_waiters.remove(waiter)
def data(self):
return self._buffer
Is there a reason asyncio streams aren’t implemented in this way? They already wrap transports/protocols internally, so speeding up their performance significantly seems to be mostly a matter of improving the underlying implementations. Are there other motivations behind the behavior of streams that preclude better native performance?
asvetlov (Andrew Svetlov) December 18, 2024, 7:39pm 6
The snippet is good in general, but there is an important misunderstanding:
The message size usually varies, and the allocated buffer is usually much larger than the message size.
Say the message size is 1-500 KB, and the buffer size is 4MB.
The implementation needs two offsets: read_off
and ready_off
.
<--------------------------- buffer --------------------->
: <= ready_off : <= read_off
<---- data to process -----> <-- reading space -->
async def FastProtocol.read():
should return a memoryview for buf[:ready_off: read_off]
; or it waits for the data if nothing is available.
In turn, the protocol should ask for reading into buf[read_off:]
until read_off
doesn’t reach the right bound (or something close to it, e.g. right_bound - buf_size // 4
to avoid enforcement of reading a few bytes).
After reaching the right bound, the reading space moves to the left: read_off = 0; buf[read_off: ready_off]
.
<--------------------------- buffer ------------------------------------>
: <= read_off : <= ready_off : <= right_bound
<----- reading space ----> <---- data to process ---->
When all data is processed, ready_off
is moved to the left (ready_off = 0
), and everything starts from the beginning.
If the idea is still unclear, I can try to display Python code.
NoahStapp (Noah Stapp) December 19, 2024, 8:08pm 7
Thanks again for the help!
I see, so the intent is to implement a semi-circular buffer inside the Protocol, wrapping around when the writing offset reaches the end?
Can you explain the difference between right_bound
and the end of the buffer? From my testing so far, if you track offsets and messages carefully, a message could have a section both at the end of the buffer and at the start, having wrapped around with the writing offset. Does adding a separate right_bound
simplify this?
NoahStapp (Noah Stapp) December 19, 2024, 8:10pm 8
Great suggestion, thanks!
I went ahead and tested out both stream alternatives. They were both faster than asyncio’s streams, but still slower than our implementation using sockets. We’d also prefer to avoid adding external dependencies to our library, even ones as popular as trio and anyio.
asvetlov (Andrew Svetlov) December 19, 2024, 8:14pm 9
By socket based implementation do you mean sock_send() / sock_recv() API?
NoahStapp (Noah Stapp) December 19, 2024, 9:16pm 10
Our initial implementation used the asyncio loop.* socket methods to implement network I/O since we use the synchronous versions in our synchronous API.
asvetlov (Andrew Svetlov) December 23, 2024, 3:32pm 11
@NoahStapp Does adding a separate
right_bound
simplify this?
It depends on your message structure. Sometimes, the wrapping is undesirable, and proper tuning of right_bound
could prevent it.
In another case, the parser could construct the message from two parts, the first at the right and the second at the left.
Even if you have such an advanced parser, reducing the right bound could help reduce the number of syscalls.
For example, len(buf)
is 1024 (in reality, it should be 4MB or even 32MB for better network utilization), read_off
is 1000
, and the socket buffer has 200 unread bytes.
If proto.get_buffer()
returns memory_view(buf)[1000:]
the reading of 200 ‘ready’ bytes requires two syscalls: read_info(buf[1000:])
for fetching first 24 ‘ready’ bytes and next read_into(buf[0:ready_off])
for fetching next 176 bytes for the socket buffer.
Fine-tuning of right_bound
could avoid the first call and switch to read_into(buf[0:ready_off])
to get all the data from the socket read buffer in the single syscall.
It can make a visible speedup. Also, having a buffer big enough to store multiple messages in the buffer by the single read_into()
syscall is essential.
If the message parsing is fast, reducing the number of syscalls is the key. If not, the task is not io-bound anymore, and network performance is not a bottleneck.