Skip to content

Commit 91e871a

Browse files
authored
pythongh-124397: Add free-threading support for iterators. (pythongh-148894)
1 parent 3236773 commit 91e871a

5 files changed

Lines changed: 537 additions & 0 deletions

File tree

Doc/library/threading.rst

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1436,3 +1436,159 @@ is equivalent to::
14361436
Currently, :class:`Lock`, :class:`RLock`, :class:`Condition`,
14371437
:class:`Semaphore`, and :class:`BoundedSemaphore` objects may be used as
14381438
:keyword:`with` statement context managers.
1439+
1440+
1441+
Iterator synchronization
1442+
------------------------
1443+
1444+
By default, Python iterators do not support concurrent access. Most iterators make
1445+
no guarantees when accessed simultaneously from multiple threads. Generator
1446+
iterators, for example, raise :exc:`ValueError` if one of their iterator methods
1447+
is called while the generator is already executing. The tools in this section
1448+
allow reliable concurrency support to be added to ordinary iterators and
1449+
iterator-producing callables.
1450+
1451+
The :class:`serialize_iterator` wrapper lets multiple threads share a single iterator and
1452+
take turns consuming from it. While one thread is running ``__next__()``, the
1453+
others block until the iterator becomes available. Each value produced by the
1454+
underlying iterator is delivered to exactly one caller.
1455+
1456+
The :func:`concurrent_tee` function lets multiple threads each receive the full
1457+
stream of values from one underlying iterator. It creates independent iterators
1458+
that all draw from the same source. Values are buffered until consumed by all
1459+
of the derived iterators.
1460+
1461+
.. class:: serialize_iterator(iterable)
1462+
1463+
Return an iterator wrapper that serializes concurrent calls to
1464+
:meth:`~iterator.__next__` using a lock.
1465+
1466+
If the wrapped iterator also defines :meth:`~generator.send`,
1467+
:meth:`~generator.throw`, or :meth:`~generator.close`, those calls
1468+
are serialized as well.
1469+
1470+
This makes it possible to share a single iterator, including a generator
1471+
iterator, between multiple threads. A lock ensures that calls are handled
1472+
one at a time. No values are duplicated or skipped by the wrapper itself.
1473+
Each item from the underlying iterator is given to exactly one caller.
1474+
1475+
This wrapper does not copy or buffer values. Threads that call
1476+
:func:`next` while another thread is already advancing the iterator will
1477+
block until the active call completes.
1478+
1479+
Example:
1480+
1481+
.. code-block:: python
1482+
1483+
import threading
1484+
1485+
def squares(n):
1486+
for x in range(n):
1487+
yield x * x
1488+
1489+
def consume(name, iterable):
1490+
for item in iterable:
1491+
print(name, item)
1492+
1493+
source = threading.serialize_iterator(squares(5))
1494+
1495+
t1 = threading.Thread(target=consume, args=("left", source))
1496+
t2 = threading.Thread(target=consume, args=("right", source))
1497+
t1.start()
1498+
t2.start()
1499+
t1.join()
1500+
t2.join()
1501+
1502+
In this example, each number is printed exactly once, but the work is shared
1503+
between the two threads.
1504+
1505+
.. versionadded:: next
1506+
1507+
1508+
.. function:: synchronized_iterator(func)
1509+
1510+
Wrap an iterator-producing callable so that each iterator it returns is
1511+
automatically passed through :class:`serialize_iterator`.
1512+
1513+
This is especially useful as a :term:`decorator` for generator functions,
1514+
allowing their generator-iterators to be consumed from multiple threads.
1515+
1516+
Example:
1517+
1518+
.. code-block:: python
1519+
1520+
import threading
1521+
1522+
@threading.synchronized_iterator
1523+
def squares(n):
1524+
for x in range(n):
1525+
yield x * x
1526+
1527+
def consume(name, iterable):
1528+
for item in iterable:
1529+
print(name, item)
1530+
1531+
source = squares(5)
1532+
1533+
t1 = threading.Thread(target=consume, args=("left", source))
1534+
t2 = threading.Thread(target=consume, args=("right", source))
1535+
t1.start()
1536+
t2.start()
1537+
t1.join()
1538+
t2.join()
1539+
1540+
The returned wrapper preserves the metadata of *func*, such as its name and
1541+
wrapped function reference.
1542+
1543+
.. versionadded:: next
1544+
1545+
1546+
.. function:: concurrent_tee(iterable, n=2)
1547+
1548+
Return *n* independent iterators from a single input *iterable*, with
1549+
guaranteed behavior when the derived iterators are consumed concurrently.
1550+
1551+
This function is similar to :func:`itertools.tee`, but is intended for cases
1552+
where the source iterator may feed consumers running in different threads.
1553+
Each returned iterator yields every value from the underlying iterable, in
1554+
the same order.
1555+
1556+
Internally, values are buffered until every derived iterator has consumed
1557+
them.
1558+
1559+
The returned iterators share the same underlying synchronization lock. Each
1560+
individual derived iterator is intended to be consumed by one thread at a
1561+
time. If a single derived iterator must itself be shared by multiple
1562+
threads, wrap it with :class:`serialize_iterator`.
1563+
1564+
If *n* is ``0``, return an empty tuple. If *n* is negative, raise
1565+
:exc:`ValueError`.
1566+
1567+
Example:
1568+
1569+
.. code-block:: python
1570+
1571+
import threading
1572+
1573+
def squares(n):
1574+
for x in range(n):
1575+
yield x * x
1576+
1577+
def consume(name, iterable):
1578+
for item in iterable:
1579+
print(name, item)
1580+
1581+
source = squares(5)
1582+
left, right = threading.concurrent_tee(source)
1583+
1584+
t1 = threading.Thread(target=consume, args=("left", left))
1585+
t2 = threading.Thread(target=consume, args=("right", right))
1586+
t1.start()
1587+
t2.start()
1588+
t1.join()
1589+
t2.join()
1590+
1591+
In this example, both consumer threads see the full sequence of squares
1592+
from a single generator expression.
1593+
1594+
.. versionadded:: next

Doc/whatsnew/3.15.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1279,6 +1279,16 @@ tarfile
12791279
(Contributed by Christoph Walcher in :gh:`57911`.)
12801280

12811281

1282+
threading
1283+
---------
1284+
1285+
* Added :class:`~threading.serialize_iterator`,
1286+
:func:`~threading.synchronized_iterator`,
1287+
and :func:`~threading.concurrent_tee` to support concurrent access to
1288+
generators and iterators.
1289+
(Contributed by Raymond Hettinger in :gh:`124397`.)
1290+
1291+
12821292
timeit
12831293
------
12841294

0 commit comments

Comments
 (0)