Skip to content

Commit 6bbe0f2

Browse files
author
Oleksandr Baltian
committed
Update buffersize behavior to match concurrent.futures.Executor behavior
This new behavior allow smaller real concurrency number than number of running processes. Previously, it was not allowed since we implicitly incremented buffersize by `self._processes`.
1 parent 99757ea commit 6bbe0f2

1 file changed

Lines changed: 13 additions & 5 deletions

File tree

Lib/multiprocessing/pool.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
# If threading is available then ThreadPool should be provided. Therefore
2828
# we avoid top-level imports which are liable to fail on some systems.
2929
from . import util
30-
from . import get_context, TimeoutError
30+
from . import TimeoutError, get_context
3131
from .connection import wait
3232

3333
#
@@ -421,6 +421,11 @@ def imap(self, func, iterable, chunksize=1, buffersize=None):
421421
self._check_running()
422422
if chunksize < 1:
423423
raise ValueError("Chunksize must be 1+, not {0:n}".format(chunksize))
424+
if buffersize is not None:
425+
if not isinstance(buffersize, int):
426+
raise TypeError("buffersize must be an integer or None")
427+
if buffersize < 1:
428+
raise ValueError("buffersize must be None or > 0")
424429

425430
result = IMapIterator(self, buffersize)
426431

@@ -459,6 +464,11 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None):
459464
raise ValueError(
460465
"Chunksize must be 1+, not {0!r}".format(chunksize)
461466
)
467+
if buffersize is not None:
468+
if not isinstance(buffersize, int):
469+
raise TypeError("buffersize must be an integer or None")
470+
if buffersize < 1:
471+
raise ValueError("buffersize must be None or > 0")
462472

463473
result = IMapUnorderedIterator(self, buffersize)
464474

@@ -887,9 +897,7 @@ def __init__(self, pool, buffersize):
887897
if buffersize is None:
888898
self._backpressure_sema = None
889899
else:
890-
self._backpressure_sema = threading.Semaphore(
891-
value=self._pool._processes + buffersize
892-
)
900+
self._backpressure_sema = threading.Semaphore(buffersize)
893901

894902
def __iter__(self):
895903
return self
@@ -911,7 +919,7 @@ def next(self, timeout=None):
911919
raise StopIteration from None
912920
raise TimeoutError from None
913921

914-
if self._backpressure_sema:
922+
if self._backpressure_sema is not None:
915923
self._backpressure_sema.release()
916924

917925
success, value = item

0 commit comments

Comments
 (0)