bpo-29861: release references to multiprocessing Pool tasks (#743) (#… · python/cpython@80cb6ed (original) (raw)

3 files changed

lines changed

Original file line number Diff line number Diff line change
@@ -128,6 +128,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
128 128 util.debug("Possible encoding error while sending result: %s" % (
129 129 wrapped))
130 130 put((job, i, (False, wrapped)))
131 +
132 +task = job = result = func = args = kwds = None
131 133 completed += 1
132 134 util.debug('worker exiting after %d tasks' % completed)
133 135
@@ -402,10 +404,11 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache):
402 404 if set_length:
403 405 util.debug('doing set_length()')
404 406 set_length(i+1)
407 +finally:
408 +task = taskseq = job = None
405 409 else:
406 410 util.debug('task handler got sentinel')
407 411
408 -
409 412 try:
410 413 # tell result handler to finish when cache is empty
411 414 util.debug('task handler sending sentinel to result handler')
@@ -445,6 +448,7 @@ def _handle_results(outqueue, get, cache):
445 448 cache[job]._set(i, obj)
446 449 except KeyError:
447 450 pass
451 +task = job = obj = None
448 452
449 453 while cache and thread._state != TERMINATE:
450 454 try:
@@ -461,6 +465,7 @@ def _handle_results(outqueue, get, cache):
461 465 cache[job]._set(i, obj)
462 466 except KeyError:
463 467 pass
468 +task = job = obj = None
464 469
465 470 if hasattr(outqueue, '_reader'):
466 471 util.debug('ensuring that outqueue is not full')
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
18 18 import logging
19 19 import struct
20 20 import operator
21 +import weakref
21 22 import test.support
22 23 import test.support.script_helper
23 24
@@ -1668,6 +1669,19 @@ def sqr(x, wait=0.0):
1668 1669 def mul(x, y):
1669 1670 return x*y
1670 1671
1672 +def identity(x):
1673 +return x
1674 +
1675 +class CountedObject(object):
1676 +n_instances = 0
1677 +
1678 +def __new__(cls):
1679 +cls.n_instances += 1
1680 +return object.__new__(cls)
1681 +
1682 +def __del__(self):
1683 +type(self).n_instances -= 1
1684 +
1671 1685 class SayWhenError(ValueError): pass
1672 1686
1673 1687 def exception_throwing_generator(total, when):
@@ -1676,6 +1690,7 @@ def exception_throwing_generator(total, when):
1676 1690 raise SayWhenError("Somebody said when")
1677 1691 yield i
1678 1692
1693 +
1679 1694 class _TestPool(BaseTestCase):
1680 1695
1681 1696 @classmethod
@@ -1910,6 +1925,19 @@ def test_wrapped_exception(self):
1910 1925 with self.assertRaises(RuntimeError):
1911 1926 p.apply(self._test_wrapped_exception)
1912 1927
1928 +def test_release_task_refs(self):
1929 +# Issue #29861: task arguments and results should not be kept
1930 +# alive after we are done with them.
1931 +objs = [CountedObject() for i in range(10)]
1932 +refs = [weakref.ref(o) for o in objs]
1933 +self.pool.map(identity, objs)
1934 +
1935 +del objs
1936 +self.assertEqual(set(wr() for wr in refs), {None})
1937 +# With a process pool, copies of the objects are returned, check
1938 +# they were released too.
1939 +self.assertEqual(CountedObject.n_instances, 0)
1940 +
1913 1941
1914 1942 def raising():
1915 1943 raise KeyError("key")
Original file line number Diff line number Diff line change
@@ -46,6 +46,9 @@ Extension Modules
46 46 Library
47 47 -------
48 48
49 +- bpo-29861: Release references to tasks, their arguments and their results
50 + as soon as they are finished in multiprocessing.Pool.
51 +
49 52 - bpo-29884: faulthandler: Restore the old sigaltstack during teardown.
50 53 Patch by Christophe Zeitouny.
51 54