gh-96471: Add asyncio queue shutdown by EpicWink 路 Pull Request #104228 路 python/cpython (original) (raw)

I'd like to add that the shutdown() method and task groups makes certain patterns really easy to express. I was struggling with how to properly make a generic async map function with a fixed number of workers (mostly as an exercise), but when I saw the new shutdown method, it came together really easily:

async def async_map[T, R]( func: Callable[[T], Awaitable[R]], iterable: AsyncIterable[T], *, limit: int, maxsize: int = -1, ) -> AsyncIterator[R]: if maxsize < 0: maxsize = limit

arguments_queue = Queue[T](maxsize=maxsize)
results_queue = Queue[R](maxsize=maxsize)

async def drain():
    async for argument in iterable:
        await arguments_queue.put(argument)
    arguments_queue.shutdown(immediate=False)

async def worker():
    while True:
        try:
            argument = await arguments_queue.get()
        except QueueShutDown:
            break
        await results_queue.put(await func(argument))

async def background():
    async with asyncio.TaskGroup() as background_task_group:
        background_task_group.create_task(drain())
        for _ in range(limit):
            background_task_group.create_task(worker())
    results_queue.shutdown(immediate=False)

async with asyncio.TaskGroup() as task_group:
    task_group.create_task(background())

    while True:
        try:
            yield await results_queue.get()
        except QueueShutDown:
            break

I believe this has a common issue that a lot of async generators do, in that if you don't consume the entire generator, it will still continue processing and end up with a lot of futures never awaited.

I know there's some backports. Is there a backports package for asyncio stuff already?