cpython: 1ba0deb52223 (original) (raw)
Mercurial > cpython
changeset 100221:1ba0deb52223
Issue #23992: multiprocessing: make MapResult not fail-fast upon exception. [#23992]
Charles-François Natali cf.natali@gmail.com | |
---|---|
date | Wed, 10 Feb 2016 22:58:18 +0000 |
parents | eae2bb1930c1 |
children | c7eff18f3840 |
files | Lib/multiprocessing/pool.py Lib/test/_test_multiprocessing.py Misc/NEWS |
diffstat | 3 files changed, 38 insertions(+), 8 deletions(-)[+] [-] Lib/multiprocessing/pool.py 20 Lib/test/_test_multiprocessing.py 24 Misc/NEWS 2 |
line wrap: on
line diff
--- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -638,22 +638,26 @@ class MapResult(ApplyResult): self._number_left = length//chunksize + bool(length % chunksize) def _set(self, i, success_result):
self._number_left -= 1[](#l1.7) success, result = success_result[](#l1.8)
if success:[](#l1.9)
if success and self._success:[](#l1.10) self._value[i*self._chunksize:(i+1)*self._chunksize] = result[](#l1.11)
self._number_left -= 1[](#l1.12) if self._number_left == 0:[](#l1.13) if self._callback:[](#l1.14) self._callback(self._value)[](#l1.15) del self._cache[self._job][](#l1.16) self._event.set()[](#l1.17) else:[](#l1.18)
self._success = False[](#l1.19)
self._value = result[](#l1.20)
if self._error_callback:[](#l1.21)
self._error_callback(self._value)[](#l1.22)
del self._cache[self._job][](#l1.23)
self._event.set()[](#l1.24)
if not success and self._success:[](#l1.25)
# only store first exception[](#l1.26)
self._success = False[](#l1.27)
self._value = result[](#l1.28)
if self._number_left == 0:[](#l1.29)
# only consider the result ready once all jobs are done[](#l1.30)
if self._error_callback:[](#l1.31)
self._error_callback(self._value)[](#l1.32)
del self._cache[self._job][](#l1.33)
self._event.set()[](#l1.34)
Class whose instances are returned by Pool.imap()
--- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1660,6 +1660,10 @@ def sqr(x, wait=0.0): def mul(x, y): return x*y +def raise_large_valuerror(wait):
+ class SayWhenError(ValueError): pass def exception_throwing_generator(total, when): @@ -1895,6 +1899,26 @@ class _TestPool(BaseTestCase): with self.assertRaises(RuntimeError): p.apply(self._test_wrapped_exception)
- def test_map_no_failfast(self):
# Issue #23992: the fail-fast behaviour when an exception is raised[](#l2.19)
# during map() would make Pool.join() deadlock, because a worker[](#l2.20)
# process would fill the result queue (after the result handler thread[](#l2.21)
# terminated, hence not draining it anymore).[](#l2.22)
t_start = time.time()[](#l2.24)
with self.assertRaises(ValueError):[](#l2.26)
with self.Pool(2) as p:[](#l2.27)
try:[](#l2.28)
p.map(raise_large_valuerror, [0, 1])[](#l2.29)
finally:[](#l2.30)
time.sleep(0.5)[](#l2.31)
p.close()[](#l2.32)
p.join()[](#l2.33)
# check that we indeed waited for all jobs[](#l2.35)
self.assertGreater(time.time() - t_start, 0.9)[](#l2.36)
+ def raising(): raise KeyError("key")
--- a/Misc/NEWS +++ b/Misc/NEWS @@ -179,6 +179,8 @@ Core and Builtins Library ------- +- Issue #23992: multiprocessing: make MapResult not fail-fast upon exception. +