Add draining feature to asyncio.Queue by nhumrich · Pull Request #415 · python/asyncio (original) (raw)
This feature adds the ability to drain a Queue. This is useful for cleanup steps, especially in the simple websocket case.
It adds the following things:
- (coroutine) drain - waits until the queue is drained, errors on puts, errors on gets once queue is empty
- drain_nowait
- close - closes queue immediately, errors on all gets and puts
Imagine we have the following code:
import asyncio q = asyncio.Queue()
async def wait_for_it(): while True: t = await q.get() q.task_done()
loop = asyncio.get_event_loop() asyncio.ensure_future(wait_for_it())
try: loop.run_forever() except KeyboardInterrupt: pass
This actually creates a lot of warnings/errors on shutdown, such as
Task was destroyed but it is pending!
task: <Task pending coro=<wait_for_it() running at test.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>
Exception ignored in: <coroutine object wait_for_it at 0x7f594abb7830>
Traceback (most recent call last):
File "test.py", line 6, in wait_for_it
File "/usr/lib/python3.5/asyncio/queues.py", line 170, in get
File "/usr/lib/python3.5/asyncio/futures.py", line 227, in cancel
File "/usr/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed
The fix is
import asyncio
q = asyncio.Queue()
async def wait_for_it(): while True: t = await q.get() q.task_done()
loop = asyncio.get_event_loop() task = loop.create_task(wait_for_it())
try: loop.run_forever() except KeyboardInterrupt: pass finally: task.cancel() pending = asyncio.Task.all_tasks() try: loop.run_until_complete(asyncio.gather(*pending)) except asyncio.CancelledError: print('expected') loop.close()
which is very unwieldy and not obvious.
Instead I propose a drain feature which gives us the following code
import asyncio
q = asyncio.Queue()
async def wait_for_it(): try: while True: t = await q.get() q.task_done() except asyncio.QueueClosed: print('closed')
loop = asyncio.get_event_loop() asyncio.ensure_future(wait_for_it()) asyncio.ensure_future(wait_for_it())
try: loop.run_forever() except KeyboardInterrupt: pass finally: loop.run_until_complete(q.drain()) loop.close()
which is much cleaner and simple.