Add draining feature to asyncio.Queue by nhumrich · Pull Request #415 · python/asyncio (original) (raw)

This feature adds the ability to drain a Queue. This is useful for cleanup steps, especially in the simple websocket case.

It adds the following things:

Imagine we have the following code:

import asyncio q = asyncio.Queue()

async def wait_for_it(): while True: t = await q.get() q.task_done()

loop = asyncio.get_event_loop() asyncio.ensure_future(wait_for_it())

try: loop.run_forever() except KeyboardInterrupt: pass

This actually creates a lot of warnings/errors on shutdown, such as

Task was destroyed but it is pending!
task: <Task pending coro=<wait_for_it() running at test.py:6> wait_for=<Future pending cb=[Task._wakeup()]>>
Exception ignored in: <coroutine object wait_for_it at 0x7f594abb7830>
Traceback (most recent call last):
  File "test.py", line 6, in wait_for_it
  File "/usr/lib/python3.5/asyncio/queues.py", line 170, in get
  File "/usr/lib/python3.5/asyncio/futures.py", line 227, in cancel
  File "/usr/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
  File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
  File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
  File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed

The fix is

import asyncio

q = asyncio.Queue()

async def wait_for_it(): while True: t = await q.get() q.task_done()

loop = asyncio.get_event_loop() task = loop.create_task(wait_for_it())

try: loop.run_forever() except KeyboardInterrupt: pass finally: task.cancel() pending = asyncio.Task.all_tasks() try: loop.run_until_complete(asyncio.gather(*pending)) except asyncio.CancelledError: print('expected') loop.close()

which is very unwieldy and not obvious.

Instead I propose a drain feature which gives us the following code

import asyncio

q = asyncio.Queue()

async def wait_for_it(): try: while True: t = await q.get() q.task_done() except asyncio.QueueClosed: print('closed')

loop = asyncio.get_event_loop() asyncio.ensure_future(wait_for_it()) asyncio.ensure_future(wait_for_it())

try: loop.run_forever() except KeyboardInterrupt: pass finally: loop.run_until_complete(q.drain()) loop.close()

which is much cleaner and simple.