[2.7] bpo-34172: multiprocessing.Pool leaks resources after being del… · python/cpython@4a7dd30 (original) (raw)
`@@ -162,7 +162,9 @@ def init(self, processes=None, initializer=None, initargs=(),
`
162
162
``
163
163
`self._worker_handler = threading.Thread(
`
164
164
`target=Pool._handle_workers,
`
165
``
`-
args=(self, )
`
``
165
`+
args=(self._cache, self._processes, self._pool, self.Process,
`
``
166
`+
self._inqueue, self._outqueue, self._initializer,
`
``
167
`+
self._initargs, self._maxtasksperchild, self._taskqueue)
`
166
168
` )
`
167
169
`self._worker_handler.daemon = True
`
168
170
`self._worker_handler._state = RUN
`
`@@ -194,42 +196,56 @@ def init(self, processes=None, initializer=None, initargs=(),
`
194
196
`exitpriority=15
`
195
197
` )
`
196
198
``
197
``
`-
def _join_exited_workers(self):
`
``
199
`+
@staticmethod
`
``
200
`+
def _join_exited_workers(pool):
`
198
201
`"""Cleanup after any worker processes which have exited due to reaching
`
199
202
` their specified lifetime. Returns True if any workers were cleaned up.
`
200
203
` """
`
201
204
`cleaned = False
`
202
``
`-
for i in reversed(range(len(self._pool))):
`
203
``
`-
worker = self._pool[i]
`
``
205
`+
for i in reversed(range(len(pool))):
`
``
206
`+
worker = pool[i]
`
204
207
`if worker.exitcode is not None:
`
205
208
`# worker exited
`
206
209
`debug('cleaning up worker %d' % i)
`
207
210
`worker.join()
`
208
211
`cleaned = True
`
209
``
`-
del self._pool[i]
`
``
212
`+
del pool[i]
`
210
213
`return cleaned
`
211
214
``
212
215
`def _repopulate_pool(self):
`
``
216
`+
return self._repopulate_pool_static(self._processes, self._pool,
`
``
217
`+
self.Process, self._inqueue,
`
``
218
`+
self._outqueue, self._initializer,
`
``
219
`+
self._initargs,
`
``
220
`+
self._maxtasksperchild)
`
``
221
+
``
222
`+
@staticmethod
`
``
223
`+
def _repopulate_pool_static(processes, pool, Process, inqueue, outqueue,
`
``
224
`+
initializer, initargs, maxtasksperchild):
`
213
225
`"""Bring the number of pool processes up to the specified number,
`
214
226
` for use after reaping workers which have exited.
`
215
227
` """
`
216
``
`-
for i in range(self._processes - len(self._pool)):
`
217
``
`-
w = self.Process(target=worker,
`
218
``
`-
args=(self._inqueue, self._outqueue,
`
219
``
`-
self._initializer,
`
220
``
`-
self._initargs, self._maxtasksperchild)
`
221
``
`-
)
`
222
``
`-
self._pool.append(w)
`
``
228
`+
for i in range(processes - len(pool)):
`
``
229
`+
w = Process(target=worker,
`
``
230
`+
args=(inqueue, outqueue,
`
``
231
`+
initializer,
`
``
232
`+
initargs, maxtasksperchild)
`
``
233
`+
)
`
``
234
`+
pool.append(w)
`
223
235
`w.name = w.name.replace('Process', 'PoolWorker')
`
224
236
`w.daemon = True
`
225
237
`w.start()
`
226
238
`debug('added worker')
`
227
239
``
228
``
`-
def _maintain_pool(self):
`
``
240
`+
@staticmethod
`
``
241
`+
def _maintain_pool(processes, pool, Process, inqueue, outqueue,
`
``
242
`+
initializer, initargs, maxtasksperchild):
`
229
243
`"""Clean up any exited workers and start replacements for them.
`
230
244
` """
`
231
``
`-
if self._join_exited_workers():
`
232
``
`-
self._repopulate_pool()
`
``
245
`+
if Pool._join_exited_workers(pool):
`
``
246
`+
Pool._repopulate_pool_static(processes, pool, Process, inqueue,
`
``
247
`+
outqueue, initializer, initargs,
`
``
248
`+
maxtasksperchild)
`
233
249
``
234
250
`def _setup_queues(self):
`
235
251
`from .queues import SimpleQueue
`
`@@ -319,16 +335,18 @@ def map_async(self, func, iterable, chunksize=None, callback=None):
`
319
335
`return result
`
320
336
``
321
337
`@staticmethod
`
322
``
`-
def _handle_workers(pool):
`
``
338
`+
def _handle_workers(cache, processes, pool, Process, inqueue, outqueue,
`
``
339
`+
initializer, initargs, maxtasksperchild, taskqueue):
`
323
340
`thread = threading.current_thread()
`
324
341
``
325
342
`# Keep maintaining workers until the cache gets drained, unless the pool
`
326
343
`# is terminated.
`
327
``
`-
while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
`
328
``
`-
pool._maintain_pool()
`
``
344
`+
while thread._state == RUN or (cache and thread._state != TERMINATE):
`
``
345
`+
Pool._maintain_pool(processes, pool, Process, inqueue, outqueue,
`
``
346
`+
initializer, initargs, maxtasksperchild)
`
329
347
`time.sleep(0.1)
`
330
348
`# send sentinel to stop workers
`
331
``
`-
pool._taskqueue.put(None)
`
``
349
`+
taskqueue.put(None)
`
332
350
`debug('worker handler exiting')
`
333
351
``
334
352
`@staticmethod
`