cpython: 26bbff4562a7 (original) (raw)
Mercurial > cpython
changeset 76712:26bbff4562a7 2.7
Issue #9400: Partial backport of fix for #9244 In multiprocessing, a pool worker process would die if the result/error could not be pickled. This could cause pool methods to hang. In 3.x this was fixed by 0aa8af79359d (which also added an error_callback argument to some methods), but the fix was not back ported. [#9400]
Richard Oudkerk shibturn@gmail.com | |
---|---|
date | Wed, 02 May 2012 16:36:26 +0100 |
parents | 3b2aa777b725 |
children | 45f0272f5296 |
files | Lib/multiprocessing/pool.py Lib/test/test_multiprocessing.py |
diffstat | 2 files changed, 42 insertions(+), 1 deletions(-)[+] [-] Lib/multiprocessing/pool.py 25 Lib/test/test_multiprocessing.py 18 |
line wrap: on
line diff
--- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -68,6 +68,23 @@ def mapstar(args):
Code run by worker processes
# +class MaybeEncodingError(Exception):
- def init(self, exc, value):
self.exc = repr(exc)[](#l1.12)
self.value = repr(value)[](#l1.13)
super(MaybeEncodingError, self).__init__(self.exc, self.value)[](#l1.14)
- def str(self):
return "Error sending result: '%s'. Reason: '%s'" % (self.value,[](#l1.17)
self.exc)[](#l1.18)
+ + def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None): assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0) put = outqueue.put @@ -96,7 +113,13 @@ def worker(inqueue, outqueue, initialize result = (True, func(*args, **kwds)) except Exception, e: result = (False, e)
put((job, i, result))[](#l1.31)
try:[](#l1.32)
put((job, i, result))[](#l1.33)
except Exception as e:[](#l1.34)
wrapped = MaybeEncodingError(e, result[1])[](#l1.35)
debug("Possible encoding error while sending result: %s" % ([](#l1.36)
wrapped))[](#l1.37)
debug('worker exiting after %d tasks' % completed)put((job, i, (False, wrapped)))[](#l1.38) completed += 1[](#l1.39)
--- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -1152,6 +1152,24 @@ class _TestPool(BaseTestCase): join() self.assertTrue(join.elapsed < 0.2) +def unpickleable_result():
+ +class _TestPoolWorkerErrors(BaseTestCase):
- def test_unpickleable_result(self):
from multiprocessing.pool import MaybeEncodingError[](#l2.14)
p = multiprocessing.Pool(2)[](#l2.15)
# Make sure we don't lose pool processes because of encoding errors.[](#l2.17)
for iteration in range(20):[](#l2.18)
res = p.apply_async(unpickleable_result)[](#l2.19)
self.assertRaises(MaybeEncodingError, res.get)[](#l2.20)
p.close()[](#l2.22)
p.join()[](#l2.23)
+ class _TestPoolWorkerLifetime(BaseTestCase): ALLOWED_TYPES = ('processes', )