Issue 8426: multiprocessing.Queue fails to get() very large objects

Created on 2010-04-16 18:38 by Ian.Davis, last changed 2022-04-11 14:57 by admin. This issue is now closed.

Messages (17)

msg103349

Author: Ian Davis (Ian.Davis)

Date: 2010-04-16 18:38

I'm trying to parallelize some scientific computing jobs using multiprocessing.Pool. I've also tried rolling my own Pool equivalent using Queues. When returning very large result objects from Pool.map()/imap() or via Queue.put(), I've noticed that multiprocessing seems to hang on the receiving end. On Cygwin 1.7.1/Python 2.5.2 it hangs with no CPU activity. On CentOS 5.2/Python 2.6.2 it hangs with 100% CPU. cPickle is perfectly capable of pickling these objects, even though they may be hundreds of MB, so I think the problem is in the communication. There's also some asymmetry in the error depending on whether it's the parent or the child putting the large object. The put does appear to succeed; it's the get() on the other end that hangs forever.

Example code:

from multiprocessing import *

def child(task_q, result_q):
    while True:
        print " Getting task..."
        task = task_q.get()
        print " Got task", task[:10]
        task = task * 100000000
        print " Putting result", task[:10]
        result_q.put(task)
        print " Done putting result", task[:10]
        task_q.task_done()

def parent():
    task_q = JoinableQueue()
    result_q = JoinableQueue()
    worker = Process(target=child, args=(task_q, result_q))
    worker.daemon = True
    worker.start()
    #tasks = ["foo", "bar", "ABC" * 100000000, "baz"]
    tasks = ["foo", "bar", "ABC", "baz"]
    for task in tasks:
        print "Putting task", task[:10], "..."
        task_q.put(task)
        print "Done putting task", task[:10]
    task_q.join()
    for task in tasks:
        print "Getting result..."
        print "Got result", result_q.get()[:10]

if __name__ == '__main__':
    parent()

If run as is, I get (the parent and child tracebacks are interleaved):

Traceback (most recent call last):
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/queues.py", line 242, in _feed
    send(obj)
MemoryError: out of memory
(*** hangs, I hit ^C ***)
Got result
Traceback (most recent call last):
Process Process-1:
Traceback (most recent call last):
  File "cygwin_multiprocessing_queue.py", line 32, in <module>
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/process.py", line 237, in _bootstrap
    parent()
  File "cygwin_multiprocessing_queue.py", line 29, in parent
    print "Got result", result_q.get()[:10]
    self.run()
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/process.py", line 93, in run
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/queues.py", line 91, in get
    self._target(*self._args, **self._kwargs)
  File "cygwin_multiprocessing_queue.py", line 6, in child
    res = self._recv()
KeyboardInterrupt
    task = task_q.get()
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/queues.py", line 91, in get
    res = self._recv()
KeyboardInterrupt

If instead I comment out the multiplication in child() and uncomment the large task in parent(), then I get:

 Getting task...
Putting task foo ...
Done putting task foo
Putting task bar ...
 Got task foo
 Putting result foo
Done putting task bar
Putting task ABCABCABCA ...
Done putting task ABCABCABCA
Putting task baz ...
 Done putting result foo
 Getting task...
 Got task bar
 Putting result bar
 Done putting result bar
 Getting task...
Done putting task baz
(*** hangs, I hit ^C ***)
Traceback (most recent call last):
  File "cygwin_multiprocessing_queue.py", line 32, in <module>
    parent()
  File "cygwin_multiprocessing_queue.py", line 26, in parent
    task_q.join()
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/queues.py", line 303, in join
    self._cond.wait()
  File "/usr/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-cygwin-1.7.1-i686.egg/multiprocessing/synchronize.py", line 212, in wait
    self._wait_semaphore.acquire(True, timeout)
KeyboardInterrupt

msg113031

Author: Terry J. Reedy (terry.reedy) * (Python committer)

Date: 2010-08-05 19:46

By 'crash', do you actually mean 'hang'? Jesse, is it reasonable to stuff pickles of 100s of megabytes through the connections?

msg113033

Author: Jesse Noller (jnoller) * (Python committer)

Date: 2010-08-05 19:51

I don't know that it's unreasonable to send that much data, but it would certainly be slow, and I would not recommend it. In any case, this is still on the list for when I have time.

msg123663

Author: Brian Cain (Brian.Cain) *

Date: 2010-12-09 00:01

I don't think the problem is limited to cases where hundreds of megabytes are being transmitted. I believe I am experiencing a problem with the same root cause whose symptoms are slightly different. It seems like there's a threshold which causes not merely poor performance, but likely an unrecoverable fault.

Here's the output when I run my example on SLES11.1:

$ ./multiproc.py $((8*1024)) 2
on 2.6 (r26:66714, May 5 2010, 14:02:39) [GCC 4.3.4 [gcc-4_3-branch revision 152973]] - Linux linux 2.6.32.12-0.7-default #1 SMP 2010-05-20 11:14:20 +0200 x86_64 x86_64
0 entries in flight, join() took 5949.97 usec, get() did 0.000000 items/sec
2 entries in flight, join() took 1577.85 usec, get() did 42581.766497 items/sec
4 entries in flight, join() took 1966.00 usec, get() did 65536.000000 items/sec
6 entries in flight, join() took 1894.00 usec, get() did 105296.334728 items/sec
8 entries in flight, join() took 1420.02 usec, get() did 199728.761905 items/sec
10 entries in flight, join() took 1950.03 usec, get() did 163840.000000 items/sec
12 entries in flight, join() took 1241.92 usec, get() did 324720.309677 items/sec
...
7272 entries in flight, join() took 2516.03 usec, get() did 10983427.687432 items/sec
7274 entries in flight, join() took 1813.17 usec, get() did 10480717.037444 items/sec
7276 entries in flight, join() took 1979.11 usec, get() did 11421315.832335 items/sec
7278 entries in flight, join() took 2043.01 usec, get() did 11549808.744608 items/sec
^C7280 entries: join() ABORTED by user after 83.08 sec
...

I see similar results when I run this test with a larger step; I just wanted to get finer resolution on the failure point.

msg123666

Author: Terry J. Reedy (terry.reedy) * (Python committer)

Date: 2010-12-09 02:28

2.6.6 was the last bugfix release.

msg123671

Author: Brian Cain (Brian.Cain) *

Date: 2010-12-09 06:16

I was able to reproduce the problem on a more recent release.

7279 entries fails, 7278 entries succeeds.

$ ./multiproc3.py
on 3.1.2 (r312:79147, Apr 15 2010, 12:35:07) [GCC 4.4.3] - Linux mini 2.6.32-26-generic #47-Ubuntu SMP Wed Nov 17 15:59:05 UTC 2010 i686
7278 entries in flight, join() took 12517.93 usec, get() did 413756.736588 items/sec
7278 entries in flight, join() took 19458.06 usec, get() did 345568.562217 items/sec
7278 entries in flight, join() took 21326.07 usec, get() did 382006.563784 items/sec
7278 entries in flight, join() took 14937.16 usec, get() did 404244.835554 items/sec
7278 entries in flight, join() took 18877.98 usec, get() did 354435.878968 items/sec
7278 entries in flight, join() took 20811.08 usec, get() did 408343.738456 items/sec
7278 entries in flight, join() took 14394.04 usec, get() did 423727.055218 items/sec
7278 entries in flight, join() took 18940.21 usec, get() did 361012.624762 items/sec
7278 entries in flight, join() took 19073.96 usec, get() did 367559.024118 items/sec
7278 entries in flight, join() took 16229.87 usec, get() did 424764.763755 items/sec
7278 entries in flight, join() took 18527.03 usec, get() did 355546.367937 items/sec
7278 entries in flight, join() took 21500.11 usec, get() did 390429.802164 items/sec
7278 entries in flight, join() took 13646.84 usec, get() did 410468.669903 items/sec
7278 entries in flight, join() took 18921.14 usec, get() did 355873.819767 items/sec
7278 entries in flight, join() took 13582.94 usec, get() did 287553.877353 items/sec
7278 entries in flight, join() took 21958.11 usec, get() did 405549.873285 items/sec
^C7279 entries: join() ABORTED by user after 5.54 sec
^CError in atexit._run_exitfuncs:
Segmentation fault

msg123672

Author: Brian Cain (Brian.Cain) *

Date: 2010-12-09 06:18

Detailed stack trace when the failure occurs (gdb_stack_trace.txt)

msg129229

Author: Charles-François Natali (neologix) * (Python committer)

Date: 2011-02-23 23:05

Alright, it's normal behaviour, but since it doesn't seem to be documented, it can be quite surprising. A queue works like this: put() appends the object to an internal buffer and returns immediately; a "feeder" thread then pickles the buffered objects and writes them to the underlying pipe (or Unix socket); and that pipe has a limited capacity (on the order of tens of KB), so the feeder thread blocks as soon as the pipe is full and nothing is reading from the other end.

In multiproc3.py, the items are first appended to the queue, and only then is the sender process waited on. But when size = 7279, the data submitted reaches 64k, so the writing thread blocks on the write syscall. And since the join is performed before dequeuing the items, you simply deadlock: the join waits for the sending thread to complete, and the write can't complete because the pipe/socket is full! If you dequeue the items before waiting on the submitter process, everything works fine:

t0 = time.time()
try:
    get_t0 = time.time()
    vals = q.get(timeout=3.)
    get_duration = time.time() - get_t0

    s.join()

Now, for the initial report, the problem is similar:

def child(task_q, result_q):
    while True:
        print " Getting task..."
        task = task_q.get()
        print " Got task", task[:10]
        task = task * 100000000
        print " Putting result", task[:10]
        result_q.put(task)
        print " Done putting result", task[:10]
        task_q.task_done()

tasks = ["foo", "bar", "ABC", "baz"]
for task in tasks:
    print "Putting task", task[:10], "..."
    task_q.put(task)
    print "Done putting task", task[:10]
task_q.join()
for task in tasks:
    print "Getting result..."
    print "Got result", result_q.get()[:10]

When the child puts results, since they're bigger than 64k, the underlying pipe/socket fills up. Thus the sending thread blocks on the write and stops draining the queue's internal buffer, which keeps growing. So you end up storing every result object in result_q before starting to dequeue them, which represents roughly 4 * 3 * 1e8 = 1.2 GB, and that could explain the out-of-memory errors (and if they're Unicode strings it's much more)... So the moral is: don't put() too much data into a queue without dequeuing it in a concurrent process...
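For illustration, here is a minimal Python 3 sketch (names and sizes are illustrative, not the OP's original script) of that restructuring: the parent drains result_q while the child is still producing, and only joins the task queue afterwards, so the child's feeder thread never blocks on a full pipe.

from multiprocessing import Process, JoinableQueue

def child(task_q, result_q):
    while True:
        task = task_q.get()
        result_q.put(task * 100000)   # a result well above the pipe capacity
        task_q.task_done()

def parent():
    task_q, result_q = JoinableQueue(), JoinableQueue()
    worker = Process(target=child, args=(task_q, result_q), daemon=True)
    worker.start()
    tasks = ["foo", "bar", "ABC", "baz"]
    for task in tasks:
        task_q.put(task)
    # Drain the results while the child is still producing them...
    for _ in tasks:
        print("Got result", result_q.get()[:10])
    # ...and only then wait for all tasks to be marked done.
    task_q.join()

if __name__ == '__main__':
    parent()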

msg133654

Author: Charles-François Natali (neologix) * (Python committer)

Date: 2011-04-13 08:51

It's documented in http://docs.python.org/library/multiprocessing.html#multiprocessing-programming :

"""
Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread() method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
"""

Suggesting to close.

msg134009

Author: Brian Cain (Brian.Cain) *

Date: 2011-04-19 02:54

Please don't close the issue.

Joining aside, the basic point ("But when size = 7279, the data submitted reaches 64k, so the writing thread blocks on the write syscall.") is not clear from the docs, right?

IMO, it would be nice if I could ask my queue, "Just what is your capacity (in bytes, not entries) anyways? I want to know how much I can put in here without worrying about whether the remote side is dequeueing." I guess I'd settle for explicit documentation that the bound exists. But how should I expect my code to be portable? Are there platforms which provide less than 64k? Less than 1k? Less than 256 bytes?

msg134010

Author: Terry J. Reedy (terry.reedy) * (Python committer)

Date: 2011-04-19 03:38

Please do not send html responses, as they result in a spurious 'unnamed' file being attached.

Please do suggest a specific change.

Should this be changed to a doc issue?

msg134082

Author: Charles-François Natali (neologix) * (Python committer)

Date: 2011-04-19 17:17

> IMO, it would be nice if I could ask my queue, "Just what is your capacity (in bytes, not entries) anyways? I want to know how much I can put in here without worrying about whether the remote side is dequeueing." I guess I'd settle for explicit documentation that the bound exists.

It is documented. See the comment about the "underlying pipe".

> But how should I expect my code to be portable? Are there platforms which provide less than 64k? Less than 1k? Less than 256 bytes?

It depends :-) If the implementation is using pipes, then under Linux before 2.6.9 (I think) a pipe was limited to the size of a page, i.e. 4K on x86; now it's 64K. If it's a Unix socket (created via socketpair), the maximum size can be set through sysctl, etc. So you basically can't state a limit, and IMHO you shouldn't be concerned with that if you want your code to be portable. I find the warning explicit enough, but that's maybe because I'm familiar with these low-level details.
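For the curious, the limit can be probed empirically. This is an illustration only (POSIX-specific, written for Python 3, not part of multiprocessing's API): it makes the write end of a plain os.pipe() non-blocking and counts how many bytes fit before a write would block, which is the same limit the queue's feeder thread runs into.

import fcntl
import os

def pipe_capacity():
    r, w = os.pipe()
    flags = fcntl.fcntl(w, fcntl.F_GETFL)
    fcntl.fcntl(w, fcntl.F_SETFL, flags | os.O_NONBLOCK)  # fail instead of blocking
    total = 0
    try:
        while True:
            total += os.write(w, b"x" * 4096)
    except BlockingIOError:  # the pipe buffer is full
        pass
    finally:
        os.close(r)
        os.close(w)
    return total

if __name__ == '__main__':
    print(pipe_capacity(), "bytes buffered before a write would block")

On a recent Linux this typically prints 65536, matching the 64K figure above; on other platforms it will differ, which is exactly why the limit isn't documented as a number.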

msg134369

Author: Matt Goodman (Matt.Goodman)

Date: 2011-04-25 02:30

You cannot pickle individual objects larger than 2**31. This failure is not handled cleanly in the core module, and I suspect it is masked by the processes described above.

Try piping "a"*(2**31) through your pipe, or pickling it to disk...

msg135232

Author: Charles-François Natali (neologix) * (Python committer)

Date: 2011-05-05 19:08

> You cannot pickle individual objects larger than 2**31.

Indeed, but that's not what's happening here: the failure occurs with much smaller objects (also, note the OP's "cPickle is perfectly capable of pickling these objects"). The problem is really the underlying pipe/Unix socket filling up.

msg143075

Author: Vinay Sajip (vinay.sajip) * (Python committer)

Date: 2011-08-27 15:13

I think it's just a documentation issue. The problem with documenting limits is that they are system-specific and, even if the current limits that Charles-François has mentioned are documented, these could become outdated. Perhaps a suggestion could be added to the documentation:

"Avoid sending very large amounts of data via queues, as you could come up against system-dependent limits according to the operating system and whether pipes or sockets are used. You could consider an alternative strategy, such as writing large data blocks to temporary files and sending just the temporary file names via queues, relying on the consumer to delete the temporary files after processing."

msg143081

Author: Charles-François Natali (neologix) * (Python committer)

Date: 2011-08-27 15:46

"Avoid sending very large amounts of data via queues, as you could come up against system-dependent limits according to the operating system and whether pipes or sockets are used. You could consider an alternative strategy, such as writing large data blocks to temporary files and sending just the temporary file names via queues, relying on the consumer to delete the temporary files after processing."

There's a misunderstanding here: there is absolutely no limit on the size of objects that can be put through a queue (apart from the host's memory and the 32-bit limit). The problem is really that you can't just put an arbitrary bunch of data on a queue and then join it before making sure other processes will eventually pop all the data from the queue. I.e., you can't do:

q = Queue()
for i in range(1000000):
    q.put(i)
q.join()

for i in range(1000000):
    q.get()

That's because join() will wait until the feeder thread has managed to write all the items to the underlying pipe/Unix socket, and this might hang if the underlying pipe/socket is full (which will happen after one has put around 128K without having popped any item).
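Concretely, a minimal Python 3 sketch of that ordering constraint (the sizes are arbitrary): join the producing process only after the queue has been drained, otherwise the producer's feeder thread stays blocked on the full pipe and the join never returns.

from multiprocessing import Process, Queue

N = 10000                      # enough items to overflow the pipe buffer

def producer(q):
    for i in range(N):
        q.put("x" * 1024)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer, args=(q,))
    p.start()

    # Wrong order (can hang): calling p.join() here waits for the child to
    # exit, but the child's feeder thread is still blocked writing to the pipe.

    # Right order: drain first, then join.
    items = [q.get() for _ in range(N)]
    p.join()
    print(len(items), "items received")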

That's what's explained here:

It's documented in http://docs.python.org/library/multiprocessing.html#multiprocessing-programming :

"""
Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the Queue.cancel_join_thread() method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.
"""

I find this wording really clear, but if someone comes up with a better - i.e. less technical - wording, go ahead.

msg254550

Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer)

Date: 2015-11-12 20:09

I agree with Charles-François and think this issue should be closed. There is no bug, and the behavior is documented.

History

Date                | User             | Action | Args
2022-04-11 14:57:00 | admin            | set    | github: 52673
2015-11-26 17:21:29 | serhiy.storchaka | set    | status: pending -> closed; resolution: not a bug; stage: resolved
2015-11-12 20:09:35 | serhiy.storchaka | set    | status: open -> pending; nosy: + serhiy.storchaka; messages: +
2011-09-16 07:59:19 | neologix         | link   | issue8237 superseder
2011-08-28 16:32:13 | neologix         | set    | priority: normal -> low; nosy: + docs@python; components: + Documentation, - Library (Lib)
2011-08-27 15:47:00 | neologix         | set    | messages: +
2011-08-27 15:13:03 | vinay.sajip      | set    | nosy: + vinay.sajip; messages: +
2011-05-10 01:11:02 | osvenskan        | set    | nosy: + osvenskan
2011-05-05 19:08:29 | neologix         | set    | messages: +
2011-04-25 02:30:40 | Matt.Goodman     | set    | nosy: + Matt.Goodman; messages: +
2011-04-19 17:17:06 | neologix         | set    | messages: +
2011-04-19 03:38:30 | terry.reedy      | set    | messages: +
2011-04-19 03:35:40 | terry.reedy      | set    | files: - unnamed
2011-04-19 02:54:10 | Brian.Cain       | set    | files: + unnamed; messages: +
2011-04-13 08:51:48 | neologix         | set    | messages: +
2011-02-23 23:05:44 | neologix         | set    | nosy: + neologix; messages: +
2010-12-09 06:18:35 | Brian.Cain       | set    | files: + gdb_stack_trace.txt; messages: +
2010-12-09 06:16:42 | Brian.Cain       | set    | files: + multiproc3.py; messages: +
2010-12-09 02:28:14 | terry.reedy      | set    | type: crash -> behavior; messages: +; versions: - Python 2.6
2010-12-09 00:01:39 | Brian.Cain       | set    | files: + multiproc.py; versions: + Python 2.6; nosy: + Brian.Cain; messages: +
2010-08-05 19:51:22 | jnoller          | set    | messages: +
2010-08-05 19:46:49 | terry.reedy      | set    | nosy: + terry.reedy; messages: +; versions: - Python 2.6
2010-04-18 18:43:12 | pitrou           | set    | priority: normal; assignee: jnoller; nosy: + jnoller; versions: + Python 3.1, Python 2.7, Python 3.2, - Python 2.5
2010-04-16 18:38:05 | Ian.Davis        | create |