gh-96471: Add threading queue shutdown by EpicWink · Pull Request #104750 · python/cpython


EpicWink

Contributor

@EpicWink EpicWink commented

May 22, 2023


Alternate implementation of #104225, where all queue items are consumed immediately in Queue.shutdown when immediate=True is passed (see the comparison for what's changed).

This PR includes and modifies changes from #104225.


📚 Documentation preview 📚: https://cpython-previews--104750.org.readthedocs.build/
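For readers unfamiliar with the feature, here is a minimal sketch of the two shutdown modes as they are documented for `queue.Queue` in Python 3.13. It assumes Python 3.13 or later; the function names `graceful` and `immediate` are made up for illustration and are not part of this PR.

```python
import queue


def graceful():
    """Default shutdown: queued items can still be consumed."""
    q = queue.Queue()
    q.put("a")
    q.shutdown()  # immediate=False is the default
    events = []
    try:
        q.put("b")
    except queue.ShutDown:
        events.append("put refused")
    events.append(q.get())  # the remaining item is still delivered
    try:
        q.get()  # queue is now empty and shut down
    except queue.ShutDown:
        events.append("get refused")
    return events


def immediate():
    """Immediate shutdown: queued items are discarded."""
    q = queue.Queue()
    q.put("a")
    q.shutdown(immediate=True)
    try:
        q.get()
    except queue.ShutDown:
        return q.qsize(), "get refused"
```

On an interpreter older than 3.13 neither `Queue.shutdown` nor `queue.ShutDown` exists, so the functions above will fail with `AttributeError` when called.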

@EpicWink

@YvesDup @EpicWink

@blurb-it

@EpicWink

gvanrossum

@EpicWink

@EpicWink

gvanrossum


Great progress. I mostly have some markup nits. But the tests seem to hang. Or was I just impatient?

@gvanrossum

Yeah, the test is definitely hanging for me. Have you figured that out yet?

@gvanrossum

I'd also recommend a merge. :-)

@EpicWink

@EpicWink

Yeah, the test is definitely hanging for me. Have you figured that out yet?

@gvanrossum It's intermittently hanging in the test test_shutdown_immediate_put_join. q.join is blocking, after shutting down the queue, because unfinished_tasks becomes -1 (when zero is required). This is because the test calls task_done before taking items from the queue, but the implementation of shutdown() reduces unfinished_tasks by the number of items in the queue.

I think the solution is to simply ensure unfinished_tasks doesn't go negative. I don't set it straight to zero because there may still be queue consumers that will successfully complete their task and call task_done, and they would get a ValueError if unfinished_tasks had already been set to zero. There is still a problem here, however, if some consumers call task_done without getting items from the queue.

Perhaps it's simpler to set unfinished_tasks to zero, then skip the ValueError raise if the queue is shut down.

Edit: I just noticed that this pull request's description says that task_done should raise ValueError on a shut down queue. Hmmm.
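The counter arithmetic described above can be reproduced with a toy model. This is only a sketch of the bookkeeping, not the CPython implementation: `CounterModel`, `shutdown_naive`, `shutdown_clamped`, and `run` are hypothetical names, and no locking or blocking is modelled.

```python
class CounterModel:
    """Toy model of the queue's task bookkeeping (illustration only)."""

    def __init__(self):
        self.items = []
        self.unfinished_tasks = 0

    def put(self, item):
        self.items.append(item)
        self.unfinished_tasks += 1

    def task_done(self):
        self.unfinished_tasks -= 1

    def shutdown_naive(self):
        # Subtract one per queued item, with no lower bound.
        self.unfinished_tasks -= len(self.items)
        self.items.clear()

    def shutdown_clamped(self):
        # Same subtraction, but never let the counter drop below zero.
        self.unfinished_tasks = max(0, self.unfinished_tasks - len(self.items))
        self.items.clear()


def run(shutdown_name):
    m = CounterModel()
    m.put("a")
    m.put("b")
    m.task_done()  # called before any item is taken, as in the test
    getattr(m, shutdown_name)()
    return m.unfinished_tasks
```

With two items queued and one early `task_done`, the naive variant ends at -1 (so a `join` waiting for zero would never return), while the clamped variant ends at 0.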


It's also intermittently hanging in the test test_shutdown_put_join, simply because of a logic error: 2 items are put on the queue (sometimes), then it's shut down, then one task is marked as done, then the queue is waited for (aka joined on). If only one item was put on the queue, then the test simply fails during assertion.

Solution here (which fixes the assertion for test_shutdown_immediate_put_join as well) is to shut down the queue before running the put-then-join.


Edit: and of course now it's failing only in Windows (CI only succeeds because it re-runs test_queue)

@EpicWink

@EpicWink

@EpicWink

@EpicWink

Also shut down before put-join in shutdown-put-join tests. Also remove indent

gvanrossum


LG, assuming you believe the test is stable. I have one nit, let me know how you feel about that.

Oh, one more thing. If you feel like writing documentation, could you update Doc/whatsnew/3.13.rst? There's a section about modified modules. A few lines there will go a long way.

@EpicWink

@EpicWink

@EpicWink

LG, assuming you believe the test is stable.

The test is not stable: test_shutdown_all_methods_in_many_threads (and the corresponding immediate version) hangs intermittently on Windows. It also fails consistently on my machine when I add debugging print statements. I'm investigating.

gvanrossum


Excellent! All that remains is merging it (which I will take care of), closing the alternative PR for queue.py, and then we can have another look at the asyncio and multiprocessing queues. (For the latter we still need to find a reviewer.)

@gvanrossum

@EpicWink Sorry, it looks like we have to revert this until the Windows free-threading hang has been resolved. (Although it's possible that you've uncovered a Windows-related bug in free-threading, which is a developing feature.)

@gvanrossum

FWIW, having had only a quick look at the failing test, is it possible that it relies on timeouts too much? I see that there's a 0.1 msec delay that is used for sleeping, and _read_msg_thread() makes some non-blocking get() calls, ignoring Empty exceptions. Could there be a scenario (especially when running without GIL) where different threads run in a different order than you anticipated, and it simply doesn't read enough queue items, so the join() hangs forever?

fsc-eriker pushed a commit to fsc-eriker/cpython that referenced this pull request

Feb 14, 2024

Co-authored-by: Duprat yduprat@gmail.com

This was referenced

Feb 20, 2024

@gvanrossum

From #104228 (comment): it looks like Queue.join() never raises ShutDown, contradicting the docs.

@YvesDup

Maybe I'm waking up a little too late, but despite having worked on the development a year ago, I have the impression that some functionality is missing from this merged PR.

In the initial version of the feature (#104225), the basic shutdown just forbade put operations and released all threads blocked in put. On the other hand, it left get, join and task_done operations possible.

The immediate shutdown prohibited all of those operations and had to release all threads blocked in get and join. The queue was also purged.

Currently, in the following methods (put, get, join and task_done), there is no specific behavior depending on the type of shutdown. The type is not even recorded in an attribute.

I don't want to hurt anyone's feelings, but I think this PR needs to be reworked (including documentation).

PS: I initially posted this comment on the wrong PR. Sorry.

@EpicWink

Maybe I'm waking up a little too late, but despite having worked on the development a year ago, I have the impression that some functionality is missing from this merged PR.

In the initial version of the feature (#104225), the basic shutdown just forbade put operations and released all threads blocked in put. On the other hand, it left get, join and task_done operations possible.

The immediate shutdown prohibited all of those operations and had to release all threads blocked in get and join. The queue was also purged.

Currently, in the following methods (put, get, join and task_done), there is no specific behavior depending on the type of shutdown. The type is not even recorded in an attribute.

@YvesDup see comment in the issue: #96471 (comment)

Basically, the goal is to have waiters (callers of queue methods) not be blocked on a queue which has been shut down. Making the shutdown immediate simply means gets won't take anything more from the queue (and will instead raise an exception).

There is a problem with this PR though, in that q.task_done() and q.join() are documented as raising ShutDown when they don't; see #115838 for the fix for that.
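To illustrate the goal of not leaving waiters blocked, here is a small sketch in which a thread waiting in `get()` on an empty queue is released by `shutdown()`. It assumes Python 3.13 or later; `blocked_get_is_released` and `waiter` are names invented for this example.

```python
import queue
import threading


def blocked_get_is_released():
    q = queue.Queue()
    outcome = []

    def waiter():
        try:
            q.get()  # the queue is empty, so this would block indefinitely
        except queue.ShutDown:
            outcome.append("released")

    t = threading.Thread(target=waiter)
    t.start()
    q.shutdown()  # wakes any thread blocked in get() or put()
    t.join(timeout=5)
    return outcome
```

The result is the same whether the waiter reaches `get()` before or after `shutdown()` runs, since `get()` on an empty, shut-down queue raises `ShutDown` straight away.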

@YvesDup

@EpicWink, thank you for this explanation. I missed that :-(
The features have really changed since the initial version. It now seems more efficient and simpler.
Since we wrote all the tests for the initial version, I'd like to review that part because the features have changed. We may have to remove some tests, modify others, or add new ones.
Do you agree?

@YvesDup

FYI, I fixed the failing test in the _read_msg_thread() method. The loop was infinite; I inserted a break on the queue.ShutDown exception (and I removed all the time.sleep() calls).
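A reader loop of the kind described, one that blocks on `get()` and exits on `queue.ShutDown` rather than polling with sleeps, might look like the sketch below. This is not the code from the test suite: `read_until_shutdown` and `demo` are hypothetical names, and Python 3.13 or later is assumed.

```python
import queue
import threading


def read_until_shutdown(q, received):
    """Consume items until the queue is shut down and empty."""
    while True:
        try:
            item = q.get()  # blocking get, no timeout or sleep needed
        except queue.ShutDown:
            break  # without this break the loop would never end
        received.append(item)
        q.task_done()


def demo():
    q = queue.Queue()
    received = []
    reader = threading.Thread(target=read_until_shutdown, args=(q, received))
    reader.start()
    for i in range(5):
        q.put(i)
    q.shutdown()  # non-immediate: queued items are still delivered
    q.join()  # returns once every item has been marked done
    reader.join()
    return received
```

Because the shutdown is not immediate, all five items reach the reader before it sees `ShutDown`, so `join()` does not depend on timing.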

@gvanrossum

FYI, I fixed the failing test in the _read_msg_thread() method. The loop was infinite; I inserted a break on the queue.ShutDown exception (and I removed all the time.sleep() calls).

Where can we see this?

@YvesDup

@gvanrossum

Thanks, I'll wait for the pull request.

@aisk aisk mentioned this pull request

Feb 28, 2024

AlexWaygood

@YvesDup

Thanks, I'll wait for the pull request.

The PR for this issue is ready (#115940). Sorry if I misunderstood and you were waiting for a PR linked to this one. Please let me know if I need to change anything.

@gvanrossum

@YvesDup, I am a little confused. There is indeed #115940, by you, but there is also gh-115898 by @EpicWink that also claims to fix these tests. How do the two relate to each other?

@YvesDup

How do the two relate to each other?

They are related because they are both about the same issue. But #115898 is only a draft.
I had informed @EpicWink that I had fixed the bug in the test, which is why I submitted a new PR.
Maybe we should wait for his feedback before going ahead?

@gvanrossum

@EpicWink

I'll review the new PR fixing the test today.

My PR is another attempt, but unfinished. Perhaps in the future it can be added as another test.

Edit: reviewed!

@YvesDup

I'll review the new PR fixing the test today.

My PR is another attempt, but unfinished. Perhaps in the future it can be added as another test.

Edit: reviewed!

Thanks for the review.

This was referenced

Apr 4, 2024

gvanrossum pushed a commit that referenced this pull request

Apr 10, 2024

@EpicWink

(This is a small tweak of the original gh-104750 which added shutdown.)

@EpicWink EpicWink deleted the threading-queue-shutdown-immediate-consume branch

April 11, 2024 01:09

diegorusso pushed a commit to diegorusso/cpython that referenced this pull request

Apr 17, 2024

@EpicWink @diegorusso