bpo-32314: Implement asyncio.run() (#4852) · python/cpython@02a0a19 (original) (raw)
``
1
`+
import asyncio
`
``
2
`+
import unittest
`
``
3
+
``
4
`+
from unittest import mock
`
``
5
+
``
6
+
``
7
`+
class TestPolicy(asyncio.AbstractEventLoopPolicy):
`
``
8
+
``
9
`+
def init(self, loop_factory):
`
``
10
`+
self.loop_factory = loop_factory
`
``
11
`+
self.loop = None
`
``
12
+
``
13
`+
def get_event_loop(self):
`
``
14
`+
shouldn't ever be called by asyncio.run()
`
``
15
`+
raise RuntimeError
`
``
16
+
``
17
`+
def new_event_loop(self):
`
``
18
`+
return self.loop_factory()
`
``
19
+
``
20
`+
def set_event_loop(self, loop):
`
``
21
`+
if loop is not None:
`
``
22
`+
we want to check if the loop is closed
`
``
23
`+
in BaseTest.tearDown
`
``
24
`+
self.loop = loop
`
``
25
+
``
26
+
``
27
`+
class BaseTest(unittest.TestCase):
`
``
28
+
``
29
`+
def new_loop(self):
`
``
30
`+
loop = asyncio.BaseEventLoop()
`
``
31
`+
loop._process_events = mock.Mock()
`
``
32
`+
loop._selector = mock.Mock()
`
``
33
`+
loop._selector.select.return_value = ()
`
``
34
`+
loop.shutdown_ag_run = False
`
``
35
+
``
36
`+
async def shutdown_asyncgens():
`
``
37
`+
loop.shutdown_ag_run = True
`
``
38
`+
loop.shutdown_asyncgens = shutdown_asyncgens
`
``
39
+
``
40
`+
return loop
`
``
41
+
``
42
`+
def setUp(self):
`
``
43
`+
super().setUp()
`
``
44
+
``
45
`+
policy = TestPolicy(self.new_loop)
`
``
46
`+
asyncio.set_event_loop_policy(policy)
`
``
47
+
``
48
`+
def tearDown(self):
`
``
49
`+
policy = asyncio.get_event_loop_policy()
`
``
50
`+
if policy.loop is not None:
`
``
51
`+
self.assertTrue(policy.loop.is_closed())
`
``
52
`+
self.assertTrue(policy.loop.shutdown_ag_run)
`
``
53
+
``
54
`+
asyncio.set_event_loop_policy(None)
`
``
55
`+
super().tearDown()
`
``
56
+
``
57
+
``
58
`+
class RunTests(BaseTest):
`
``
59
+
``
60
`+
def test_asyncio_run_return(self):
`
``
61
`+
async def main():
`
``
62
`+
await asyncio.sleep(0)
`
``
63
`+
return 42
`
``
64
+
``
65
`+
self.assertEqual(asyncio.run(main()), 42)
`
``
66
+
``
67
`+
def test_asyncio_run_raises(self):
`
``
68
`+
async def main():
`
``
69
`+
await asyncio.sleep(0)
`
``
70
`+
raise ValueError('spam')
`
``
71
+
``
72
`+
with self.assertRaisesRegex(ValueError, 'spam'):
`
``
73
`+
asyncio.run(main())
`
``
74
+
``
75
`+
def test_asyncio_run_only_coro(self):
`
``
76
`+
for o in {1, lambda: None}:
`
``
77
`+
with self.subTest(obj=o), \
`
``
78
`+
self.assertRaisesRegex(ValueError,
`
``
79
`+
'a coroutine was expected'):
`
``
80
`+
asyncio.run(o)
`
``
81
+
``
82
`+
def test_asyncio_run_debug(self):
`
``
83
`+
async def main(expected):
`
``
84
`+
loop = asyncio.get_event_loop()
`
``
85
`+
self.assertIs(loop.get_debug(), expected)
`
``
86
+
``
87
`+
asyncio.run(main(False))
`
``
88
`+
asyncio.run(main(True), debug=True)
`
``
89
+
``
90
`+
def test_asyncio_run_from_running_loop(self):
`
``
91
`+
async def main():
`
``
92
`+
coro = main()
`
``
93
`+
try:
`
``
94
`+
asyncio.run(coro)
`
``
95
`+
finally:
`
``
96
`+
coro.close() # Suppress ResourceWarning
`
``
97
+
``
98
`+
with self.assertRaisesRegex(RuntimeError,
`
``
99
`+
'cannot be called from a running'):
`
``
100
`+
asyncio.run(main())
`