Issue 1455676: Simplify using Queues with consumer threads

Created on 2006-03-21 21:36 by rhettinger, last changed 2022-04-11 14:56 by admin. This issue is now closed.

Messages (6)

msg49806

Author: Raymond Hettinger (rhettinger) * (Python committer)

Date: 2006-03-21 21:36

When Queues are used to communicate between producer and consumer threads, there is often a need to determine when all of the enqueued tasks have been completed.

With this small patch, determining when all work is done is as simple as adding q.task_done() to each consumer thread and q.join() to the main thread.

Without the patch, the next best approach is to count the number of puts, create a second queue that each consumer fills when a task is done, and have the main thread call successive blocking gets on the result queue until all of the puts have been accounted for:

from Queue import Queue
from threading import Thread

def worker():
    while 1: 
        task = tasks_in.get() 
        do_work(task) 
        tasks_out.put(None)

tasks_in = Queue() 
tasks_out = Queue() 
for i in range(num_worker_threads): 
    Thread(target=worker).start()

n = 0 
for elem in source():
    n += 1
    tasks_in.put(elem) 

# block until tasks are done 
for i in range(n): 
    tasks_out.get()

That approach is not complicated, but it does entail more lines of code and tracking some auxiliary data. This becomes cumbersome and error-prone when an app has multiple occurrences of q.put() and q.get().

The patch essentially encapsulates this approach into two methods, making it effortless to use and easy to graft onto existing uses of Queue. So, the above code simplifies to:

from Queue import Queue
from threading import Thread

def worker():
    while 1: 
        task = q.get() 
        do_work(task) 
        q.task_done() 

q = Queue() 
for i in range(num_worker_threads): 
    Thread(target=worker).start()

for elem in source():
    q.put(elem) 

# block until tasks are done 
q.join() 

The put counting is automatic, there is no need for a separate queue object, and the code expresses its intent with clarity. It is also easy to inspect for accuracy: each get() is followed by a task_done().
The ease of inspection remains even when there are multiple gets and puts scattered through the code (a situation which would become complicated with the two-Queue approach).

If accepted, will add docs with an example.

Besides being a fast, lean, elegant solution, the other reason to accept the patch is that the underlying problem appears again and again, requiring some measure of invention to solve it each time.
There are a number of approaches but none as simple, fast, or as broadly applicable as having the queue itself track items loaded and items completed.

msg49807

Author: Raymond Hettinger (rhettinger) * (Python committer)

Date: 2006-03-21 22:27

Tim, do you have a chance to look at this?

msg49808

Author: Tim Peters (tim.peters) * (Python committer)

Date: 2006-03-22 01:42

Yup, I'll try to make time tomorrow (can't today). Offhand it sounds like a nice addition to me.

msg49809

Author: Raymond Hettinger (rhettinger) * (Python committer)

Date: 2006-03-22 06:02

Thanks. There are two particular areas for extra attention.

First, should the waiter acquire/release pairs be in a try/finally (iow, is there some behavior in notify() or release() that potentially needs to be trapped)?

Second, should the notify() in task_done() really be a notifyAll() (iow, does it make sense that multiple joins may be pending)?

Thanks again.
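
To make the two questions concrete, here is a minimal sketch of the counting scheme under discussion. It is not the code from the patch: the class and attribute names (CountingQueue, _unfinished, _all_done, and so on) are hypothetical, and it uses the newer notify_all spelling rather than notifyAll. It shows where try/finally would guard each acquire/release pair, and how a notify-all in task_done() would let every thread blocked in join() proceed once the count reaches zero.

import threading
from collections import deque

class CountingQueue(object):
    """Illustrative sketch only -- not the code from the patch."""

    def __init__(self):
        self._items = deque()
        self._mutex = threading.Lock()
        # Two conditions sharing one mutex: one for "an item arrived",
        # one for "every enqueued item has been processed".
        self._not_empty = threading.Condition(self._mutex)
        self._all_done = threading.Condition(self._mutex)
        self._unfinished = 0

    def put(self, item):
        self._not_empty.acquire()
        try:                              # try/finally so the mutex is released
            self._items.append(item)      # even if an exception slips out
            self._unfinished += 1
            self._not_empty.notify()
        finally:
            self._not_empty.release()

    def get(self):
        self._not_empty.acquire()
        try:
            while not self._items:
                self._not_empty.wait()
            return self._items.popleft()
        finally:
            self._not_empty.release()

    def task_done(self):
        self._all_done.acquire()
        try:
            unfinished = self._unfinished - 1
            if unfinished < 0:
                raise ValueError('task_done() called too many times')
            self._unfinished = unfinished
            if unfinished == 0:
                # notify_all (notifyAll in 2006-era Python) wakes *every*
                # thread blocked in join(), in case several are waiting
                self._all_done.notify_all()
        finally:
            self._all_done.release()

    def join(self):
        self._all_done.acquire()
        try:
            while self._unfinished:
                self._all_done.wait()
        finally:
            self._all_done.release()

Giving the "item available" and "all tasks done" wakeups separate conditions that share one mutex keeps a put() from waking a pending join() instead of a blocked get().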

msg49810

Author: Tim Peters (tim.peters) * (Python committer)

Date: 2006-03-24 00:17

I marked this as Accepted, but there are some things I'd like to see changed:

msg49811

Author: Raymond Hettinger (rhettinger) * (Python committer)

Date: 2006-03-24 20:44

Committed as revision 43298.