Issue 31092: delicate behaviour of shared (managed) multiprocessing Queues

Created on 2017-07-31 20:05 by Prof Plum, last changed 2022-04-11 14:58 by admin.

Messages (9)

msg299582

Author: Prof Plum (Prof Plum)

Date: 2017-07-31 20:05

So I was writing code that had multiple write-thread and read-thread "groups" in a single pool (in a group, a few write threads write to a queue that a read thread reads), and I ran into what I think is a race condition in the multiprocessing.Manager() class. It looks like managed queues are returned from Manager() before they are actually initialized and safe to use, but this is only noticeable when creating many managed queues in quick succession. I've attached a simple demo script to reproduce the bug. The reason I believe this is a race condition is that Python crashes while the sleep(0.5) line is commented out, but doesn't when it isn't.

Also, I'm on Windows 10 and using 64-bit Python 3.5.2.

msg302742

Author: Lang (tlxxzj)

Date: 2017-09-22 10:52

Code to reproduce the bug (it raises KeyError in Lib\multiprocessing\managers.py, in incref):

import multiprocessing as mp
from time import sleep

def func(queue):
    pass

if __name__ == '__main__':
    manager = mp.Manager()
    pool = mp.Pool(1)

    queue = manager.Queue()
    r = pool.apply_async(func, args = [queue])
    #sleep(1)
    queue = None

    pool.close()
    pool.join()

msg303776

Author: Oren Milman (Oren Milman) *

Date: 2017-10-05 16:59

IIUC: In Lang's example, doing queue = None caused the destruction of the shared queue, which caused a call to BaseProxy._decref() (in multiprocessing/managers.py), which dispatched a decref request to the manager's server process.

Meanwhile, the pool's worker process (in function worker() in multiprocessing/pool.py) tries to retrieve a task from its task queue, by calling inqueue.get(). The get() method unpickles the first pickled task in the queue, which is the function and arguments that we passed to apply_async(). The unpickling of the shared queue causes creating a proxy object for the shared queue, in which BaseProxy.__init__() is called, which calls BaseProxy._incref(), which dispatches an incref request to the manager's server process.

Unfortunately, the decref request gets to the server before the incref request. So when the server receives the decref request (in Server.handle_request()), and accordingly calls Server.decref(), the refcount of the shared queue in the server is 1, so the refcount is decremented to 0, and the shared queue is disposed. Then, when the server receives the incref request, it tries to increment the refcount of the shared queue (in Server.incref()), but can't find it in its refcount dict, so it raises the KeyError. (If, for example, you added a 'sleep(0.5)' before the call to dispatch() in BaseProxy._decref(), the incref request would win the race, and the KeyError wouldn't be raised.)
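The ordering problem can be modeled without any processes at all. The following toy refcount bookkeeping (a simplified sketch, not the real Server implementation) raises the same KeyError when the decref request is handled before the incref request:

```python
class ToyServer:
    """Simplified model of the refcounting in multiprocessing.managers.Server."""

    def __init__(self):
        # The proxy held by the main process accounts for the initial refcount of 1.
        self.id_to_refcount = {'queue-ident': 1}

    def incref(self, ident):
        # Raises KeyError if the object has already been disposed.
        self.id_to_refcount[ident] += 1

    def decref(self, ident):
        self.id_to_refcount[ident] -= 1
        if self.id_to_refcount[ident] == 0:
            # Refcount hit zero: the shared object is disposed.
            del self.id_to_refcount[ident]

server = ToyServer()
server.decref('queue-ident')       # main process drops its reference first...
try:
    server.incref('queue-ident')   # ...so the worker's incref arrives too late
except KeyError:
    print('KeyError: queue-ident')
```

If the incref had been handled first, the refcount would have gone 1 -> 2 -> 1 and no error would occur, which is exactly what the sleep() workaround achieves.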

Should we fix this? Or is it the responsibility of the user to not destroy shared objects too soon? (In that case, maybe we should mention it in the docs?)

The situation in the example of Prof Plum is similar. Also, note that this issue is not specific to using pool workers or to Manager.Queue. For example, we get a similar error (for similar reasons) in this code:

from multiprocessing import Process, Manager
from time import sleep

if __name__ == '__main__':
    with Manager() as manager:
        shared_list = manager.list()
        p = Process(target=sorted, args=(shared_list,))
        p.start()
        # sleep(0.5)
        shared_list = None
        p.join()
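For comparison, a variant of that example (a sketch along the same lines) that joins the process before dropping the reference, so the worker's incref always wins the race and no KeyError is raised:

```python
from multiprocessing import Process, Manager

def run():
    with Manager() as manager:
        shared_list = manager.list([3, 1, 2])
        p = Process(target=sorted, args=(shared_list,))
        p.start()
        p.join()            # wait for the worker before dropping our reference
        shared_list = None  # safe now: the worker's incref/decref already happened
        return p.exitcode

if __name__ == '__main__':
    print(run())  # exit code 0 on success
```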

msg303778

Author: Oren Milman (Oren Milman) *

Date: 2017-10-05 17:09

Prof Plum, I changed the type of the issue to 'behavior', because Lang and I both got a KeyError. If your interpreter actually crashed, please change it back to 'crash'.

msg303780

Author: Prof Plum (Prof Plum)

Date: 2017-10-05 18:54

Oh I see, I thought getting an error that caused the Python code execution to terminate was considered a "crash".

On the note of whether you should fix this, I think the answer is yes. When I call pool.apply_async() I expect it to return only once the worker process has been started and finished its initialization (i.e. sent the incref request). That said, I can see people wanting the minor performance gain of having the worker start AND run asynchronously, so this option should be available via a boolean argument to apply_async(), but it should be off by default, because that is the safer and more intuitive behavior for apply_async().

msg303812

Author: Oren Milman (Oren Milman) *

Date: 2017-10-06 09:52

Davin and Antoine, I added you to the nosy list because you are listed as multiprocessing experts :)

msg303916

Author: Antoine Pitrou (pitrou) * (Python committer)

Date: 2017-10-08 19:55

@Prof Plum

When I call pool.apply_async() I expect it only to return when the worker process has been started and finished its initialization process

Well... it's called async for a reason, so I'm not sure why the behaviour would be partially synchronous.

@Oren

Should we fix this?

I'm not sure how. In mp.Pool we don't want to keep references to input objects longer than necessary.

Or is it the responsibility of the user to not destroy shared objects too soon? (In that case, maybe we should mention it in the docs?)

Yes to both questions, IMO.

(note: changing title to better reflect issue)

msg303921

Author: Prof Plum (Prof Plum)

Date: 2017-10-08 21:22

@Antoine Pitrou

Well... it's called async for a reason, so I'm not sure why the behaviour would be partially synchronous.

To avoid a race condition.

I'm not sure how. In mp.Pool we don't want to keep references to input objects longer than necessary.

Like I said, you could just add some sort of "safe" flag to the apply_async() call: safe=True would mean the initialization of the worker is done synchronously; safe=False would be the normal behavior. Even if you decide it's the user's responsibility not to delete the queue, if the user's code is exiting a function, that would basically amount to them calling sleep() for some guessed amount of time. With a safe flag they wouldn't have to guess the time or call sleep(), which is kind of ugly IMO. Also, if someone sees that apply_async() has a safe flag, they are more likely to look up what it does than to read the full docs for apply_async().

msg303964

Author: Antoine Pitrou (pitrou) * (Python committer)

Date: 2017-10-09 14:36

To avoid a race condition.

As Oren explained, the race condition is due to your use of the managed Queue. If you keep the object alive in the main process until the tasks have finished, there shouldn't be any problem. The question is: is there any reason you don't want to?
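That recommendation looks like the following in practice. This is a sketch based on Lang's reproducer (the body of func is hypothetical, added just so the queue is actually used): the main process waits for the task before dropping its last reference to the queue, so the worker's incref is guaranteed to be handled first.

```python
import multiprocessing as mp

def func(queue):
    queue.put('done')  # the worker uses the shared queue

def run():
    manager = mp.Manager()
    pool = mp.Pool(1)
    queue = manager.Queue()
    r = pool.apply_async(func, args=[queue])
    r.wait()           # keep `queue` referenced until the task has finished
    result = queue.get()
    pool.close()
    pool.join()
    return result

if __name__ == '__main__':
    print(run())
```

With this ordering the KeyError from Lang's example cannot occur, because the main process's reference keeps the server-side refcount above zero for the whole lifetime of the task.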

History

2022-04-11 14:58:49 | admin | set | github: 75275
2017-10-09 14:36:56 | pitrou | set | messages: +
2017-10-08 21:22:37 | Prof Plum | set | messages: +
2017-10-08 19:55:26 | pitrou | set | messages: +; title: multiprocessing.Manager() race condition -> delicate behaviour of shared (managed) multiprocessing Queues
2017-10-06 09:52:36 | Oren Milman | set | nosy: + pitrou, davin; messages: +
2017-10-05 18:54:36 | Prof Plum | set | messages: +
2017-10-05 17:09:39 | Oren Milman | set | messages: +
2017-10-05 16:59:36 | Oren Milman | set | versions: + Python 3.7; nosy: + Oren Milman; messages: +; components: + Library (Lib); type: crash -> behavior
2017-09-22 10:52:57 | tlxxzj | set | nosy: + tlxxzj; messages: +
2017-09-21 23:32:48 | Prof Plum | set | title: Potential multiprocessing.Manager() race condition -> multiprocessing.Manager() race condition
2017-07-31 20:05:27 | Prof Plum | create |