How to cancel all tasks created within same TaskGroup (original) (raw)
TaskGroup is quite new thing and I didn’t find functionality for easily cancelling all other tasks that might be running in the same task group. Only method TaskGroup object has is create_task
.
Is this the right way to go?
import asyncio
from asyncio import Task
from random import random
async def job_1(tasks: list[Task]):
while True:
await asyncio.sleep(1) # Work that is being done
if random() < 0.1: # When every task should get cancelled
for task in tasks:
if task == asyncio.current_task():
print("Task 1 cancelling others first")
continue
if task.done():
print("Some task was already done")
task.cancel()
return
print("Task 1")
async def job_2(tasks: list[Task]):
while True:
await asyncio.sleep(1)
if random() < 0.1:
for task in tasks:
if task == asyncio.current_task():
print("Task 2 cancelling others first")
continue
if task.done():
print("Some task was already done")
task.cancel()
return
print("Task 2")
async def daemon():
while True:
await asyncio.sleep(1)
print("Daemon task")
async def main():
async with asyncio.TaskGroup() as group:
tasks: list[Task] = []
task_1 = group.create_task(job_1(tasks))
task_2 = group.create_task(job_2(tasks))
daemon_task = group.create_task(daemon())
tasks.append(task_1)
tasks.append(task_2)
tasks.append(daemon_task)
asyncio.run(main())
smurfix (Matthias Urlichs) September 5, 2023, 6:16am 2
Trio and anyio afford a taskgroup.cancel_scope.cancel
method for this.
Asyncio doesn’t have cancel scopes, and anyway the additional indirection doesn’t make much sense, thus I’m going to propose adding a taskgroup.cancel
method for this.
Currently the only way seems to be to cancel the task itself, which is stored in the taskgroup as a private datum and thus really shouldn’t be used. Alternately you can pass the task itself around, which is a problem because the task might not even be running within the taskgroup’s scope any more – and cancelling an inactive taskgroup should be a no-op.
arthur-tacca (Arthur Tacca) March 13, 2024, 11:12am 3
Matthias is right that you can cancel the task that the task group is running in (assuming that nothing else happens before/after the task group block, otherwise you’ll need to refactor it into its own function).
But another thing that will cancel all the tasks in a task group is when one of the tasks in the group raises an exception. That’s the whole point of task groups! So you could just manually inject an exception with TaskGroup.create_task()
and then catch it just outside. Here’s your example reformatted to do this:
Code example using group.create_task(_raise_cancel_request())
import asyncio
from random import random
class CancelRequestException(Exception):
pass
async def raise_cancel_request():
raise CancelRequestException
async def job(i, group):
while True:
print(f"Task {i} before sleep")
await asyncio.sleep(1)
print(f"Task {i} after sleep")
if random() < 0.1:
print(f"Task {i} cancelling others")
group.create_task(raise_cancel_request())
return
async def daemon():
while True:
print("Daemon before sleep")
await asyncio.sleep(1)
print("Daemon after sleep")
async def main():
try:
async with asyncio.TaskGroup() as group:
group.create_task(job(1, group))
group.create_task(job(2, group))
group.create_task(daemon())
except* CancelRequestException:
pass
asyncio.run(main())
(If you change random() < 0.1
to e.g. random() < 0.8
and run it a few times, you will see that sometimes task 1 will try to cancel but task 2 will then wake up and cancel too. That’s because create_task(_raise_cancel_request())
will schedule the _raise_cancel_request()
but not run it straight away, so task 2 is not cancelled quite immediately. That’s not necessarily a problem but it’s certainly a difference from Matthias’s method.)
By the way, with this particular example, you can simply raise the exception directly from job()
! No TaskGroup.create_task()
or extra hypothetical TaskGroup.cancel()
method needed. But I assume you’re talking about a situation where you might want to cancel a task group from a task running in some other task group.
It’s possible to wrap this technique up into a cancellable task group class if that’s useful. I’m not an expert on context managers so I’m not 100% sure this is right but it seems to work:
import asyncio
class _CancelRequestException(Exception):
pass
async def _raise_cancel_request():
raise _CancelRequestException
class CancellableTaskGroup(asyncio.TaskGroup):
async def __aexit__(self, exc_type, exc_val, exc_tb):
try:
result = await super().__aexit__(exc_type, exc_val, exc_tb)
# This surely means that there was no exception, otherwise super().__aexit__() would
# have wrapped it in an exception group and explicitly raised it.
# (In principle, if the main body raised a SystemExit or KeyboardInterrupt then that
# could follow this route, although in practice the current implementation of
# TaskGroup always re-raises these rather than just returning True.)
return result
except* _CancelRequestException:
pass # Suppress cancel request exception
# If we got here, the only exception in the ExceptionGroup was _CancelRequestException
return True
def cancel(self):
self.create_task(_raise_cancel_request())
Example using CancellableTaskGroup
from random import random
async def job(i, group):
while True:
print(f"Task {i} before sleep")
await asyncio.sleep(1)
print(f"Task {i} after sleep")
if random() < 0.1:
print(f"Task {i} cancelling others")
group.cancel()
return
async def daemon():
while True:
print("Daemon before sleep")
await asyncio.sleep(1)
print("Daemon after sleep")
async def main():
async with CancellableTaskGroup() as group:
group.create_task(job(1, group))
group.create_task(job(2, group))
group.create_task(daemon())
asyncio.run(main())
sobolevn (Nikita Sobolev) September 5, 2024, 2:38pm 4
arthur-tacca (Arthur Tacca) January 1, 2025, 10:43pm 5
There’s active discussion about this feature natively to task groups over on that github issue.
One thing I wanted to mention here is that I had the idea of cancel_called and cancel_caught attributes too, in a similar way that Trio does for cancellation scopes. Those tend to be quite useful so perhaps they would be useful here too, and they would be really easy to add to the implementation.
guido (Guido van Rossum) January 1, 2025, 10:58pm 6
Then again isn’t this a feature to consider separately? It doesn’t seem tied to task groups (even if it is in trio).
blhsing (Ben Hsing) January 2, 2025, 2:19am 7
There was this SO question python - How to cancel all tasks in a TaskGroup - Stack Overflow a while ago, to which there were two working answers (the first of which is mine):
- Use the undocumented TaskGroup._abort method, which does exactly what you want and which I think should be made public.
- Use another task to wrap the function that creates the TaskGroup and cancel that task instead.
belm0 (John Belmonte) January 2, 2025, 7:54am 8
Unfortunately it does not, as explained in the github issue, and as evident by the proposed implementation PR.
arthur-tacca (Arthur Tacca) January 2, 2025, 9:37am 9
Definitely cancel_called
and cancel_caught
could be considered separately after/if this one gets accepted. I just mentioned them here because someone on the github issue talked about detecting why a task group finished and their idea was much more complex (raising a different cancellation-like exception).
I was caught off guard by your comment those attributes not being tied to task groups. Maybe you were thinking of them raw asyncio.Task
objects? They actually already have analogues of them: they’re asyncio.Task.cancelling()
and asyncio.Task.cancelled()
respectively.
guido (Guido van Rossum) January 2, 2025, 6:53pm 10
I’m sorry, I was on my phone and didn’t follow the links. I see now that they are tied to cancel scopes in Trio, and the closest equivalent asyncio has appears to be a task group. Nevertheless let’s open a separate Discourse topic or GitHub issue for that.
rrevans (Robert Evans) January 2, 2025, 10:05pm 11
(Reposting my design idea from github here for design discussion.)
What if TaskGroup.cancel()
is defined as explicitly raising CancelledError
from TaskGroup.__aexit__
upon successful cancellation instead of exiting normally from the suite?
Raising an exception makes the unusual cancel-exit condition more explicit and lets callers use normal exception handling to detect, recover, or propagate to enclosing scopes.
Calling cancel()
would act exactly as if the suite had itself raised CancelledError
and has no effect at all if the context has already exited or if the suite or a task raises a different exception. Calling cancel()
from the suite immediately raises CancelledError
.
This design makes TaskGroup.cancel
and Task.cancel
more closely related. Both would raise CancelledError
out of the scope being cancelled:
- For
TaskGroup.cancel()
the cancelled thing is the context of thewith
-statement - For
Task.cancel()
the cancelled thing is the whole body of the coroutine
The need for a try
statement to recover is a drawback of this approach. That said, being explicit seems better and programming errors that swallow parent cancel by mistake are probably less likely to be severe than programming errors that silently continue past an unhandled and unexpected TaskGroup
cancellation.
Example:
try:
async with asyncio.TaskGroup() as g:
# arrange to call g.cancel upon some condition
g.create_task(some_thing(g.cancel))
...
except asyncio.CancelledError:
# propagate when enclosing task is cancelled
if asyncio.current_task().cancelling():
raise
pass # g.cancel() was called
arthur-tacca (Arthur Tacca) January 5, 2025, 11:02am 12
There is so much wrong with this comment…
- Your snippet at the end assumes that
TaskGroup.cancel()
(orTaskGroup.stop()
or whatever it’s called) would propagateasyncio.CancelledError
but leavecurrent_task().cancelling() == 0
. This is not correct; a propagating cancellation should always havecancelling()
non-zero (specifically, incremented by one compared to its old value). - If you had assumed correct semantics for a propagating
CancelledError
(withcancelling()
left incremented by one) then there would be no way to tell whether it was fromTaskGroup.cancel()
or some external source. That’s really a fundemental problem with your API idea. - If you had assumed correct semantics for a propagating
CancelledError
then someone wanting to catch the exception would also need to remember to calluncancel()
. IMO, application authors shouldn’t need to do this. The whole benefit of this API is that it gives a controlled way to manually raise a cancellation and to uncancel it later without worrying about these internal details. - “being explicit seems better … unexpected TaskGroup cancellation” IMO, if you cancel a specific object (task or task group or anything else), then propagating that further is the thing that needs to be made explicit. Your API, where it automatically propagates futher, is less explicit.
- You say that
Task.cancel()
“would raise CancelledError out of the scope being cancelled” butTask.cancel()
does not do that: it’sawait task
that does this (and I would always recommendawait asyncio.wait([task])
exactly so that it doesn’t mix up child and parent task cancellation). [Edit: Even more relevant here, cancelling an individual task in a task group would also not cancel the task group or its enclosing task, not even when it gets to__aexit__()
. Again, only awaiting the task object would cause the propagation.] - Your API is not even consistent with
TaskGroup
itself! As you mentioned on Github, when it cancels its enclosing task due to a child task raising an exception, it does not propagate that out (unless there’s another cancellation for another reason). Surely this API should be consistent with that.
Ultimately, what you are suggesting is not terribly different from just cancelling the enclosing task. You can already do that. The API suggested here is genuinely useful in a way your suggestion just isn’t, as I think you would see if you genuinely had a need in your own application. Your comments have seriously railroaded this discussion, which is really frustrating there are actual subtleties to discuss (the name for the API, what it should do before/after the context manager, etc.).
rrevans (Robert Evans) January 6, 2025, 11:42am 13
Well, I did mean that. The exception signals cancel-exit, and is intended to be caught not propagated. Though if uncaught the enclosing task does exit with cancelled() == True
.
Putting that aside, what should happen below if the TaskGroup
is cancelled while suspended at one of the await
expressions?
async with asyncio.TaskGroup() as g:
something.add_callback(g.cancel)
t1 = g.create_task(first_part(...))
t2 = g.create_task(second_part(...))
r = await t1
r.merge(await t2)
return r
arthur-tacca (Arthur Tacca) January 7, 2025, 9:36pm 14
If some other task calls tg.cancel()
then, this task would be resumed and for that await would raise asyncio.CancelledError
[1]. This is completely standard; it’s exactly the same as for cancelling the whole task and for aborting a task group (due to a non-cancel exception from a child task).
When the asyncio.CancelledError
exception reaches tg.__aexit__()
, it will be swallowed and the cancellation count decremented back down to 0. All child tasks are already cancelled by this point (that happened in tg.cancel()
) but may still be running, and tg.__aexit__()
will wait for those tasks to finish. It will then return without exception.
All eventualities should compose together nicely (task group is cancelled; non-cancel exception raised from child task or body of task group; cancellation happens from outside) regardless of the order that they happen.
Maybe a longer example would help?
async def taskgroup_example():
async with asyncio.TaskGroup() as outer_tg:
async with asyncio.TaskGroup() as middle_tg:
async with asyncio.TaskGroup() as inner_tg:
middle_tg.cancel()
print("We DO reach here: tg.cancel() schedules CancelledError but does not raise it directly")
await asyncio.sleep(0)
print("We do NOT reach here: sleep(0) raises CancelledError")
print("We do NOT reach here: inner_tg.__aexit__() propagates cancellation")
print("We DO reach here: middle_tg was cancelled but is now complete")
print("We DO reach here: outer_tg is unaffected")
- There is a slight race here. The await expression might already have been scheduled to resume with some other raturn value or exception. For example, if the body of the task group is waiting on
await my_queue.get()
, and another task callsmy_queue.put_nowait(foo)
immediately followed bytg.cancel()
, then the await expression will return the value you pushed. But then the next await expression will actually raise the cancellation. But, again, this is just the usual behaviour for asyncio cancellation. ↩︎