When there are a huge number of `writer.write` calls, each followed by `await writer.drain()`, on a non-paused channel, and there are no other coroutine switches, `await writer.drain()` returns immediately without switching to another task. This is because `asyncio.streams.FlowControlMixin._drain_helper` does not `yield` or `yield from` on a non-paused stream.

Use case: the AMQP basic.publish method, for which the broker (RabbitMQ) does not send any replies back. Trying to publish 4k messages results in the following warning (with the PYTHONASYNCIODEBUG environment variable set):

`Executing <Handle <TaskWakeupMethWrapper object at 0x1106fde28>() created at /Users/malinoff/Projects/ideas/amqproto/amqproto/channel.py:85> took 2.371 seconds`

The 2.371 seconds is the time spent on 4k `basic_publish` calls. You can find the test itself on GitHub: https://github.com/malinoff/amqproto/blob/master/tests/stress/test_4k_msgs.py#L11-L12

An easy fix would be to replace the return (https://github.com/python/cpython/blob/master/Lib/asyncio/streams.py#L206) with a yield (and put the code below it under the else clause; I'm willing to prepare a pull request), but maybe I'm missing something and this behavior is intentional?
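The starvation described above can be reproduced without a network connection. The sketch below is a simplified simulation, not the actual asyncio source: `drain_fast_path` is a hypothetical stand-in that only suspends when `paused` is true, and `count_other_task_steps` (also hypothetical) counts how often a second task gets to run while the first one loops over "write + drain".

```python
import asyncio


async def drain_fast_path(paused: bool) -> None:
    """Hypothetical stand-in for the drain logic described above.

    Only the paused branch awaits something that suspends; the
    non-paused branch returns immediately, so awaiting it never hands
    control back to the event loop.
    """
    if paused:
        await asyncio.sleep(0)


async def count_other_task_steps(n: int, paused: bool) -> int:
    steps = 0

    async def other_task() -> None:
        nonlocal steps
        while True:
            steps += 1
            await asyncio.sleep(0)

    task = asyncio.create_task(other_task())
    for _ in range(n):
        # Analogous to writer.write(...) followed by await writer.drain().
        await drain_fast_path(paused)
    seen = steps  # how many times the other task ran during the loop
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return seen


if __name__ == "__main__":
    print(asyncio.run(count_other_task_steps(4000, paused=False)))
    print(asyncio.run(count_other_task_steps(4000, paused=True)))
```

With `paused=False` the other task never runs while the loop is executing, because awaiting a coroutine that does not suspend never returns control to the event loop. That is the situation which shows up as a single long-running handle in debug mode.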
I've changed the implementation significantly since August 2017, and futures are no longer involved, so please ignore that part. However, this drain behavior is still present, but I no longer think that yielding to the event loop is an easy fix. I tried doing so in my library, and it caused a significant slowdown (down to around 4-5k publishes per second), which is not acceptable. I also found this message from Guido: https://github.com/python/asyncio/issues/263#issuecomment-142702725

What really helped is a counter that tracks send calls made without waiting for replies, plus a user-provided limit: when the counter reaches the limit, an explicit yield (via `await asyncio.sleep(0)`) is performed. This achieved around 15-16k publishes per second (3-4 times faster). Here's the code: https://github.com/malinoff/amqproto/blob/6568204b539ecf820af2da11bddcca9ce7323ac5/amqproto/adapters/asyncio_adapter.py#L53-L71

Now I think this behavior should just be documented, so that library authors can deal with it before they run into it in production. But if you have other thoughts, I'd be glad to hear them.
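A minimal sketch of the counter-plus-limit approach described above, assuming a writer object with `write()` and an awaitable `drain()` in the style of `asyncio.StreamWriter`. `YieldEveryN`, `FakeWriter`, `publish_many` and `demo` are hypothetical names used only for illustration; this is not the amqproto code behind the link.

```python
import asyncio


class YieldEveryN:
    """Hypothetical helper: yield to the event loop once every `limit` sends."""

    def __init__(self, limit: int) -> None:
        if limit < 1:
            raise ValueError("limit must be >= 1")
        self.limit = limit
        self._unyielded = 0  # sends since the last explicit yield
        self.yields = 0      # number of explicit yields performed

    async def after_send(self) -> None:
        self._unyielded += 1
        if self._unyielded >= self.limit:
            self._unyielded = 0
            self.yields += 1
            # sleep(0) suspends the current task for one loop iteration,
            # giving other ready tasks and callbacks a chance to run.
            await asyncio.sleep(0)


class FakeWriter:
    """Hypothetical stand-in for a writer whose transport is never paused."""

    def __init__(self) -> None:
        self.written = []

    def write(self, data: bytes) -> None:
        self.written.append(data)

    async def drain(self) -> None:
        return  # fast path: nothing to wait for, so no suspension


async def publish_many(writer, payloads, throttle: YieldEveryN) -> None:
    for payload in payloads:
        writer.write(payload)
        await writer.drain()
        await throttle.after_send()


async def demo(n: int, limit: int):
    ticks = 0

    async def other_task() -> None:
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0)

    task = asyncio.create_task(other_task())
    writer = FakeWriter()
    throttle = YieldEveryN(limit)
    await publish_many(writer, [b"msg"] * n, throttle)
    seen = ticks  # how many times the other task ran during publishing
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(writer.written), throttle.yields, seen


if __name__ == "__main__":
    print(asyncio.run(demo(4000, limit=100)))
```

The limit is a trade-off between fairness and throughput: a small limit lets other tasks run more often, while a large one spends fewer loop iterations on switching and more on publishing.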