Fix #338: os.writev() instead of os.write(b''.join()) and sendmsg() instead of send() by socketpair · Pull Request #339 · python/asyncio
This repository was archived by the owner on Nov 23, 2017. It is now read-only.
Please don't submit a PR that fails the tests and has typos in it. At the very least run it against your test program with and without drain().
@gvanrossum I have fixed the typos and checked the speed:
Before my patch:
without drain(): 4.5 sec
with drain(): 0.2 sec
After my patch:
without drain(): 0.2 sec
with drain(): 0.2 sec
P.S. I'm thinking about how to fix the tests. Please point me in the right direction.
I guess you have to mock os.writev too. Time to learn something!
BTW great news on the speed!
@gvanrossum Yes, I understand you fully. I have updated the PR with the tests fixed, but with an ugly hack to work around a @mock.patch('os.writev') limitation; see the comments.
```python
# @mock.patch('os.writev') stores internal reference to _buffer, which is MODIFIED
# during this call. So, makeing deep copy :(
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
n = os.writev(self._fileno, self._buffer.copy())
```
There must be a better way here. Adding a copy just for convenient testing isn't a good idea. Please also remove all the exclamation marks and smileys, and fix the typos.
@1st1 I assume you would never consider merging a PR with such a hack; I added these comments to draw your attention to it. The PR is not ready to merge, but I had to point out this bad spot. If you know a 'better way', please point me to it. I will fix it immediately.
OK ;) Please try to find a saner approach. I'll try to take a look myself later.
I have removed the exclamation marks and other noise, but I still don't know how to work around that annoying bug in unittest.mock.
Finally, I have fixed the test as @1st1 suggested.
I don't follow. Does the real os.writev() modify its buffer argument? Or the mock one?
@gvanrossum, in short, the code is:

```python
buf = [1, 2, 3]
writev(buf)  # <- the mock remembers a REFERENCE to buf, not its contents
buf.pop(0)
```

Note that writev was called with [1, 2, 3], but the mock sees (in its assert) that buf is [2, 3].
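One standard workaround is the "coping with mutable arguments" recipe from the unittest.mock documentation: a MagicMock subclass that deep-copies its arguments at call time, so assertions see the values as they were when the call happened. A sketch (the fileno and buffer values in the usage comment are illustrative):

```python
import copy
from unittest import mock

class CopyingMock(mock.MagicMock):
    """MagicMock that snapshots mutable call arguments at call time."""
    def __call__(self, *args, **kwargs):
        args = copy.deepcopy(args)
        kwargs = copy.deepcopy(kwargs)
        return super().__call__(*args, **kwargs)

# Usage sketch: patch os.writev with the copying mock instead of a plain one.
# with mock.patch('os.writev', new_callable=CopyingMock) as m:
#     ...  # code under test mutates its buffer after calling os.writev()
#     m.assert_called_once_with(5, [b'data'])  # still sees the original args
```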
```python
self._buffer.append(data)  # Try again later.
self._loop.remove_writer(self._fileno)
self._maybe_resume_protocol()  # May append to buffer.
```
This seems to be another bug: in the socket transport, self._maybe_resume_protocol() is called before the `if self._buffer` check. So who is right?
I would trust the socket code more. So I'd remove lines 544-545 above and see how it fares.
Fixed; the behaviour now matches the socket transport.
@1st1 I think the most convenient way is to merge my PR in its current state, and after that merge your PR. Is that a suitable workflow?
@socketpair You can just incorporate my change into your PR. Please add a comment to patched_os_writev explaining why we need that craziness.
Sorry, please don't merge the PR. I have checked the performance: just extending a bytearray is much faster. C is not Python. :(
This is reproducible both for sockets and for pipes. In theory, writev() / sendmsg() should be faster; in reality they are not. I can hardly believe this is true.
I will investigate why this happens. Please don't merge the PR until the problem is resolved.
Program used to benchmark:
```python
#!/usr/bin/env python3.5
import asyncio
import os


async def handle_echo(rs, ws):
    for i in range(30):
        ws.write(b'x' * (10 * 1024 * 1024))
    print('wrote everything, draining')
    await ws.drain()
    print('draining complete')
    ws.close()


async def test_read_arch(rs, ws):
    s = 0
    while True:
        d = await rs.read(65536)
        if not d:
            break
        s += len(d)
    print('Read %d bytes completely' % s)
    ws.close()


async def amain():
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 8888)
    (rs, ws) = await asyncio.open_connection('127.0.0.1', 8888)
    server.close()
    await server.wait_closed()
    await test_read_arch(rs, ws)


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(amain())


if __name__ == '__main__':
    main()
```
This is very strange. Using bytearray means more memory allocations and copy calls. From what I can see, the writev implementation correctly uses the buffer protocol, thus avoiding any extra memory work. What platform are you running your benchmark on? What are the numbers?
Honestly I'm not that surprised that bytearray performs better. Resizing a byte array is one of the fastest things you can do in CPython (because `realloc()` handles the copy for you).
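As a quick illustration of that claim (a hypothetical micro-benchmark, not part of this PR; absolute numbers vary by platform), growing a bytearray in place is cheap compared with building the same payload via a join at the end:

```python
# Illustrative only: compare in-place bytearray growth against one big join.
import timeit

grow = timeit.timeit(
    'b += chunk',
    setup='b = bytearray(); chunk = b"x" * 1000',
    number=100000,  # grows b to ~100 MB, one append at a time
)
join = timeit.timeit(
    'b"".join(chunks)',
    setup='chunks = [b"x" * 1000] * 100000',
    number=1,  # one join over the same ~100 MB of chunks
)
print('bytearray +=: %.3fs   join: %.3fs' % (grow, join))
```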
I'm online and will continue soon. I spent last week in another country; now I'm back home, and today is my first day back at work.
Sure, no rush. 3.5.2 will happen relatively soon; it would be nice to have this reviewed and merged before that.
Great news! Now I know why bytearray was faster.
And now my code is even faster than the bytearray variant, since it is truly zero-copy! WOOHOO!
```
$ git checkout -f bytea
$ time ./bench_pipes.py
real 0m0.585s
user 0m0.336s
sys 0m0.244s
$ time ./bench_pipes.py
real 0m0.535s
user 0m0.356s
sys 0m0.148s
$ time ./bench_pipes.py
real 0m0.498s
user 0m0.344s
sys 0m0.152s
$ git checkout writev
anya@mmarkk-imac:~/src/GH/asyncio$ time ./bench_pipes.py
real 0m0.370s
user 0m0.284s
sys 0m0.084s
anya@mmarkk-imac:~/src/GH/asyncio$ time ./bench_pipes.py
real 0m0.354s
user 0m0.280s
sys 0m0.068s
```
```python
n -= len(chunk)
if n < 0:
    # Only part of the chunk was written, so push the unwritten part of it
    # back to _buffer. memoryview is required to eliminate memory copying
    # while slicing.
```
This was the cause of the slow throughput.
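To make the mechanism concrete, here is a minimal illustrative sketch (the function name flush_once is invented, not from the patch) of draining a deque-based buffer with one writev() call and re-queuing the unwritten tail as a memoryview slice, so no bytes are copied:

```python
import os
from collections import deque

def flush_once(fileno: int, buffer: deque) -> None:
    """Illustrative only: one writev() pass over a deque of chunks."""
    n = os.writev(fileno, buffer)  # may write fewer bytes than were queued
    while n > 0:
        chunk = buffer.popleft()
        n -= len(chunk)
        if n < 0:
            # Partial write of this chunk: push the unwritten tail back
            # without copying (n is negative here, so this slice is the
            # last -n bytes of the chunk).
            buffer.appendleft(memoryview(chunk)[n:])
```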
Fixed.
Cool! Could you please fix the code style to 79 chars per line?
```diff
@@ -617,7 +617,7 @@ def _call_connection_lost(self, exc):
         self._server = None

     def get_write_buffer_size(self):
-        return len(self._buffer)
+        return sum(len(data) for data in self._buffer)
```
Maybe instead of running sum each time, we can have an internal _buffer_length attribute?
Maybe, but I copy-pasted it from another place. Yes, I can fix that (and benchmark it).
I suspect most of the time the data queue won't be large, so you really need to overwhelm the write buffer in your benchmarks to see the benefits of caching.
Sorry, I won't fix that. The code becomes too complex and error-prone, especially when an exception occurs in certain places. I tried.
So, I have fixed it anyway, and I also added many asserts.
Also, Python 3.5 does not support sendmmsg(); if it ever does, I can improve the datagram transport the same way.
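For illustration, here is a hypothetical sketch of that cached-length idea (the class and attribute names are invented here, not taken from the patch): keep a running byte count next to the deque so get_write_buffer_size() stays O(1):

```python
from collections import deque

class VectoredBuffer:
    """Hypothetical: a deque of chunks plus a cached total byte count."""

    def __init__(self):
        self._buffer = deque()
        self._buffer_length = 0  # cached sum(len(c) for c in self._buffer)

    def append(self, data):
        self._buffer.append(data)
        self._buffer_length += len(data)

    def consume(self, nbytes):
        # Called after a successful write; drops nbytes from the front.
        self._buffer_length -= nbytes
        while nbytes > 0:
            chunk = self._buffer.popleft()
            nbytes -= len(chunk)
            if nbytes < 0:
                # Re-queue the unwritten tail of the last chunk, zero-copy.
                self._buffer.appendleft(memoryview(chunk)[nbytes:])
        assert self._buffer_length == sum(len(c) for c in self._buffer)

    def get_write_buffer_size(self):
        return self._buffer_length
```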
```diff
         if self._conn_lost:
             return
         try:
-            n = self._sock.send(self._buffer)
+            if compat.HAS_SENDMSG:
```
HAS or HAVE? :)
HAS is better.
ping
Sorry, Mark, I'm busy with other things. This is quite an intrusive change and it has to be merged with extra care. And we have plenty of time before 3.6.
@1st1 I can split the PR into small parts; after each part, everything will stay consistent. Should I do that?
> @1st1 I can split the PR into small parts; after each part, everything will stay consistent. Should I do that?
How do you want to split it?
Actually, I was just looking at this PR again, and I think it's ready. I need to spend another hour playing with the code, but I'd say it's almost there.
I think .sendto() and .write() should do data = bytes(data) to avoid pitfalls.
If data is already bytes, there is no performance penalty.
If data is a bytearray or memoryview, the copy lets the user safely reuse their buffer.
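A hypothetical illustration of the pitfall (not code from this PR): if a transport queues a reference to a mutable buffer instead of a copy, mutating the buffer before the event loop flushes it can change what goes on the wire. Doing data = bytes(data) at write() time rules this out, and is free for bytes input, since bytes(b) returns b itself:

```python
import asyncio

async def demo(ws: asyncio.StreamWriter) -> None:
    buf = bytearray(b'hello')
    ws.write(buf)       # a transport might queue a reference, not a copy
    buf[:] = b'WORLD'   # mutated before the buffer is actually flushed...
    await ws.drain()    # ...so b'WORLD' could be sent instead of b'hello'
```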
@arthurdarcet can you benchmark asyncio streams with this patch applied, using the tool included in this PR, on Mac OS X? This patch should speed up pipes significantly on that platform too.
I feel it's a premature optimization.
I've benchmarked on Ubuntu 16.04 with a slightly modified bench_streams.py.
The master branch (bytearray buffer) is faster for smaller (~500 byte) messages.
master branch:
```
bench_sockets: 10000000 buffers by 10 bytes : 11.32 seconds (8.42 MiB/sec)
bench_sockets: 5000000 buffers by 20 bytes : 5.72 seconds (16.67 MiB/sec)
bench_sockets: 2000000 buffers by 50 bytes : 2.37 seconds (40.22 MiB/sec)
bench_sockets: 1000000 buffers by 100 bytes : 1.26 seconds (75.42 MiB/sec)
bench_sockets: 500000 buffers by 200 bytes : 0.70 seconds (135.37 MiB/sec)
bench_sockets: 200000 buffers by 500 bytes : 0.37 seconds (256.61 MiB/sec)
bench_sockets: 100000 buffers by 1000 bytes : 0.26 seconds (370.33 MiB/sec)
bench_sockets: 50000 buffers by 2000 bytes : 0.20 seconds (474.92 MiB/sec)
bench_sockets: 20000 buffers by 5000 bytes : 0.16 seconds (580.86 MiB/sec)
bench_sockets: 10000 buffers by 10000 bytes : 0.15 seconds (620.15 MiB/sec)
bench_sockets: 5000 buffers by 20000 bytes : 0.15 seconds (631.98 MiB/sec)
bench_sockets: 2000 buffers by 50000 bytes : 0.15 seconds (649.65 MiB/sec)
bench_sockets: 10000000 buffers by 10 bytes (with intermediate drains): 22.57 seconds (4.22 MiB/sec)
bench_sockets: 5000000 buffers by 20 bytes (with intermediate drains): 11.29 seconds (8.45 MiB/sec)
bench_sockets: 2000000 buffers by 50 bytes (with intermediate drains): 4.52 seconds (21.09 MiB/sec)
bench_sockets: 1000000 buffers by 100 bytes (with intermediate drains): 2.31 seconds (41.31 MiB/sec)
bench_sockets: 500000 buffers by 200 bytes (with intermediate drains): 1.19 seconds (80.05 MiB/sec)
bench_sockets: 200000 buffers by 500 bytes (with intermediate drains): 0.52 seconds (183.04 MiB/sec)
bench_sockets: 100000 buffers by 1000 bytes (with intermediate drains): 0.30 seconds (320.99 MiB/sec)
bench_sockets: 50000 buffers by 2000 bytes (with intermediate drains): 0.19 seconds (514.17 MiB/sec)
bench_sockets: 20000 buffers by 5000 bytes (with intermediate drains): 0.12 seconds (814.84 MiB/sec)
bench_sockets: 10000 buffers by 10000 bytes (with intermediate drains): 0.09 seconds (1013.89 MiB/sec)
bench_sockets: 5000 buffers by 20000 bytes (with intermediate drains): 0.08 seconds (1152.17 MiB/sec)
bench_sockets: 2000 buffers by 50000 bytes (with intermediate drains): 0.08 seconds (1257.04 MiB/sec)
```
socketpair:writev branch:
```
bench_sockets: 10000000 buffers by 10 bytes : 15.59 seconds (6.12 MiB/sec)
bench_sockets: 5000000 buffers by 20 bytes : 7.84 seconds (12.16 MiB/sec)
bench_sockets: 2000000 buffers by 50 bytes : 3.17 seconds (30.07 MiB/sec)
bench_sockets: 1000000 buffers by 100 bytes : 1.61 seconds (59.39 MiB/sec)
bench_sockets: 500000 buffers by 200 bytes : 0.85 seconds (111.82 MiB/sec)
bench_sockets: 200000 buffers by 500 bytes : 0.38 seconds (247.81 MiB/sec)
bench_sockets: 100000 buffers by 1000 bytes : 0.23 seconds (419.86 MiB/sec)
bench_sockets: 50000 buffers by 2000 bytes : 0.15 seconds (651.97 MiB/sec)
bench_sockets: 20000 buffers by 5000 bytes : 0.10 seconds (946.71 MiB/sec)
bench_sockets: 10000 buffers by 10000 bytes : 0.08 seconds (1131.09 MiB/sec)
bench_sockets: 5000 buffers by 20000 bytes : 0.08 seconds (1236.96 MiB/sec)
bench_sockets: 2000 buffers by 50000 bytes : 0.07 seconds (1324.88 MiB/sec)
bench_sockets: 10000000 buffers by 10 bytes (with intermediate drains): 26.38 seconds (3.61 MiB/sec)
bench_sockets: 5000000 buffers by 20 bytes (with intermediate drains): 12.92 seconds (7.38 MiB/sec)
bench_sockets: 2000000 buffers by 50 bytes (with intermediate drains): 5.34 seconds (17.87 MiB/sec)
bench_sockets: 1000000 buffers by 100 bytes (with intermediate drains): 2.38 seconds (40.15 MiB/sec)
bench_sockets: 500000 buffers by 200 bytes (with intermediate drains): 1.24 seconds (77.00 MiB/sec)
bench_sockets: 200000 buffers by 500 bytes (with intermediate drains): 0.55 seconds (174.32 MiB/sec)
bench_sockets: 100000 buffers by 1000 bytes (with intermediate drains): 0.31 seconds (311.23 MiB/sec)
bench_sockets: 50000 buffers by 2000 bytes (with intermediate drains): 0.19 seconds (498.30 MiB/sec)
bench_sockets: 20000 buffers by 5000 bytes (with intermediate drains): 0.12 seconds (804.88 MiB/sec)
bench_sockets: 10000 buffers by 10000 bytes (with intermediate drains): 0.09 seconds (1005.35 MiB/sec)
bench_sockets: 5000 buffers by 20000 bytes (with intermediate drains): 0.08 seconds (1141.88 MiB/sec)
bench_sockets: 2000 buffers by 50000 bytes (with intermediate drains): 0.08 seconds (1258.54 MiB/sec)
```
- And it seems I know why: Python builds the iov array from scratch on every call. :(
- It seems we should test on real use cases; in practice there won't be buffers this big, or around 10 of them per call.
writev pros:

- true zero copy
- slightly faster (we need to check whether the benchmark's use cases are valid)
- uses the syscall that was intended exactly for this

writev cons:

- writev is not supported on Windows
- adds restrictions on passing mutable buffers
- complex and fragile code

bytearray pros:

- much simpler
- does not require a hack for Windows

bytearray cons:

- slightly slower (we need to check whether the benchmark's use cases are valid)
@gvanrossum @1st1 @methane
I can't choose by myself. :( Please help me make a decision. :)
Another con of writev: a TypeError may happen in _write_ready(), not in write().
Checking "bytes-like" object without copy is very hard in pure Python.
I think following code is same to PyObject_GetBuffer(data, &view, PyBUF_SIMPLE)
,
but I'm not sure.
mv = memoryview(data) if mv.itemsize != 1 or not mv.contiguous: raise TypeError
bytearray += data
raises TypeError when data is not bytes-like object.
My preference is to use bytearray() in pure Python, but make it replaceable, so someone can implement a smarter buffer in C, like the sketch below:

```python
from collections import deque

def __init__(self):
    self.queue = deque()

def __iadd__(self, data):
    if type(data) is bytes and len(data) > 200:
        self.queue.append(data)
        return self
    if not self.queue or type(self.queue[-1]) is bytes:
        self.queue.append(bytearray())
    self.queue[-1] += data
    return self
```
Copy-pasting here:
small data, bytearray:

```
bench_sockets: 10000000 buffers by 10 bytes : 11.32 seconds (8.42 MiB/sec)
bench_sockets: 1000000 buffers by 100 bytes : 1.26 seconds (75.42 MiB/sec)
```

small data, writev:

```
bench_sockets: 10000000 buffers by 10 bytes : 15.59 seconds (6.12 MiB/sec)
bench_sockets: 1000000 buffers by 100 bytes : 1.61 seconds (59.39 MiB/sec)
```

large data, bytearray:

```
bench_sockets: 100000 buffers by 1000 bytes : 0.26 seconds (370.33 MiB/sec)
bench_sockets: 10000 buffers by 10000 bytes : 0.15 seconds (620.15 MiB/sec)
bench_sockets: 2000 buffers by 50000 bytes : 0.15 seconds (649.65 MiB/sec)
```

large data, writev:

```
bench_sockets: 100000 buffers by 1000 bytes : 0.23 seconds (419.86 MiB/sec)
bench_sockets: 10000 buffers by 10000 bytes : 0.08 seconds (1131.09 MiB/sec)
bench_sockets: 2000 buffers by 50000 bytes : 0.07 seconds (1324.88 MiB/sec)
```
I understand that simplicity of the code is important. In real use cases (i.e. small amounts of data and a small number of chunks), both implementations give the same performance.
For pipes, use os.writev() instead of os.write(b''.join()). For sockets, use sendmsg() instead of send(b''.join()).
Also change the plain list of buffers to a deque() of buffers.
This greatly improves performance with large buffers or with many stream.write() calls.
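As a minimal demonstration of the socket half of this summary (a hedged sketch, POSIX-only, not the PR's code): socket.sendmsg() accepts an iterable of bytes-like objects and performs one scatter-gather send, the socket analogue of os.writev():

```python
import socket

# One scatter-gather send of several queued chunks in a single syscall.
a, b = socket.socketpair()
chunks = [b'hello ', b'vectored ', b'world']
n = a.sendmsg(chunks)  # like os.writev(fd, chunks), but for sockets
assert n == sum(len(c) for c in chunks)  # may be partial for large payloads
print(b.recv(64))  # b'hello vectored world'
```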
Mark, I'm closing this PR. I think we'd want to have vectorized writes in 3.7, so when you have a new patch please open a new PR.