bpo-28699: fix abnormal behaviour of pools in multiprocessing.pool (G… · python/cpython@346dcd6 (original) (raw)

`@@ -118,7 +118,7 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,

`

118

118

`try:

`

119

119

`result = (True, func(*args, **kwds))

`

120

120

`except Exception as e:

`

121

``

`-

if wrap_exception:

`

``

121

`+

if wrap_exception and func is not _helper_reraises_exception:

`

122

122

`e = ExceptionWithTraceback(e, e.__traceback__)

`

123

123

`result = (False, e)

`

124

124

`try:

`

`@@ -133,6 +133,10 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,

`

133

133

`completed += 1

`

134

134

`util.debug('worker exiting after %d tasks' % completed)

`

135

135

``

``

136

`+

def _helper_reraises_exception(ex):

`

``

137

`+

'Pickle-able helper function for use by _guarded_task_generation.'

`

``

138

`+

raise ex

`

``

139

+

136

140

`#

`

137

141

`# Class representing a process pool

`

138

142

`#

`

`@@ -277,6 +281,17 @@ def starmap_async(self, func, iterable, chunksize=None, callback=None,

`

277

281

`return self._map_async(func, iterable, starmapstar, chunksize,

`

278

282

`callback, error_callback)

`

279

283

``

``

284

`+

def _guarded_task_generation(self, result_job, func, iterable):

`

``

285

`+

'''Provides a generator of tasks for imap and imap_unordered with

`

``

286

`+

appropriate handling for iterables which throw exceptions during

`

``

287

`+

iteration.'''

`

``

288

`+

try:

`

``

289

`+

i = -1

`

``

290

`+

for i, x in enumerate(iterable):

`

``

291

`+

yield (result_job, i, func, (x,), {})

`

``

292

`+

except Exception as e:

`

``

293

`+

yield (result_job, i+1, _helper_reraises_exception, (e,), {})

`

``

294

+

280

295

`def imap(self, func, iterable, chunksize=1):

`

281

296

`'''

`

282

297

`` Equivalent of map() -- can be MUCH slower than Pool.map().

``

`@@ -285,15 +300,23 @@ def imap(self, func, iterable, chunksize=1):

`

285

300

`raise ValueError("Pool not running")

`

286

301

`if chunksize == 1:

`

287

302

`result = IMapIterator(self._cache)

`

288

``

`-

self._taskqueue.put((((result._job, i, func, (x,), {})

`

289

``

`-

for i, x in enumerate(iterable)), result._set_length))

`

``

303

`+

self._taskqueue.put(

`

``

304

`+

(

`

``

305

`+

self._guarded_task_generation(result._job, func, iterable),

`

``

306

`+

result._set_length

`

``

307

`+

))

`

290

308

`return result

`

291

309

`else:

`

292

310

`assert chunksize > 1

`

293

311

`task_batches = Pool._get_tasks(func, iterable, chunksize)

`

294

312

`result = IMapIterator(self._cache)

`

295

``

`-

self._taskqueue.put((((result._job, i, mapstar, (x,), {})

`

296

``

`-

for i, x in enumerate(task_batches)), result._set_length))

`

``

313

`+

self._taskqueue.put(

`

``

314

`+

(

`

``

315

`+

self._guarded_task_generation(result._job,

`

``

316

`+

mapstar,

`

``

317

`+

task_batches),

`

``

318

`+

result._set_length

`

``

319

`+

))

`

297

320

`return (item for chunk in result for item in chunk)

`

298

321

``

299

322

`def imap_unordered(self, func, iterable, chunksize=1):

`

`@@ -304,15 +327,23 @@ def imap_unordered(self, func, iterable, chunksize=1):

`

304

327

`raise ValueError("Pool not running")

`

305

328

`if chunksize == 1:

`

306

329

`result = IMapUnorderedIterator(self._cache)

`

307

``

`-

self._taskqueue.put((((result._job, i, func, (x,), {})

`

308

``

`-

for i, x in enumerate(iterable)), result._set_length))

`

``

330

`+

self._taskqueue.put(

`

``

331

`+

(

`

``

332

`+

self._guarded_task_generation(result._job, func, iterable),

`

``

333

`+

result._set_length

`

``

334

`+

))

`

309

335

`return result

`

310

336

`else:

`

311

337

`assert chunksize > 1

`

312

338

`task_batches = Pool._get_tasks(func, iterable, chunksize)

`

313

339

`result = IMapUnorderedIterator(self._cache)

`

314

``

`-

self._taskqueue.put((((result._job, i, mapstar, (x,), {})

`

315

``

`-

for i, x in enumerate(task_batches)), result._set_length))

`

``

340

`+

self._taskqueue.put(

`

``

341

`+

(

`

``

342

`+

self._guarded_task_generation(result._job,

`

``

343

`+

mapstar,

`

``

344

`+

task_batches),

`

``

345

`+

result._set_length

`

``

346

`+

))

`

316

347

`return (item for chunk in result for item in chunk)

`

317

348

``

318

349

`def apply_async(self, func, args=(), kwds={}, callback=None,

`

`@@ -323,7 +354,7 @@ def apply_async(self, func, args=(), kwds={}, callback=None,

`

323

354

`if self._state != RUN:

`

324

355

`raise ValueError("Pool not running")

`

325

356

`result = ApplyResult(self._cache, callback, error_callback)

`

326

``

`-

self._taskqueue.put(([(result._job, None, func, args, kwds)], None))

`

``

357

`+

self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))

`

327

358

`return result

`

328

359

``

329

360

`def map_async(self, func, iterable, chunksize=None, callback=None,

`

`@@ -354,8 +385,14 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,

`

354

385

`task_batches = Pool._get_tasks(func, iterable, chunksize)

`

355

386

`result = MapResult(self._cache, chunksize, len(iterable), callback,

`

356

387

`error_callback=error_callback)

`

357

``

`-

self._taskqueue.put((((result._job, i, mapper, (x,), {})

`

358

``

`-

for i, x in enumerate(task_batches)), None))

`

``

388

`+

self._taskqueue.put(

`

``

389

`+

(

`

``

390

`+

self._guarded_task_generation(result._job,

`

``

391

`+

mapper,

`

``

392

`+

task_batches),

`

``

393

`+

None

`

``

394

`+

)

`

``

395

`+

)

`

359

396

`return result

`

360

397

``

361

398

`@staticmethod

`

`@@ -377,33 +414,27 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache):

`

377

414

``

378

415

`for taskseq, set_length in iter(taskqueue.get, None):

`

379

416

`task = None

`

380

``

`-

i = -1

`

381

417

`try:

`

382

``

`-

for i, task in enumerate(taskseq):

`

``

418

`+

# iterating taskseq cannot fail

`

``

419

`+

for task in taskseq:

`

383

420

`if thread._state:

`

384

421

`util.debug('task handler found thread._state != RUN')

`

385

422

`break

`

386

423

`try:

`

387

424

`put(task)

`

388

425

`except Exception as e:

`

389

``

`-

job, ind = task[:2]

`

``

426

`+

job, idx = task[:2]

`

390

427

`try:

`

391

``

`-

cache[job]._set(ind, (False, e))

`

``

428

`+

cache[job]._set(idx, (False, e))

`

392

429

`except KeyError:

`

393

430

`pass

`

394

431

`else:

`

395

432

`if set_length:

`

396

433

`util.debug('doing set_length()')

`

397

``

`-

set_length(i+1)

`

``

434

`+

idx = task[1] if task else -1

`

``

435

`+

set_length(idx + 1)

`

398

436

`continue

`

399

437

`break

`

400

``

`-

except Exception as ex:

`

401

``

`-

job, ind = task[:2] if task else (0, 0)

`

402

``

`-

if job in cache:

`

403

``

`-

cache[job]._set(ind + 1, (False, ex))

`

404

``

`-

if set_length:

`

405

``

`-

util.debug('doing set_length()')

`

406

``

`-

set_length(i+1)

`

407

438

`finally:

`

408

439

`task = taskseq = job = None

`

409

440

`else:

`