|
1 | 1 | import asyncio |
2 | 2 | import logging |
3 | 3 | import queue |
| 4 | +import threading |
4 | 5 | from concurrent import futures |
5 | 6 | from typing import Callable, NewType, Optional, Union |
6 | 7 |
|
@@ -57,6 +58,7 @@ def __init__( |
57 | 58 | self._dbn_queue: Optional[DBNQueue] = None |
58 | 59 | self._decoder: databento_dbn.DbnDecoder = databento_dbn.DbnDecoder() |
59 | 60 | self._disconnected: "asyncio.Future[None]" = loop.create_future() |
| 61 | + self._transport_lock = threading.Lock() |
60 | 62 | self._transport = transport |
61 | 63 |
|
62 | 64 | def __aiter__(self) -> "DBNProtocol": |
@@ -90,7 +92,8 @@ async def __anext__(self) -> DBNStruct: |
90 | 92 | "resuming reading with %d pending records", |
91 | 93 | self._dbn_queue.qsize(), |
92 | 94 | ) |
93 | | - self._transport.resume_reading() |
| 95 | + with self._transport_lock: |
| 96 | + self._transport.resume_reading() |
94 | 97 |
|
95 | 98 | raise StopAsyncIteration() |
96 | 99 |
|
@@ -118,7 +121,8 @@ def __next__(self) -> DBNStruct: |
118 | 121 | "resuming reading with %d pending records", |
119 | 122 | self._dbn_queue.qsize(), |
120 | 123 | ) |
121 | | - self._transport.resume_reading() |
| 124 | + with self._transport_lock: |
| 125 | + self._transport.resume_reading() |
122 | 126 |
|
123 | 127 | raise StopIteration() |
124 | 128 |
|
@@ -171,7 +175,8 @@ def buffer_updated(self, nbytes: int) -> None: |
171 | 175 | self._decoder.write(record_bytes) |
172 | 176 | except ValueError: |
173 | 177 | logger.critical("could not write to dbn decoder") |
174 | | - self._transport.close() |
| 178 | + with self._transport_lock: |
| 179 | + self._transport.close() |
175 | 180 |
|
176 | 181 | try: |
177 | 182 | records = self._decoder.decode() |
@@ -210,4 +215,5 @@ def buffer_updated(self, nbytes: int) -> None: |
210 | 215 | "record queue is full; %d record(s) to be processed", |
211 | 216 | self._dbn_queue.qsize(), |
212 | 217 | ) |
213 | | - self._transport.pause_reading() |
| 218 | + with self._transport_lock: |
| 219 | + self._transport.pause_reading() |
0 commit comments