Issue 31092: delicate behaviour of shared (managed) multiprocessing Queues
Created on 2017-07-31 20:05 by Prof Plum, last changed 2022-04-11 14:58 by admin.
Messages (9)
Author: Prof Plum (Prof Plum)
Date: 2017-07-31 20:05
So I was writing code that had multiple write-thread and read-thread "groups" in a single pool (in a group, a few write threads write to a queue that a read thread reads), and I ran into what I think is a race condition with the multiprocessing.Manager() class. It looks like managed queues are returned from Manager() before they are actually initialized and safe to use, but it is only noticeable when making many managed queues in quick succession. I've attached a simple demo script to reproduce the bug; the reason I believe this is a race condition is that when the sleep(0.5) line is commented out Python crashes, but when it's not it doesn't.
Also, I'm on Windows 10 and using 64-bit Python 3.5.2.
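(The attached demo script is not reproduced here; the following is only a rough sketch of the kind of pattern being described. The worker functions, group count, and pool size are illustrative guesses, not taken from the attachment.)

    import multiprocessing as mp
    from time import sleep

    def writer(q):
        q.put("item")              # write side of one group

    def reader(q):
        q.get(timeout=5)           # read side of one group (timeout so the sketch cannot hang)

    if __name__ == '__main__':
        manager = mp.Manager()
        pool = mp.Pool(4)
        results = []
        for _ in range(20):        # many managed queues created in quick succession
            q = manager.Queue()
            results.append(pool.apply_async(writer, args=(q,)))
            results.append(pool.apply_async(reader, args=(q,)))
            # sleep(0.5)           # per the report, uncommenting this masks the failure
        for r in results:
            r.get()                # surfaces any error raised in the workers
        pool.close()
        pool.join()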
Author: Lang (tlxxzj)
Date: 2017-09-22 10:52
Code to reproduce the bug (KeyError in lib\multiprocessing\managers.py, in incref):

    import multiprocessing as mp
    from time import sleep

    def func(queue):
        pass

    if __name__ == '__main__':
        manager = mp.Manager()
        pool = mp.Pool(1)
        queue = manager.Queue()
        r = pool.apply_async(func, args=[queue])
        #sleep(1)
        queue = None
        pool.close()
        pool.join()
Author: Oren Milman (Oren Milman) *
Date: 2017-10-05 16:59
IIUC:
In Lang's example, doing queue = None caused the destruction of the shared queue, which caused a call to BaseProxy._decref() (in multiprocessing/managers.py), which dispatched a decref request to the manager's server process.
Meanwhile, the pool's worker process (in function worker() in multiprocessing/pool.py) tries to retrieve a task from its task queue by calling inqueue.get(). The get() method unpickles the first pickled task in the queue, which is the function and arguments that we passed to apply_async(). The unpickling of the shared queue causes the creation of a proxy object for the shared queue, in which BaseProxy.__init__() is called, which calls BaseProxy._incref(), which dispatches an incref request to the manager's server process.
Unfortunately, the decref request gets to the server before the incref request. So when the server receives the decref request (in Server.handle_request()), and accordingly calls Server.decref(), the refcount of the shared queue in the server is 1, so the refcount is decremented to 0, and the shared queue is disposed. Then, when the server receives the incref request, it tries to increment the refcount of the shared queue (in Server.incref()), but can't find it in its refcount dict, so it raises the KeyError. (If, for example, you added a 'sleep(0.5)' before the call to dispatch() in BaseProxy._decref(), the incref request would win the race, and the KeyError wouldn't be raised.)
Should we fix this? Or is it the responsibility of the user to not destroy shared objects too soon? (In that case, maybe we should mention it in the docs?)
The situation in the example of Prof Plum is similar. Also, note that this issue is not specific to using pool workers or to Manager.Queue. For example, we get a similar error (for similar reasons) in this code:
    from multiprocessing import Process, Manager
    from time import sleep

    if __name__ == '__main__':
        with Manager() as manager:
            shared_list = manager.list()
            p = Process(target=sorted, args=(shared_list,))
            p.start()
            # sleep(0.5)
            shared_list = None
            p.join()
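For what it's worth, both snippets stop failing if the last reference to the proxy is dropped only after the child is done with it; a minimal sketch of the second example rearranged that way (same code, with the join moved before the rebinding):

    from multiprocessing import Process, Manager

    if __name__ == '__main__':
        with Manager() as manager:
            shared_list = manager.list()
            p = Process(target=sorted, args=(shared_list,))
            p.start()
            p.join()             # wait until the child has unpickled (incref'd) and used the proxy
            shared_list = None   # only now drop the parent's reference, so the decref comes last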
Author: Oren Milman (Oren Milman) *
Date: 2017-10-05 17:09
Prof Plum, I changed the type of the issue to 'behavior', because Lang and I both got a KeyError. If your interpreter actually crashed, please change it back to 'crash'.
Author: Prof Plum (Prof Plum)
Date: 2017-10-05 18:54
Oh I see, I thought getting an error that caused the Python code execution to terminate was considered a "crash".
On the note of whether you should fix this, I think the answer is yes. When I call pool.apply_async() I expect it to return only when the worker process has been started and has finished its initialization process (i.e. sent the incref request). That being said, I could see people wanting to utilize the minor performance gain of having the worker start AND run asynchronously, so I think this option should be available via a boolean arg to apply_async(), but it should be off by default because that is the safer and more intuitive behavior for apply_async().
Author: Oren Milman (Oren Milman) *
Date: 2017-10-06 09:52
Davin and Antoine, I added you to the nosy list because you are listed as multiprocessing experts :)
Author: Antoine Pitrou (pitrou) *
Date: 2017-10-08 19:55
@Prof Plum
When I call pool.apply_async() I expect it to return only when the worker process has been started and has finished its initialization process
Well... it's called async for a reason, so I'm not sure why the behaviour would be partially synchronous.
@Oren
Should we fix this?
I'm not sure how. In mp.Pool we don't want to keep references to input objects longer than necessary.
Or is it the responsibility of the user to not destroy shared objects too soon? (In that case, maybe we should mention it in the docs?)
Yes to both questions, IMO.
(note: changing title to better reflect issue)
Author: Prof Plum (Prof Plum)
Date: 2017-10-08 21:22
@Antoine Pitrou
Well... it's called async for a reason, so I'm not sure why the behaviour would be partially synchronous.
To avoid a race condition.
I'm not sure how. In mp.Pool we don't want to keep references to input objects longer than necessary.
Like I said, you could just add some sort of "safe" flag to the apply_async() call: safe=True would mean the initialization of the worker is done synchronously, and safe=False would be the normal behavior. Even if you decide it's the user's responsibility not to delete the queue, if the user's code is exiting a function, that would basically amount to them calling sleep() for some guessed amount of time. With a safe flag they wouldn't have to guess the time or call sleep(), which is kinda ugly IMO. Also, if someone sees that apply_async() has a safe flag, they are more likely to look up what it does than they are to read the full docs for apply_async().
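(apply_async() has no such flag today. Purely as an illustration of the idea, a user-level wrapper could pin the argument proxies to the AsyncResult instead of making the handoff synchronous; the helper name and the _keepalive attribute below are made up for this sketch and are not part of the multiprocessing API.)

    import multiprocessing as mp

    def apply_async_keepalive(pool, func, args=()):
        # Hypothetical helper: like pool.apply_async, but keeps the argument
        # proxies referenced by the result object so the parent's decref cannot
        # happen before the worker's incref.
        result = pool.apply_async(func, args=args)
        result._keepalive = args   # made-up attribute, just holds references
        return result

    def func(queue):
        queue.put(1)

    if __name__ == '__main__':
        manager = mp.Manager()
        pool = mp.Pool(1)
        queue = manager.Queue()
        r = apply_async_keepalive(pool, func, args=(queue,))
        queue = None   # no KeyError: the AsyncResult still references the proxy
        r.get()
        pool.close()
        pool.join()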
Author: Antoine Pitrou (pitrou) *
Date: 2017-10-09 14:36
To avoid a race condition.
As Oren explained, the race condition is due to your use of the managed Queue. If you keep the object alive in the main process until the tasks have finished, there shouldn't be any problem. The question is: is there any reason you don't want to?
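Applied to Lang's reproducer above, that just means waiting for the task before dropping the queue; a minimal sketch:

    import multiprocessing as mp

    def func(queue):
        pass

    if __name__ == '__main__':
        manager = mp.Manager()
        pool = mp.Pool(1)
        queue = manager.Queue()
        r = pool.apply_async(func, args=[queue])
        r.get()        # wait until the worker has unpickled (and incref'd) the proxy
        queue = None   # only now drop the parent's reference
        pool.close()
        pool.join()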
History
Date                 | User        | Action | Args
2022-04-11 14:58:49  | admin       | set    | github: 75275
2017-10-09 14:36:56  | pitrou      | set    | messages: +
2017-10-08 21:22:37  | Prof Plum   | set    | messages: +
2017-10-08 19:55:26  | pitrou      | set    | messages: +; title: multiprocessing.Manager() race condition -> delicate behaviour of shared (managed) multiprocessing Queues
2017-10-06 09:52:36  | Oren Milman | set    | nosy: + pitrou, davin; messages: +
2017-10-05 18:54:36  | Prof Plum   | set    | messages: +
2017-10-05 17:09:39  | Oren Milman | set    | messages: +
2017-10-05 16:59:36  | Oren Milman | set    | versions: + Python 3.7; nosy: + Oren Milman; messages: +; components: + Library (Lib); type: crash -> behavior
2017-09-22 10:52:57  | tlxxzj      | set    | nosy: + tlxxzj; messages: +
2017-09-21 23:32:48  | Prof Plum   | set    | title: Potential multiprocessing.Manager() race condition -> multiprocessing.Manager() race condition
2017-07-31 20:05:27  | Prof Plum   | create |