Issue 32382: Python multiprocessing.Queue fails to get items in the correct sequence
I am trying to implement a producer-consumer design with the multiprocessing module in my project, but multiprocessing.Queue does not behave as I expected. It seems the Queue.get method returns the end flag at the end of my queue too early.
I am not experienced with multi-process programming, so I am not sure whether this is a bug. To reproduce it, I have simplified my code as follows:
import time
import multiprocessing as mp

def worker(task_queue, output_queue):
    while 1:
        i = task_queue.get()
        if i is None:
            print("Process-%d done"%mp.current_process().pid)
            task_queue.task_done()
            break
        output_queue.put(i+1)
        task_queue.task_done()

def outputer(output_queue):
    c = 0 # number of objects received so far
    while 1:
        j = output_queue.get()
        if j is None:
            print("Process(output)-%d done"%mp.current_process().pid)
            c += 1
            print("outputer get %d objects from the output_queue"%c)
            assert output_queue.empty(), "output queue should be empty here"
            break
        time.sleep(0.0001) # do output here
        c += 1

if __name__ == "__main__":
    task_queue = mp.JoinableQueue()
    #output_queue = mp.JoinableQueue()
    output_queue = mp.Queue()

    workers = [mp.Process(target=worker, args=(task_queue, output_queue))
               for i in range(10)]
    outputer = mp.Process(target=outputer, args=(output_queue,))

    for w in workers:
        w.start()
    outputer.start()

    for i in range(10**6):
        task_queue.put(i)

    for w in workers: # put end flag to task queue
        task_queue.put(None)

    task_queue.join() # wait all tasks done
    print("all tasks done.")
    print("queue size before put end flag: %d"%output_queue.qsize())
    output_queue.put(None) # put end flag to output queue
    print("end")
I get this output:
Process-20923 done
Process-20931 done
Process-20925 done
Process-20930 done
Process-20927 done
Process-20929 done
Process-20928 done
Process-20926 done
Process-20924 done
Process-20932 done
all tasks done.
queue size before put end flag: 914789
end
Process(output)-20933 done
outputer get 90383 objects from the output_queue
Process Process-11:
Traceback (most recent call last):
  File "/home/nanguage/S/miniconda3/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/home/nanguage/S/miniconda3/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "joinablequeue.py", line 27, in outputer
    assert output_queue.empty(), "output queue should be empty here"
AssertionError: output queue should be empty here
I wait for all workers to put their output on the output queue by calling task_queue.join(), and only then put the end flag on the output queue. But according to the outputer's printed information, it gets the None end flag before the other values in the queue. It seems the queue does not return values according to the FIFO rule.
First thing: the code uses the global name outputer for two different things: as the name of a module function and as the global name given to the Process object running that function. At least on Windows under Python 3.6.4 that confusion prevents the program from running. So rename one of them.
Then comes the pain ;-) A multiprocessing queue is a rather complex object under the covers, and the docs don't really spell out all the details. Maybe they should.
The docs do vaguely sketch that a "feeder thread" is created in each process using an mp.Queue, which feeds objects you .put() from an internal buffer into an interprocess pipe. The internal buffer is needed in case you .put() so many objects so fast that feeding them into a pipe directly would cause the OS pipe functions to fail.
And that happens in your case: you have 10 producers running at full speed overwhelming a single slow consumer. Most of the data enqueued by output_queue.put(i+1) is sitting in those internal buffers most of the time, and the base interprocess pipe doesn't know anything about them for the duration.
The practical consequence: while the queue always reflects the order in which objects were .put() within a single process, there's no guarantee about ordering across processes. Objects are fed from internal buffers into the shared pipe whenever a process's feeder thread happens to wake up and sees that the pipe isn't "too full". task_queue.task_done() only records that an object has been taken off of task_queue and given to output_queue.put(i+1); most of the time, the latter just sticks i+1 into an internal buffer because output_queue's shared pipe is too full to accept another object.
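The buffering described above can be glimpsed even inside a single process. The following is only a minimal sketch, and put_then_peek is a hypothetical helper name that does not appear in the original code. It relies on the documented behavior that .put() returns as soon as the object is in the internal buffer, so .empty() called immediately afterwards may still report True until the feeder thread has written the object into the pipe. The result of that check is timing dependent, so no particular value should be expected.

```python
import multiprocessing as mp


def put_then_peek(item):
    """Put one item on a fresh queue and report what empty() says right after.

    Hypothetical helper for illustration only.
    """
    q = mp.Queue()
    q.put(item)               # returns once the item is in the internal buffer
    looked_empty = q.empty()  # may be True: the feeder thread may not have run yet
    value = q.get(timeout=5)  # blocks until the feeder thread has delivered the item
    q.close()
    q.join_thread()           # wait for the feeder thread to finish
    return looked_empty, value


if __name__ == "__main__":
    looked_empty, value = put_then_peek("x")
    print("empty() right after put():", looked_empty)
    print("value received:", value)
```

With ten worker processes each running its own feeder thread, the same delay applies per process, which is why the order across processes is not guaranteed.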
Given that this is how things actually work, what you can do instead is add:
for w in workers:
    w.join()
somewhere before output_queue.put(None). A worker process doesn't end until its feeder thread(s) complete feeding all the internal buffer objects into pipes, so .join()'ing a worker is the one "obvious" way to guarantee that all the worker's .put() actions have wholly completed.
In which case, there's no point to using a JoinableQueue at all - .task_done() no longer serves any real purpose in the code then.
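Putting the advice above together, one possible rewrite of the program might look like the sketch below. It is not the original poster's final code: the names consumer, run_pipeline, result_queue and the start_method parameter are assumptions introduced here for illustration, the task count is a parameter rather than 10**6, and the slow output step is omitted. Plain Queue objects replace the JoinableQueue, the Process object gets its own name so it no longer shadows the function, and the end flag is put on output_queue only after every worker has been joined.

```python
import multiprocessing as mp


def worker(task_queue, output_queue):
    """Take integers off task_queue and put i + 1 on output_queue until None arrives."""
    while True:
        i = task_queue.get()
        if i is None:
            break
        output_queue.put(i + 1)


def consumer(output_queue, result_queue):
    """Count the objects received before the end flag, then report the count."""
    count = 0
    while output_queue.get() is not None:
        count += 1
    result_queue.put(count)


def run_pipeline(n_tasks, n_workers, start_method=None):
    """Run the pipeline and return how many objects the consumer received."""
    ctx = mp.get_context(start_method)  # None selects the platform default
    task_queue = ctx.Queue()
    output_queue = ctx.Queue()
    result_queue = ctx.Queue()

    workers = [ctx.Process(target=worker, args=(task_queue, output_queue))
               for _ in range(n_workers)]
    consumer_proc = ctx.Process(target=consumer,
                                args=(output_queue, result_queue))
    for w in workers:
        w.start()
    consumer_proc.start()  # must be running so the workers' buffers can drain

    for i in range(n_tasks):
        task_queue.put(i)
    for _ in workers:      # one end flag per worker
        task_queue.put(None)

    for w in workers:      # each worker exits only after its feeder thread has flushed
        w.join()
    output_queue.put(None) # now the end flag really is the last object

    count = result_queue.get()
    consumer_proc.join()
    return count
```

Called under an if __name__ == "__main__": guard, run_pipeline(10000, 4) should return 10000, because the consumer sees every worker's output before the end flag. Note that the consumer has to be started before the workers are joined: a worker cannot exit while its buffered objects are still waiting for room in the pipe.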