gh-96471: Add asyncio queue shutdown by EpicWink 路 Pull Request #104228 路 python/cpython (original) (raw)
I'd like to add that the shutdown()
method and task groups makes certain patterns really easy to express. I was struggling with how to properly make a generic async map function with a fixed number of workers (mostly as an exercise), but when I saw the new shutdown method, it came together really easily:
async def async_map[T, R]( func: Callable[[T], Awaitable[R]], iterable: AsyncIterable[T], *, limit: int, maxsize: int = -1, ) -> AsyncIterator[R]: if maxsize < 0: maxsize = limit
arguments_queue = Queue[T](maxsize=maxsize)
results_queue = Queue[R](maxsize=maxsize)
async def drain():
async for argument in iterable:
await arguments_queue.put(argument)
arguments_queue.shutdown(immediate=False)
async def worker():
while True:
try:
argument = await arguments_queue.get()
except QueueShutDown:
break
await results_queue.put(await func(argument))
async def background():
async with asyncio.TaskGroup() as background_task_group:
background_task_group.create_task(drain())
for _ in range(limit):
background_task_group.create_task(worker())
results_queue.shutdown(immediate=False)
async with asyncio.TaskGroup() as task_group:
task_group.create_task(background())
while True:
try:
yield await results_queue.get()
except QueueShutDown:
break
I believe this has a common issue that a lot of async generators do, in that if you don't consume the entire generator, it will still continue processing and end up with a lot of futures never awaited.
I know there's some backports
. Is there a backports package for asyncio stuff already?