Skip to content

Commit 0d17d86

Browse files
committed
MOD: Reduce Python client buffering
1 parent 667127e commit 0d17d86

3 files changed

Lines changed: 138 additions & 11 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Changelog
22

3+
## 0.78.0 - TBD
4+
5+
#### Enhancements
6+
- Added time-based backpressure to the live client: pauses reading records from the live
7+
gateway when the internal queue spans more than 1 second of data by `ts_index`
8+
(`ts_recv` when present, otherwise `ts_event`)
9+
310
## 0.77.0 - 2026-04-28
411

512
#### Enhancements

databento/live/session.py

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@
3636
AUTH_TIMEOUT_SECONDS: Final = 30.0
3737
CONNECT_TIMEOUT_SECONDS: Final = 10.0
3838
DBN_QUEUE_CAPACITY: Final = 2**20
39+
DBN_QUEUE_LAG_THRESHOLD: Final = 128
40+
DBN_QUEUE_MAX_LAG_NS: Final = 1_000_000_000
41+
DBN_QUEUE_FULL_WARNING_INTERVAL_S: Final = 60.0
3942
DEFAULT_REMOTE_PORT: Final = 13000
4043
CLIENT_TIMEOUT_MARGIN_SECONDS: Final = 10
4144

@@ -48,6 +51,8 @@ class DBNQueue(queue.SimpleQueue): # type: ignore [type-arg]
4851
def __init__(self) -> None:
4952
super().__init__()
5053
self._enabled = threading.Event()
54+
self._front_ts_index: int | None = None
55+
self._back_ts_index: int | None = None
5156

5257
def is_enabled(self) -> bool:
5358
"""
@@ -62,7 +67,16 @@ def is_full(self) -> bool:
6267
"""
6368
Return True when the queue has reached capacity; False otherwise.
6469
"""
65-
return self.qsize() > DBN_QUEUE_CAPACITY
70+
if self.qsize() > DBN_QUEUE_CAPACITY:
71+
return True
72+
if (
73+
self.qsize() > DBN_QUEUE_LAG_THRESHOLD
74+
and self._front_ts_index is not None
75+
and self._back_ts_index is not None
76+
and self._back_ts_index - self._front_ts_index > DBN_QUEUE_MAX_LAG_NS
77+
):
78+
return True
79+
return False
6680

6781
def enable(self) -> None:
6882
"""
@@ -106,6 +120,9 @@ def put(
106120
107121
"""
108122
if self._enabled.wait(timeout):
123+
if self._front_ts_index is None:
124+
self._front_ts_index = item.ts_index
125+
self._back_ts_index = item.ts_index
109126
return super().put(item, block, timeout)
110127
if timeout is not None:
111128
raise BentoError(f"queue is not enabled after {timeout} second(s)")
@@ -131,9 +148,34 @@ def put_nowait(self, item: DBNRecord) -> None:
131148
132149
"""
133150
if self.is_enabled():
151+
if self._front_ts_index is None:
152+
self._front_ts_index = item.ts_index
153+
self._back_ts_index = item.ts_index
134154
return super().put_nowait(item)
135155
raise BentoError("queue is not enabled")
136156

157+
def get(
158+
self,
159+
block: bool = True,
160+
timeout: float | None = None,
161+
) -> DBNRecord:
162+
record = super().get(block, timeout)
163+
if self.empty():
164+
self._front_ts_index = None
165+
self._back_ts_index = None
166+
else:
167+
self._front_ts_index = record.ts_index
168+
return record
169+
170+
def get_nowait(self) -> DBNRecord:
171+
record = super().get_nowait()
172+
if self.empty():
173+
self._front_ts_index = None
174+
self._back_ts_index = None
175+
else:
176+
self._front_ts_index = record.ts_index
177+
return record
178+
137179

138180
@dataclasses.dataclass
139181
class SessionMetadata:
@@ -225,6 +267,7 @@ def __init__(
225267
self._user_streams = user_streams
226268
self._last_ts_event: int | None = None
227269
self._last_msg_loop_time: float = math.inf
270+
self._last_queue_full_warning_t: float = -math.inf
228271

229272
def received_metadata(self, metadata: databento_dbn.Metadata) -> None:
230273
if self._metadata:
@@ -282,10 +325,13 @@ def _queue_for_iteration(self, record: DBNRecord) -> None:
282325
self._dbn_queue.put(record)
283326
# DBNQueue has no max size; so check if it's above capacity, and if so, pause reading
284327
if self._dbn_queue.is_full():
285-
logger.warning(
286-
"record queue is full; %d record(s) to be processed",
287-
self._dbn_queue.qsize(),
288-
)
328+
now = self._loop.time()
329+
if now - self._last_queue_full_warning_t >= DBN_QUEUE_FULL_WARNING_INTERVAL_S:
330+
logger.warning(
331+
"record queue is full; %d record(s) to be processed",
332+
self._dbn_queue.qsize(),
333+
)
334+
self._last_queue_full_warning_t = now
289335
self.transport.pause_reading()
290336

291337

tests/test_live_session.py

Lines changed: 80 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,41 @@
11
import pytest
2+
from databento_dbn import CBBOMsg
3+
from databento_dbn import OHLCVMsg
4+
from databento_dbn import Side
25

36
from databento.common.error import BentoError
7+
from databento.live.session import DBN_QUEUE_LAG_THRESHOLD
8+
from databento.live.session import DBN_QUEUE_MAX_LAG_NS
49
from databento.live.session import DBNQueue
510

611

12+
def _record(ts_event: int = 0) -> OHLCVMsg:
13+
return OHLCVMsg(
14+
rtype=1,
15+
publisher_id=1,
16+
instrument_id=0,
17+
ts_event=ts_event,
18+
open=100,
19+
high=110,
20+
low=90,
21+
close=105,
22+
volume=1000,
23+
)
24+
25+
26+
def _cbbo_record(ts_event: int, ts_recv: int) -> CBBOMsg:
27+
return CBBOMsg(
28+
rtype=0xC1,
29+
publisher_id=1,
30+
instrument_id=0,
31+
ts_event=ts_event,
32+
price=1_000_000_000,
33+
size=1,
34+
side=Side.NONE,
35+
ts_recv=ts_recv,
36+
)
37+
38+
739
def test_dbn_queue_put(
840
timeout: float = 0.01,
941
) -> None:
@@ -15,17 +47,18 @@ def test_dbn_queue_put(
1547
"""
1648
# Arrange
1749
queue = DBNQueue()
50+
record = _record()
1851

1952
# Act, Assert
2053
with pytest.raises(BentoError):
21-
queue.put(None, timeout=timeout)
54+
queue.put(record, timeout=timeout)
2255

2356
queue.enable()
24-
queue.put(None, timeout=timeout)
57+
queue.put(record, timeout=timeout)
2558

2659
queue.disable()
2760
with pytest.raises(BentoError):
28-
queue.put(None, timeout=timeout)
61+
queue.put(record, timeout=timeout)
2962

3063

3164
def test_dbn_queue_put_nowait() -> None:
@@ -34,14 +67,55 @@ def test_dbn_queue_put_nowait() -> None:
3467
"""
3568
# Arrange
3669
queue = DBNQueue()
70+
record = _record()
3771

3872
# Act, Assert
3973
with pytest.raises(BentoError):
40-
queue.put_nowait(None)
74+
queue.put_nowait(record)
4175

4276
queue.enable()
43-
queue.put_nowait(None)
77+
queue.put_nowait(record)
4478

4579
queue.disable()
4680
with pytest.raises(BentoError):
47-
queue.put_nowait(None)
81+
queue.put_nowait(record)
82+
83+
84+
def test_dbn_queue_is_full_uses_ts_index() -> None:
85+
queue = DBNQueue()
86+
queue.enable()
87+
88+
base_recv = 1_700_000_000_000_000_000
89+
stale_event_start = 1_600_000_000_000_000_000
90+
ten_ms = 10_000_000
91+
for i in range(DBN_QUEUE_LAG_THRESHOLD + 2):
92+
queue.put_nowait(
93+
_cbbo_record(
94+
ts_event=stale_event_start + i * ten_ms,
95+
ts_recv=base_recv + i,
96+
),
97+
)
98+
99+
assert queue.qsize() > DBN_QUEUE_LAG_THRESHOLD
100+
assert not queue.is_full()
101+
102+
103+
def test_dbn_queue_is_full_triggers_on_ts_recv_lag() -> None:
104+
"""
105+
`ts_recv` lag past `DBN_QUEUE_MAX_LAG_NS` should trip backpressure even
106+
when `ts_event` is constant.
107+
"""
108+
queue = DBNQueue()
109+
queue.enable()
110+
111+
base_recv = 1_700_000_000_000_000_000
112+
step = (DBN_QUEUE_MAX_LAG_NS // DBN_QUEUE_LAG_THRESHOLD) + 1
113+
for i in range(DBN_QUEUE_LAG_THRESHOLD + 2):
114+
queue.put_nowait(
115+
_cbbo_record(
116+
ts_event=0,
117+
ts_recv=base_recv + i * step,
118+
),
119+
)
120+
121+
assert queue.is_full()

0 commit comments

Comments
 (0)