Issue 20147: multiprocessing.Queue.get() raises queue.Empty exception even if an item is available

The behaviour of multiprocessing.Queue surprised me today in that Queue.get() may raise an exception even if an item is immediately available. I tried to flush entries without blocking by using the timeout=0 keyword argument:

$ /opt/python3/bin/python3
Python 3.4.0b1 (default:247f12fecf2b, Jan 6 2014, 14:50:23)
[GCC 4.6.3] on linux
Type "help", "copyright", "credits" or "license" for more information.

>>> from multiprocessing import Queue
>>> q = Queue()
>>> q.put("hi")
>>> q.get(timeout=0)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/python3/lib/python3.4/multiprocessing/queues.py", line 107, in get
    raise Empty
queue.Empty

Actually even passing a small non-zero timeout will not give me my queue entry:

>>> q.get(timeout=1e-6)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/torsten/opensrc/cpython/Lib/multiprocessing/queues.py", line 107, in get
    raise Empty
queue.Empty

Expected behaviour for me would be to return the item that is in the queue. I know that there is a kwarg block which gives me the desired behaviour:

>>> q.get(block=False)
'hi'

In my case the get call is embedded in my own module which does not currently expose the block parameter. My local solution is of course to update the wrapper:

if timeout == 0:
    timeout = None
    block = False
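For illustration, a minimal sketch of such a wrapper (the function name and signature are hypothetical, not part of my actual module):

from multiprocessing import Queue
from queue import Empty

def get_from_queue(q, timeout=None):
    # Hypothetical wrapper: map timeout == 0 onto a non-blocking get so
    # an item that is already available is returned instead of raising
    # queue.Empty.
    if timeout == 0:
        return q.get(block=False)
    return q.get(block=True, timeout=timeout)

if __name__ == "__main__":
    q = Queue()
    q.put("hi")
    try:
        print(get_from_queue(q, timeout=0))  # usually prints 'hi'
    except Empty:
        # A put() immediately followed by a non-blocking get() can still
        # race with the feeder thread; see the discussion further below.
        pass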

However, I see a few smells here in the Python standard library. First, everything else seems to accept timeout=0 as non-blocking:

>>> import threading
>>> lock = threading.Lock()
>>> lock.acquire(timeout=0)
True
>>> from queue import Queue
>>> q = Queue()
>>> q.put("hi")
>>> q.get(timeout=0)
'hi'

Of special note is that queue.Queue behaves as I would have expected. IMHO multiprocessing.Queue should be consistent with it.

Also note that queue.Queue.get() and queue.Queue.put() name their blocking flag "block", while everybody else uses "blocking".

As a side note, I think the current approach is flawed in computing the deadline. Basically it does the following:

deadline = time.time() + timeout
if not self._rlock.acquire(block, timeout):
    raise Empty
timeout = deadline - time.time()
if timeout < 0 or not self._poll(timeout):
    raise Empty

On my system, just taking the time twice and computing the delta takes 2 microseconds:

>>> import time
>>> t0 = time.time(); time.time() - t0
2.384185791015625e-06

Therefore calling Queue.get(block, timeout) with 0 < timeout < 2e-6 will never return anything from the queue even though Queue.get(block=False) would. This contradicts the idea that Queue.get(block=False) will return faster than block=True with any timeout > 0.
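To make the suggestion concrete, here is a sketch against the simplified steps shown above (not the attached patch itself): clamping the remaining timeout so that _poll() is still attempted once makes the tiny-timeout case behave no worse than block=False:

deadline = time.time() + timeout
if not self._rlock.acquire(block, timeout):
    raise Empty
# Clamp instead of letting the remaining timeout go negative, so that
# _poll() still runs once and an item that is already available is seen.
timeout = max(0, deadline - time.time())
if not self._poll(timeout):
    raise Empty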

Apart from that, as Python does not currently support waiting on multiple sources, we often check a queue with a small timeout concurrently with doing other work. In case the system gets really loaded, I would expect this to cause problems because the updated timeout may fall below zero.
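For context, a sketch of the kind of loop I mean (the names and the 10 ms timeout are placeholders, not a recommendation):

from queue import Empty

def service_loop(q, handle_item, do_other_work):
    # q is a multiprocessing.Queue; handle_item and do_other_work stand in
    # for the application's real work. On a heavily loaded system the
    # effective timeout inside get() can shrink towards zero, which is
    # where the behaviour described above hurts.
    while True:
        try:
            item = q.get(timeout=0.01)
        except Empty:
            pass
        else:
            if item is None:      # simple sentinel to stop the loop
                break
            handle_item(item)
        do_other_work()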

Suggested patch attached.

We have a similar bug with Queue.get().

Queue.get(False) raises a Queue.Empty exception even in cases when the queue is actually not empty!

An example of the code is attached and is listed below just in case:


import multiprocessing
import Queue

class TestWorker(multiprocessing.Process):

    def __init__(self, inQueue):
        multiprocessing.Process.__init__(self)
        self.inQueue = inQueue

    def run(self):
        while True:
            try:
                task = self.inQueue.get(False)
            except Queue.Empty:
                # I suppose that Queue.Empty exception is about empty queue
                # and self.inQueue.empty() must be true in this case
                # try to check it using assert
                assert self.inQueue.empty()
                break

def runTest():
    queue = multiprocessing.Queue()
    for _ in xrange(10**5):
        queue.put(1)
    workers = [TestWorker(queue) for _ in xrange(4)]
    map(lambda w: w.start(), workers)
    map(lambda w: w.join(), workers)

if __name__ == "__main__":
    runTest()

This same issue came up recently in . Really, it should have been addressed in this issue first and the newer one marked as a duplicate of this one, but these things don't always happen in a synchronous or apparently linear fashion.

Adding to what is captured in , specifically referring to the points raised here in this issue:

  1. A call to put does not mean that the data put on the queue is instantly/atomically available for retrieval via get. Situations where a call to put is immediately followed by a non-blocking call to get are asking for a race-condition -- this is a principal reason for having blocking calls with timeouts.
  2. A call to get resulting in an Empty exception of course does not mean that the queue is forevermore empty, only that the queue is empty at the moment the call to get was made -- the facility for trapping the Empty and trying again to get more data off the queue provides welcome flexibility on top of the use of blocking/non-blocking calls with/without timeouts.
  3. A call to empty is, as indicated in the documentation, not to be considered reliable because of the semantics in coordinating the queue's state and data between processes/threads.
  4. Alexei's contributions to this issue are very nearly identical to what is discussed in and are addressed well there.
  5. As to using a timeout value too small to be effective (i.e. < 2e-6), this is really one example of the larger concern of choosing an appropriate timeout value. In the proposed patch, ensuring that a call to self._poll is made no matter what might potentially buy additional time for the data to be synced and made available (admittedly a happy result, but a fragile, inadvertent win), but it does not address the rest of how get, put, and the others work, nor will it necessarily solve the issue being raised here.

In Alexei's example, changing the call to get from a non-blocking call to a blocking call with a reasonably small timeout will reliably ensure that everything put on the queue can and will be gotten back by the rest of that code.
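For example, the run() method above could be changed along these lines (sketch only; the 0.1 second timeout is an arbitrary choice, not a recommendation):

    def run(self):
        while True:
            try:
                # Block briefly instead of failing immediately: this gives
                # the feeder thread time to make already-put data available.
                task = self.inQueue.get(True, 0.1)
            except Queue.Empty:
                break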

In multiprocessing, we have queues to help us make data available to and across processes and threads alike -- we must recognize that coordinating data across distinct processes (especially) takes a non-zero amount of time -- hence we have the tools of blocking as well as non-blocking calls, with or without timeouts, to properly implement robust code in these situations.