bpo-36719: Fix regrtest MultiprocessThread (GH-13301) (GH-13303) · python/cpython@d8e123a (original) (raw)
`@@ -21,6 +21,9 @@
`
21
21
`# Display the running tests if nothing happened last N seconds
`
22
22
`PROGRESS_UPDATE = 30.0 # seconds
`
23
23
``
``
24
`+
Time to wait until a worker completes: should be immediate
`
``
25
`+
JOIN_TIMEOUT = 30.0 # seconds
`
``
26
+
24
27
``
25
28
`def must_stop(result, ns):
`
26
29
`if result.result == INTERRUPTED:
`
`@@ -91,6 +94,10 @@ def stop(self):
`
91
94
`MultiprocessResult = collections.namedtuple('MultiprocessResult',
`
92
95
`'result stdout stderr error_msg')
`
93
96
``
``
97
`+
class ExitThread(Exception):
`
``
98
`+
pass
`
``
99
+
``
100
+
94
101
`class MultiprocessThread(threading.Thread):
`
95
102
`def init(self, pending, output, ns):
`
96
103
`super().init()
`
`@@ -100,13 +107,31 @@ def init(self, pending, output, ns):
`
100
107
`self.current_test_name = None
`
101
108
`self.start_time = None
`
102
109
`self._popen = None
`
``
110
`+
self._killed = False
`
``
111
+
``
112
`+
def repr(self):
`
``
113
`+
info = ['MultiprocessThread']
`
``
114
`+
test = self.current_test_name
`
``
115
`+
if self.is_alive():
`
``
116
`+
info.append('alive')
`
``
117
`+
if test:
`
``
118
`+
info.append(f'test={test}')
`
``
119
`+
popen = self._popen
`
``
120
`+
if popen:
`
``
121
`+
info.append(f'pid={popen.pid}')
`
``
122
`+
return '<%s>' % ' '.join(info)
`
103
123
``
104
124
`def kill(self):
`
``
125
`+
self._killed = True
`
``
126
+
105
127
`popen = self._popen
`
106
128
`if popen is None:
`
107
129
`return
`
108
``
`-
print("Kill regrtest worker process %s" % popen.pid)
`
109
130
`popen.kill()
`
``
131
`+
stdout and stderr must be closed to ensure that communicate()
`
``
132
`+
does not hang
`
``
133
`+
popen.stdout.close()
`
``
134
`+
popen.stderr.close()
`
110
135
``
111
136
`def _runtest(self, test_name):
`
112
137
`try:
`
`@@ -117,7 +142,21 @@ def _runtest(self, test_name):
`
117
142
`popen = self._popen
`
118
143
`with popen:
`
119
144
`try:
`
120
``
`-
stdout, stderr = popen.communicate()
`
``
145
`+
if self._killed:
`
``
146
`+
If kill() has been called before self._popen is set,
`
``
147
`+
self._popen is still running. Call again kill()
`
``
148
`+
to ensure that the process is killed.
`
``
149
`+
self.kill()
`
``
150
`+
raise ExitThread
`
``
151
+
``
152
`+
try:
`
``
153
`+
stdout, stderr = popen.communicate()
`
``
154
`+
except OSError:
`
``
155
`+
if self._killed:
`
``
156
`+
kill() has been called: communicate() fails
`
``
157
`+
on reading closed stdout/stderr
`
``
158
`+
raise ExitThread
`
``
159
`+
raise
`
121
160
`except:
`
122
161
`self.kill()
`
123
162
`popen.wait()
`
`@@ -154,7 +193,7 @@ def _runtest(self, test_name):
`
154
193
`return MultiprocessResult(result, stdout, stderr, err_msg)
`
155
194
``
156
195
`def run(self):
`
157
``
`-
while True:
`
``
196
`+
while not self._killed:
`
158
197
`try:
`
159
198
`try:
`
160
199
`test_name = next(self.pending)
`
`@@ -166,6 +205,8 @@ def run(self):
`
166
205
``
167
206
`if must_stop(mp_result.result, self.ns):
`
168
207
`break
`
``
208
`+
except ExitThread:
`
``
209
`+
break
`
169
210
`except BaseException:
`
170
211
`self.output.put((True, traceback.format_exc()))
`
171
212
`break
`
`@@ -205,10 +246,20 @@ def start_workers(self):
`
205
246
`worker.start()
`
206
247
``
207
248
`def wait_workers(self):
`
``
249
`+
start_time = time.monotonic()
`
208
250
`for worker in self.workers:
`
209
251
`worker.kill()
`
210
252
`for worker in self.workers:
`
211
``
`-
worker.join()
`
``
253
`+
while True:
`
``
254
`+
worker.join(1.0)
`
``
255
`+
if not worker.is_alive():
`
``
256
`+
break
`
``
257
`+
dt = time.monotonic() - start_time
`
``
258
`+
print("Wait for regrtest worker %r for %.1f sec" % (worker, dt))
`
``
259
`+
if dt > JOIN_TIMEOUT:
`
``
260
`+
print("Warning -- failed to join a regrtest worker %s"
`
``
261
`+
% worker)
`
``
262
`+
break
`
212
263
``
213
264
`def _get_result(self):
`
214
265
`if not any(worker.is_alive() for worker in self.workers):
`