Skip to content

Commit 3065a96

Browse files
author
Oleksandr Baltian
committed
Release all buffersize_lock obj from the parent thread when terminate
1 parent 6bbe0f2 commit 3065a96

1 file changed

Lines changed: 51 additions & 53 deletions

File tree

Lib/multiprocessing/pool.py

Lines changed: 51 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,11 @@ def __init__(self, processes=None, initializer=None, initargs=(),
191191
self._ctx = context or get_context()
192192
self._setup_queues()
193193
self._taskqueue = queue.SimpleQueue()
194+
# The _taskqueue_buffersize_semaphores exist to allow calling .release()
195+
# on every active semaphore when the pool is terminating to let task_handler
196+
# wake up to stop. It's a dict so that each iterator object can efficiently
197+
# deregister its semaphore when iterator finishes.
198+
self._taskqueue_buffersize_semaphores = {}
194199
# The _change_notifier queue exist to wake up self._handle_workers()
195200
# when the cache (self._cache) is empty or when there is a change in
196201
# the _state variable of the thread that runs _handle_workers.
@@ -257,7 +262,8 @@ def __init__(self, processes=None, initializer=None, initargs=(),
257262
self, self._terminate_pool,
258263
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
259264
self._change_notifier, self._worker_handler, self._task_handler,
260-
self._result_handler, self._cache),
265+
self._result_handler, self._cache,
266+
self._taskqueue_buffersize_semaphores),
261267
exitpriority=15
262268
)
263269
self._state = RUN
@@ -383,33 +389,27 @@ def starmap_async(self, func, iterable, chunksize=None, callback=None,
383389
return self._map_async(func, iterable, starmapstar, chunksize,
384390
callback, error_callback)
385391

386-
def _guarded_task_generation(self, result_job, func, iterable):
392+
def _guarded_task_generation(self, result_job, func, iterable,
393+
buffersize_sema=None):
387394
'''Provides a generator of tasks for imap and imap_unordered with
388395
appropriate handling for iterables which throw exceptions during
389396
iteration.'''
390397
try:
391398
i = -1
392-
for i, x in enumerate(iterable):
393-
yield (result_job, i, func, (x,), {})
394399

395-
except Exception as e:
396-
yield (result_job, i+1, _helper_reraises_exception, (e,), {})
400+
if buffersize_sema is None:
401+
for i, x in enumerate(iterable):
402+
yield (result_job, i, func, (x,), {})
397403

398-
def _guarded_task_generation_lazy(self, result_job, func, iterable,
399-
backpressure_sema):
400-
"""Provides a generator of tasks for imap and imap_unordered with
401-
appropriate handling for iterables which throw exceptions during
402-
iteration."""
403-
try:
404-
i = -1
405-
enumerated_iter = iter(enumerate(iterable))
406-
while True:
407-
backpressure_sema.acquire()
408-
try:
409-
i, x = next(enumerated_iter)
410-
except StopIteration:
411-
break
412-
yield (result_job, i, func, (x,), {})
404+
else:
405+
enumerated_iter = iter(enumerate(iterable))
406+
while True:
407+
buffersize_sema.acquire()
408+
try:
409+
i, x = next(enumerated_iter)
410+
except StopIteration:
411+
break
412+
yield (result_job, i, func, (x,), {})
413413

414414
except Exception as e:
415415
yield (result_job, i+1, _helper_reraises_exception, (e,), {})
@@ -428,19 +428,11 @@ def imap(self, func, iterable, chunksize=1, buffersize=None):
428428
raise ValueError("buffersize must be None or > 0")
429429

430430
result = IMapIterator(self, buffersize)
431-
432-
if result._backpressure_sema is None:
433-
task_generation = self._guarded_task_generation
434-
else:
435-
task_generation = functools.partial(
436-
self._guarded_task_generation_lazy,
437-
backpressure_sema=result._backpressure_sema,
438-
)
439-
440431
if chunksize == 1:
441432
self._taskqueue.put(
442433
(
443-
task_generation(result._job, func, iterable),
434+
self._guarded_task_generation(result._job, func, iterable,
435+
result._buffersize_sema),
444436
result._set_length,
445437
)
446438
)
@@ -449,7 +441,8 @@ def imap(self, func, iterable, chunksize=1, buffersize=None):
449441
task_batches = Pool._get_tasks(func, iterable, chunksize)
450442
self._taskqueue.put(
451443
(
452-
task_generation(result._job, mapstar, task_batches),
444+
self._guarded_task_generation(result._job, mapstar, task_batches,
445+
result._buffersize_sema),
453446
result._set_length,
454447
)
455448
)
@@ -471,19 +464,11 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
471464
raise ValueError("buffersize must be None or > 0")
472465

473466
result = IMapUnorderedIterator(self, buffersize)
474-
475-
if result._backpressure_sema is None:
476-
task_generation = self._guarded_task_generation
477-
else:
478-
task_generation = functools.partial(
479-
self._guarded_task_generation_lazy,
480-
backpressure_sema=result._backpressure_sema,
481-
)
482-
483467
if chunksize == 1:
484468
self._taskqueue.put(
485469
(
486-
task_generation(result._job, func, iterable),
470+
self._guarded_task_generation(result._job, func, iterable,
471+
result._buffersize_sema),
487472
result._set_length,
488473
)
489474
)
@@ -492,7 +477,8 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
492477
task_batches = Pool._get_tasks(func, iterable, chunksize)
493478
self._taskqueue.put(
494479
(
495-
task_generation(result._job, mapstar, task_batches),
480+
self._guarded_task_generation(result._job, mapstar, task_batches,
481+
result._buffersize_sema),
496482
result._set_length,
497483
)
498484
)
@@ -727,7 +713,8 @@ def _help_stuff_finish(inqueue, task_handler, size):
727713

728714
@classmethod
729715
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
730-
worker_handler, task_handler, result_handler, cache):
716+
worker_handler, task_handler, result_handler, cache,
717+
taskqueue_buffersize_semaphores):
731718
# this is guaranteed to only be called once
732719
util.debug('finalizing pool')
733720

@@ -738,6 +725,10 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
738725
change_notifier.put(None)
739726

740727
task_handler._state = TERMINATE
728+
# Release all semaphores to wake up task_handler to stop.
729+
for job_id, sema in tuple(taskqueue_buffersize_semaphores.items()):
730+
taskqueue_buffersize_semaphores.pop(job_id)
731+
sema.release()
741732

742733
util.debug('helping task handler/workers to finish')
743734
cls._help_stuff_finish(inqueue, task_handler, len(pool))
@@ -893,11 +884,13 @@ def __init__(self, pool, buffersize):
893884
self._length = None
894885
self._unsorted = {}
895886
self._cache[self._job] = self
896-
897887
if buffersize is None:
898-
self._backpressure_sema = None
888+
self._buffersize_sema = None
899889
else:
900-
self._backpressure_sema = threading.Semaphore(buffersize)
890+
self._buffersize_sema = threading.Semaphore(buffersize)
891+
self._pool._taskqueue_buffersize_semaphores[self] = (
892+
self._buffersize_sema
893+
)
901894

902895
def __iter__(self):
903896
return self
@@ -908,25 +901,30 @@ def next(self, timeout=None):
908901
item = self._items.popleft()
909902
except IndexError:
910903
if self._index == self._length:
911-
self._pool = None
912-
raise StopIteration from None
904+
self._stop_iterator()
913905
self._cond.wait(timeout)
914906
try:
915907
item = self._items.popleft()
916908
except IndexError:
917909
if self._index == self._length:
918-
self._pool = None
919-
raise StopIteration from None
910+
self._stop_iterator()
920911
raise TimeoutError from None
921912

922-
if self._backpressure_sema is not None:
923-
self._backpressure_sema.release()
913+
if self._buffersize_sema is not None:
914+
self._buffersize_sema.release()
924915

925916
success, value = item
926917
if success:
927918
return value
928919
raise value
929920

921+
def _stop_iterator(self):
922+
if self._pool is not None:
923+
# could be deleted in previous `.next()` calls
924+
self._pool._taskqueue_buffersize_semaphores.pop(self._job)
925+
self._pool = None
926+
raise StopIteration from None
927+
930928
__next__ = next # XXX
931929

932930
def _set(self, i, obj):

0 commit comments

Comments
 (0)