bpo-22087: Fix Policy.get_event_loop() to detect fork (GH-7208) · python/cpython@5d97b7b (original) (raw)
3 files changed
lines changed
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -625,16 +625,23 @@ class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy): | ||
625 | 625 | |
626 | 626 | class _Local(threading.local): |
627 | 627 | _loop = None |
628 | +_pid = None | |
628 | 629 | _set_called = False |
629 | 630 | |
630 | 631 | def __init__(self): |
631 | 632 | self._local = self._Local() |
633 | +self._local._pid = os.getpid() | |
632 | 634 | |
633 | 635 | def get_event_loop(self): |
634 | 636 | """Get the event loop. |
635 | 637 | |
636 | 638 | This may be None or an instance of EventLoop. |
637 | 639 | """ |
640 | +if self._local._pid != os.getpid(): | |
641 | +# If we detect we're in a child process forked by multiprocessing, | |
642 | +# we reset self._local so that we'll get a new event loop. | |
643 | +self._local = self._Local() | |
644 | + | |
638 | 645 | if (self._local._loop is None and |
639 | 646 | not self._local._set_called and |
640 | 647 | isinstance(threading.current_thread(), threading._MainThread)): |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -13,6 +13,7 @@ | ||
13 | 13 | import tempfile |
14 | 14 | import threading |
15 | 15 | import unittest |
16 | +import multiprocessing | |
16 | 17 | from unittest import mock |
17 | 18 | from test import support |
18 | 19 | |
@@ -1804,6 +1805,37 @@ def create_watcher(self): | ||
1804 | 1805 | return asyncio.FastChildWatcher() |
1805 | 1806 | |
1806 | 1807 | |
1808 | +class ForkedProcessTests(unittest.TestCase): | |
1809 | +def setUp(self): | |
1810 | +self.parent_loop = asyncio.SelectorEventLoop() | |
1811 | +asyncio.set_event_loop(self.parent_loop) | |
1812 | +self.ctx = multiprocessing.get_context("fork") | |
1813 | + | |
1814 | +def tearDown(self): | |
1815 | +self.parent_loop.close() | |
1816 | + | |
1817 | +def _check_loops_not_equal(self, old_loop): | |
1818 | +loop = asyncio.get_event_loop() | |
1819 | +if loop is old_loop: | |
1820 | +raise RuntimeError("Child process inherited parent's event loop") | |
1821 | + | |
1822 | +try: | |
1823 | +val = loop.run_until_complete(asyncio.sleep(0.05, result=42)) | |
1824 | +if val != 42: | |
1825 | +raise RuntimeError("new event loop does not work") | |
1826 | +finally: | |
1827 | +loop.close() | |
1828 | + | |
1829 | +sys.exit(loop is old_loop) | |
1830 | + | |
1831 | +def test_new_loop_in_child(self): | |
1832 | +p = self.ctx.Process(target=self._check_loops_not_equal, | |
1833 | +args=(self.parent_loop,)) | |
1834 | +p.start() | |
1835 | +p.join() | |
1836 | +self.assertEqual(p.exitcode, 0) | |
1837 | + | |
1838 | + | |
1807 | 1839 | class PolicyTests(unittest.TestCase): |
1808 | 1840 | |
1809 | 1841 | def create_policy(self): |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
1 | +Fix Policy.get_event_loop() to detect fork and return a new loop. | |
2 | + | |
3 | +Original patch by Dan O'Reilly. |