Skip to content

Commit ef5f59c

Browse files
author
Oleksandr Baltian
committed
Add 2 basic ThreadPool.imap() tests w/ and w/o buffersize
1 parent 3065a96 commit ef5f59c

1 file changed

Lines changed: 58 additions & 0 deletions

File tree

Lib/test/_test_multiprocessing.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2929,6 +2929,64 @@ def test_imap(self):
29292929
self.assertEqual(next(it), i*i)
29302930
self.assertRaises(StopIteration, it.__next__)
29312931

2932+
def test_imap_inf_iterable_with_slow_task(self):
2933+
if self.TYPE in ("processes", "manager"):
2934+
self.skipTest("test not appropriate for {}".format(self.TYPE))
2935+
2936+
processes = 4
2937+
p = self.Pool(processes)
2938+
2939+
tasks_started_later = 2
2940+
last_produced_task_arg = Value("i")
2941+
2942+
def produce_args():
2943+
for arg in range(1, processes + tasks_started_later + 1):
2944+
last_produced_task_arg.value = arg
2945+
yield arg
2946+
2947+
it = p.imap(functools.partial(sqr, wait=0.2), produce_args())
2948+
2949+
next(it)
2950+
time.sleep(0.2)
2951+
# `iterable` should've been advanced only up by `processes` times,
2952+
# but in fact advances further (by `>=processes+1`).
2953+
# In this case, it advances to the maximum value.
2954+
self.assertGreater(last_produced_task_arg.value, processes + 1)
2955+
2956+
p.terminate()
2957+
p.join()
2958+
2959+
def test_imap_inf_iterable_with_slow_task_and_buffersize(self):
2960+
if self.TYPE in ("processes", "manager"):
2961+
self.skipTest("test not appropriate for {}".format(self.TYPE))
2962+
2963+
processes = 4
2964+
p = self.Pool(processes)
2965+
2966+
tasks_started_later = 2
2967+
last_produced_task_arg = Value("i")
2968+
2969+
def produce_args():
2970+
for arg in range(1, processes + tasks_started_later + 1):
2971+
last_produced_task_arg.value = arg
2972+
yield arg
2973+
2974+
it = p.imap(
2975+
functools.partial(sqr, wait=0.2),
2976+
produce_args(),
2977+
buffersize=processes,
2978+
)
2979+
2980+
time.sleep(0.2)
2981+
self.assertEqual(last_produced_task_arg.value, processes)
2982+
2983+
next(it)
2984+
time.sleep(0.2)
2985+
self.assertEqual(last_produced_task_arg.value, processes + 1)
2986+
2987+
p.terminate()
2988+
p.join()
2989+
29322990
def test_imap_handle_iterable_exception(self):
29332991
if self.TYPE == 'manager':
29342992
self.skipTest('test not appropriate for {}'.format(self.TYPE))

0 commit comments

Comments
 (0)