[3.6] Revert "bpo-34172: multiprocessing.Pool leaks resources after b… · python/cpython@eb38ee0 (original) (raw)
`@@ -147,9 +147,8 @@ class Pool(object):
`
147
147
` '''
`
148
148
`_wrap_exception = True
`
149
149
``
150
``
`-
@staticmethod
`
151
``
`-
def Process(ctx, *args, **kwds):
`
152
``
`-
return ctx.Process(*args, **kwds)
`
``
150
`+
def Process(self, *args, **kwds):
`
``
151
`+
return self._ctx.Process(*args, **kwds)
`
153
152
``
154
153
`def init(self, processes=None, initializer=None, initargs=(),
`
155
154
`maxtasksperchild=None, context=None):
`
`@@ -176,15 +175,13 @@ def init(self, processes=None, initializer=None, initargs=(),
`
176
175
``
177
176
`self._worker_handler = threading.Thread(
`
178
177
`target=Pool._handle_workers,
`
179
``
`-
args=(self._cache, self._taskqueue, self._ctx, self.Process,
`
180
``
`-
self._processes, self._pool, self._inqueue, self._outqueue,
`
181
``
`-
self._initializer, self._initargs, self._maxtasksperchild,
`
182
``
`-
self._wrap_exception)
`
``
178
`+
args=(self, )
`
183
179
` )
`
184
180
`self._worker_handler.daemon = True
`
185
181
`self._worker_handler._state = RUN
`
186
182
`self._worker_handler.start()
`
187
183
``
``
184
+
188
185
`self._task_handler = threading.Thread(
`
189
186
`target=Pool._handle_tasks,
`
190
187
`args=(self._taskqueue, self._quick_put, self._outqueue,
`
`@@ -210,62 +207,43 @@ def init(self, processes=None, initializer=None, initargs=(),
`
210
207
`exitpriority=15
`
211
208
` )
`
212
209
``
213
``
`-
@staticmethod
`
214
``
`-
def _join_exited_workers(pool):
`
``
210
`+
def _join_exited_workers(self):
`
215
211
`"""Cleanup after any worker processes which have exited due to reaching
`
216
212
` their specified lifetime. Returns True if any workers were cleaned up.
`
217
213
` """
`
218
214
`cleaned = False
`
219
``
`-
for i in reversed(range(len(pool))):
`
220
``
`-
worker = pool[i]
`
``
215
`+
for i in reversed(range(len(self._pool))):
`
``
216
`+
worker = self._pool[i]
`
221
217
`if worker.exitcode is not None:
`
222
218
`# worker exited
`
223
219
`util.debug('cleaning up worker %d' % i)
`
224
220
`worker.join()
`
225
221
`cleaned = True
`
226
``
`-
del pool[i]
`
``
222
`+
del self._pool[i]
`
227
223
`return cleaned
`
228
224
``
229
225
`def _repopulate_pool(self):
`
230
``
`-
return self._repopulate_pool_static(self._ctx, self.Process,
`
231
``
`-
self._processes,
`
232
``
`-
self._pool, self._inqueue,
`
233
``
`-
self._outqueue, self._initializer,
`
234
``
`-
self._initargs,
`
235
``
`-
self._maxtasksperchild,
`
236
``
`-
self._wrap_exception)
`
237
``
-
238
``
`-
@staticmethod
`
239
``
`-
def _repopulate_pool_static(ctx, Process, processes, pool, inqueue,
`
240
``
`-
outqueue, initializer, initargs,
`
241
``
`-
maxtasksperchild, wrap_exception):
`
242
226
`"""Bring the number of pool processes up to the specified number,
`
243
227
` for use after reaping workers which have exited.
`
244
228
` """
`
245
``
`-
for i in range(processes - len(pool)):
`
246
``
`-
w = Process(ctx, target=worker,
`
247
``
`-
args=(inqueue, outqueue,
`
248
``
`-
initializer,
`
249
``
`-
initargs, maxtasksperchild,
`
250
``
`-
wrap_exception)
`
251
``
`-
)
`
252
``
`-
pool.append(w)
`
``
229
`+
for i in range(self._processes - len(self._pool)):
`
``
230
`+
w = self.Process(target=worker,
`
``
231
`+
args=(self._inqueue, self._outqueue,
`
``
232
`+
self._initializer,
`
``
233
`+
self._initargs, self._maxtasksperchild,
`
``
234
`+
self._wrap_exception)
`
``
235
`+
)
`
``
236
`+
self._pool.append(w)
`
253
237
`w.name = w.name.replace('Process', 'PoolWorker')
`
254
238
`w.daemon = True
`
255
239
`w.start()
`
256
240
`util.debug('added worker')
`
257
241
``
258
``
`-
@staticmethod
`
259
``
`-
def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue,
`
260
``
`-
initializer, initargs, maxtasksperchild,
`
261
``
`-
wrap_exception):
`
``
242
`+
def _maintain_pool(self):
`
262
243
`"""Clean up any exited workers and start replacements for them.
`
263
244
` """
`
264
``
`-
if Pool._join_exited_workers(pool):
`
265
``
`-
Pool._repopulate_pool_static(ctx, Process, processes, pool,
`
266
``
`-
inqueue, outqueue, initializer,
`
267
``
`-
initargs, maxtasksperchild,
`
268
``
`-
wrap_exception)
`
``
245
`+
if self._join_exited_workers():
`
``
246
`+
self._repopulate_pool()
`
269
247
``
270
248
`def _setup_queues(self):
`
271
249
`self._inqueue = self._ctx.SimpleQueue()
`
`@@ -418,20 +396,16 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
`
418
396
`return result
`
419
397
``
420
398
`@staticmethod
`
421
``
`-
def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
`
422
``
`-
inqueue, outqueue, initializer, initargs,
`
423
``
`-
maxtasksperchild, wrap_exception):
`
``
399
`+
def _handle_workers(pool):
`
424
400
`thread = threading.current_thread()
`
425
401
``
426
402
`# Keep maintaining workers until the cache gets drained, unless the pool
`
427
403
`# is terminated.
`
428
``
`-
while thread._state == RUN or (cache and thread._state != TERMINATE):
`
429
``
`-
Pool._maintain_pool(ctx, Process, processes, pool, inqueue,
`
430
``
`-
outqueue, initializer, initargs,
`
431
``
`-
maxtasksperchild, wrap_exception)
`
``
404
`+
while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
`
``
405
`+
pool._maintain_pool()
`
432
406
`time.sleep(0.1)
`
433
407
`# send sentinel to stop workers
`
434
``
`-
taskqueue.put(None)
`
``
408
`+
pool._taskqueue.put(None)
`
435
409
`util.debug('worker handler exiting')
`
436
410
``
437
411
`@staticmethod
`
`@@ -807,7 +781,7 @@ class ThreadPool(Pool):
`
807
781
`_wrap_exception = False
`
808
782
``
809
783
`@staticmethod
`
810
``
`-
def Process(ctx, *args, **kwds):
`
``
784
`+
def Process(*args, **kwds):
`
811
785
`from .dummy import Process
`
812
786
`return Process(*args, **kwds)
`
813
787
``