Commit eab47a6

MOD: Modify 5GB size warning heuristics
1 parent 3d68b44 · commit eab47a6

8 files changed: 120 additions & 68 deletions


CHANGELOG.md

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 
 ## 0.13.0 - TBD
 - Added support for `statistics` schema
-- Upgraded `databento-dbn` to 0.5.1
+- Upgraded `databento-dbn` to 0.6.0
 - Renamed `booklevel` MBP field to `levels` for brevity and consistent naming
 - Changed `flags` field to an unsigned int
 - Changed default of `ts_out` to `False` for `Live` client

databento/common/dbnstore.py

Lines changed: 3 additions & 6 deletions
@@ -34,7 +34,7 @@
 from databento.common.symbology import InstrumentIdMappingInterval
 from databento.common.validation import validate_maybe_enum
 from databento.live.data import DBNStruct
-from databento_dbn import DbnDecoder, ErrorMsg, Metadata, SymbolMappingMsg, SystemMsg
+from databento_dbn import DBNDecoder, ErrorMsg, Metadata, SymbolMappingMsg, SystemMsg
 
 
 NON_SCHEMA_RECORD_TYPES = [
@@ -354,8 +354,7 @@ def __init__(self, data_source: DataSource) -> None:
 
     def __iter__(self) -> Generator[DBNStruct, None, None]:
         reader = self.reader
-        attach_ts_out = self.metadata.ts_out
-        decoder = DbnDecoder()
+        decoder = DBNDecoder()
         while True:
             raw = reader.read(DBNStore.DBN_READ_SIZE)
             if raw:
@@ -364,9 +363,7 @@ def __iter__(self) -> Generator[DBNStruct, None, None]:
                     records = decoder.decode()
                 except ValueError:
                     continue
-                for record, ts_out in records:
-                    if attach_ts_out and not isinstance(record, Metadata):
-                        setattr(record, "ts_out", ts_out)
+                for record in records:
                     yield record
             else:
                 if len(decoder.buffer()) > 0:
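
For context on the rename above: in `databento-dbn` 0.6.0 the decoder class is exported as `DBNDecoder`, and `decode()` yields records directly rather than `(record, ts_out)` tuples, which is why the loop body shrinks. A minimal sketch of the new iteration shape, assuming a `databento_dbn` 0.6.0 install; the `read_size` default here is illustrative, not the library's `DBN_READ_SIZE`:

from databento_dbn import DBNDecoder

def iter_dbn_records(reader, read_size=64 * 1024):
    # Feed raw bytes into the decoder and yield complete records;
    # decode() raises ValueError until a full record has been buffered.
    decoder = DBNDecoder()
    while True:
        raw = reader.read(read_size)
        if not raw:
            break
        decoder.write(raw)
        try:
            records = decoder.decode()
        except ValueError:
            continue
        for record in records:
            yield record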

databento/historical/api/timeseries.py

Lines changed: 103 additions & 50 deletions
@@ -18,9 +18,13 @@
 )
 from databento.common.validation import validate_enum, validate_semantic_string
 from databento.historical.api import API_VERSION
+from databento.historical.api.metadata import MetadataHttpAPI
 from databento.historical.http import BentoHttpAPI
 
 
+WARN_REQUEST_SIZE: int = 5 * 10**9  # 5 GB
+
+
 class TimeSeriesHttpAPI(BentoHttpAPI):
     """
     Provides request methods for the time series HTTP API endpoints.
@@ -127,10 +131,12 @@ def get_range(
         stype_in_valid = validate_enum(stype_in, SType, "stype_in")
         symbols_list = optional_symbols_list_to_string(symbols, stype_in_valid)
         schema_valid = validate_enum(schema, Schema, "schema")
+        start_valid = datetime_to_string(start)
+        end_valid = optional_datetime_to_string(end)
         params: List[Tuple[str, Optional[str]]] = [
             ("dataset", validate_semantic_string(dataset, "dataset")),
-            ("start", datetime_to_string(start)),
-            ("end", optional_datetime_to_string(end)),
+            ("start", start_valid),
+            ("end", end_valid),
             ("symbols", symbols_list),
             ("schema", str(schema_valid)),
             ("stype_in", str(stype_in_valid)),
@@ -144,10 +150,12 @@ def get_range(
             params.append(("limit", str(limit)))
 
         self._pre_check_data_size(
-            symbols=symbols,
+            dataset=dataset,
+            stype_in=stype_in_valid,
+            symbols=symbols_list,
             schema=schema_valid,
-            start=start,
-            end=end,
+            start=start_valid,
+            end=end_valid,
             limit=limit,
         )
 
@@ -267,10 +275,12 @@ async def get_range_async(
         stype_in_valid = validate_enum(stype_in, SType, "stype_in")
         symbols_list = optional_symbols_list_to_string(symbols, stype_in_valid)
         schema_valid = validate_enum(schema, Schema, "schema")
+        start_valid = datetime_to_string(start)
+        end_valid = optional_datetime_to_string(end)
         params: List[Tuple[str, Optional[str]]] = [
             ("dataset", validate_semantic_string(dataset, "dataset")),
-            ("start", datetime_to_string(start)),
-            ("end", optional_datetime_to_string(end)),
+            ("start", start_valid),
+            ("end", end_valid),
             ("symbols", symbols_list),
             ("schema", str(schema_valid)),
             ("stype_in", str(stype_in_valid)),
@@ -283,10 +293,12 @@ async def get_range_async(
             params.append(("limit", str(limit)))
 
         self._pre_check_data_size(
-            symbols=symbols,
+            dataset=dataset,
+            stype_in=stype_in_valid,
+            symbols=symbols_list,
             schema=schema_valid,
-            start=start,
-            end=end,
+            start=start_valid,
+            end=end_valid,
             limit=limit,
         )
 
@@ -308,59 +320,100 @@ async def get_range_async(
         writer.seek(0)  # rewind for read
         return DBNStore.from_bytes(writer.read())
 
-    def _pre_check_data_size(  # noqa (prefer not to make static)
+    def _pre_check_data_size(
         self,
-        symbols: Optional[Union[List[str], str]],
+        dataset: str,
+        symbols: str,
         schema: Schema,
-        start: Optional[Union[pd.Timestamp, date, str, int]],
-        end: Optional[Union[pd.Timestamp, date, str, int]],
+        start: str,
+        end: Optional[str],
+        stype_in: SType,
         limit: Optional[int],
     ) -> None:
-        if limit and limit < 10**7:
+        if _is_size_limited(
+            schema=schema,
+            limit=limit,
+        ):
             return
 
-        # Use heuristics to check ballpark data size
-        if (
-            _is_large_data_size_schema(schema)
-            or _is_greater_than_one_day(start, end)
-            or _is_large_number_of_symbols(symbols)
+        if _is_period_limited(
+            schema=schema,
+            symbols=symbols,
+            start=start,
+            end=end,
         ):
-            warnings.warn(
-                message="The size of this streaming request is estimated "
-                "to be 5 GB or greater.\nWe recommend breaking your request "
-                "into smaller requests, or submitting a batch download request.\n"
-                "This warning can be suppressed: "
-                "https://docs.python.org/3/library/warnings.html",
-                category=BentoWarning,
-                stacklevel=3,  # This makes the error happen in user code
-            )
-
+            return
 
-def _is_large_number_of_symbols(symbols: Optional[Union[List[str], str]]) -> bool:
-    if not symbols:
-        return True  # Full universe
+        metadata_api = MetadataHttpAPI(
+            key=self._key,
+            gateway=self._gateway,
+        )
+        request_size = metadata_api.get_billable_size(
+            dataset=dataset,
+            start=start,
+            end=end,
+            symbols=symbols,
+            schema=schema,
+            stype_in=stype_in,
+            limit=limit,
+        )
 
-    if isinstance(symbols, str):
-        symbols = symbols.split(",")
+        if request_size < WARN_REQUEST_SIZE:
+            return
 
-    if len(symbols) >= 500:
-        return True
+        warnings.warn(
+            message="""The size of this streaming request is greater than 5GB.
+            It is recommended to submit a batch download request for large volumes
+            of data, or break this request into smaller requests.
+            This warning can be suppressed:
+            https://docs.python.org/3/library/warnings.html""",
+            category=BentoWarning,
+            stacklevel=3,  # This makes the error happen in user code
+        )
 
-    return False
 
+def _is_size_limited(
+    schema: Schema,
+    limit: Optional[int],
+    max_size: int = WARN_REQUEST_SIZE,
+) -> bool:
+    if limit is None:
+        return False
 
-def _is_large_data_size_schema(schema: Schema) -> bool:
-    return schema in (Schema.MBO, Schema.MBP_10)
+    estimated_size = limit * schema.get_record_type().size_hint()
+    return estimated_size < max_size
 
 
-def _is_greater_than_one_day(
-    start: Optional[Union[pd.Timestamp, date, str, int]],
-    end: Optional[Union[pd.Timestamp, date, str, int]],
+def _is_period_limited(
+    schema: Schema,
+    symbols: str,
+    start: str,
+    end: Optional[str],
+    max_size: int = WARN_REQUEST_SIZE,
 ) -> bool:
-    if start is None or end is None:
-        return True
-
-    if pd.to_datetime(end) - pd.to_datetime(start) > pd.Timedelta(days=1):
-        return True
-
-    return False
+    if end is None:
+        return False
+
+    if schema not in (
+        Schema.OHLCV_1S,
+        Schema.OHLCV_1M,
+        Schema.OHLCV_1H,
+        Schema.OHLCV_1D,
+        Schema.DEFINITION,
+    ):
+        return False
+
+    dt_start = pd.to_datetime(start, utc=True)
+    dt_end = pd.to_datetime(end, utc=True)
+
+    # default scale to one day for ohlcv_1d and definition
+    scale = {
+        Schema.OHLCV_1S: 1,
+        Schema.OHLCV_1M: 60,
+        Schema.OHLCV_1H: 60 * 60,
+    }.get(schema, 60 * 60 * 24)
+
+    num_symbols = len(symbols.split(","))
+    num_records = num_symbols * (dt_end - dt_start).total_seconds() // scale
+    estimated_size = num_records * schema.get_record_type().size_hint()
+    return estimated_size < max_size
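
The net effect of the new heuristics: a request skips the warning when an explicit `limit` caps its size (`_is_size_limited`), or when the schema emits records at a fixed cadence and the requested window is provably small (`_is_period_limited`); only then does `_pre_check_data_size` spend a round trip on `get_billable_size` and warn above `WARN_REQUEST_SIZE`. A worked sketch of the period-based estimate, with the per-record size treated as an assumption for illustration (the real value comes from `schema.get_record_type().size_hint()`):

RECORD_SIZE = 56  # assumed bytes per OHLCV record, illustrative only
WARN_REQUEST_SIZE = 5 * 10**9  # 5 GB, matching the constant above

def ohlcv_1s_estimate(num_symbols: int, window_seconds: int) -> int:
    # ohlcv-1s emits at most one record per symbol per second,
    # so the window bounds the record count (scale = 1 above).
    return num_symbols * window_seconds * RECORD_SIZE

# 10 symbols over one day stays comfortably under the threshold ...
print(ohlcv_1s_estimate(10, 86_400))        # 48_384_000 (~48 MB) -> no warning
# ... while 100 symbols over 30 days does not:
print(ohlcv_1s_estimate(100, 30 * 86_400))  # 14_515_200_000 (~14.5 GB)

As the warning text notes, callers who expect large streams can silence it with the standard `warnings` machinery, e.g. `warnings.filterwarnings("ignore", category=BentoWarning)`.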

databento/live/data.py

Lines changed: 2 additions & 2 deletions
@@ -73,10 +73,10 @@ async def _stream_task(
         stream: IO[bytes],
         record: DBNStruct,
     ) -> None:
-        ts_out = getattr(record, "ts_out")
+        ts_out = getattr(record, "ts_out", None)
         try:
            stream.write(bytes(record))
-            if not isinstance(record, databento_dbn.Metadata) and ts_out:
+            if not isinstance(record, databento_dbn.Metadata) and ts_out is not None:
                 stream.write(struct.pack("Q", ts_out))
         except Exception as exc:
             stream_name = getattr(stream, "name", str(stream))
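
The two-character change above is load-bearing: with `databento-dbn` 0.6.0 no longer attaching `ts_out` to every decoded record, the one-argument `getattr(record, "ts_out")` would raise `AttributeError`, while the three-argument form yields `None`; and the `ts_out is not None` guard, unlike plain truthiness, still writes a legitimate `ts_out` of `0`. A stand-in sketch of the framing, with a hypothetical record type in place of the real `databento_dbn` records:

import struct

class FakeRecord:
    """Stand-in for a decoded record that carries no ts_out attribute."""

record = FakeRecord()

# getattr(record, "ts_out") would raise AttributeError here;
# the default turns the missing attribute into an explicit None.
ts_out = getattr(record, "ts_out", None)

if ts_out is not None:
    # "Q" packs an unsigned 64-bit integer in native byte order,
    # appended after the record bytes as in _stream_task above.
    trailer = struct.pack("Q", ts_out)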

databento/live/dbn.py

Lines changed: 3 additions & 7 deletions
@@ -57,7 +57,7 @@ def __init__(
         self._buffer: bytearray = bytearray()
         self._client_callback = client_callback
         self._dbn_queue: Optional[DBNQueue] = None
-        self._decoder: databento_dbn.DbnDecoder = databento_dbn.DbnDecoder()
+        self._decoder: databento_dbn.DBNDecoder = databento_dbn.DBNDecoder()
         self._disconnected: "asyncio.Future[None]" = loop.create_future()
         self._transport_lock = threading.Lock()
         self._transport = transport
@@ -184,19 +184,15 @@ def buffer_updated(self, nbytes: int) -> None:
         except ValueError:
             pass
         else:
-            for record, ts_out in records:
+            for record in records:
                 header = getattr(record, "hd", object())
                 ts_event = getattr(header, "ts_event", None)
                 logger.info(
-                    "decoded as %s record ts_event=%s ts_out=%s",
+                    "decoded as %s record ts_event=%s",
                     type(record).__name__,
                     ts_event,
-                    ts_out,
                 )
 
-                if not isinstance(record, databento_dbn.Metadata):
-                    setattr(record, "ts_out", ts_out)
-
                 # Record Dispatch
                 self._client_callback(record)
 

databento/version.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__version__ = "0.12.0"
+__version__ = "0.13.0"

requirements.txt

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 aiohttp>=3.7.2,<4.0.0
-databento-dbn==0.5.1
+databento-dbn==0.6.0
 numpy>=1.17.0
 pandas>=1.1.3
 requests>=2.24.0

tests/test_historical_timeseries.py

Lines changed: 6 additions & 0 deletions
@@ -7,6 +7,7 @@
 import requests
 from databento import DBNStore
 from databento.common.enums import Schema
+from databento.historical.api.timeseries import TimeSeriesHttpAPI
 from pytest_mock import MockerFixture
 
 
@@ -62,6 +63,11 @@ def test_get_range_sends_expected_request(
 
     # Mock from_bytes with the definition stub
     stream_bytes = test_data(Schema.TRADES)
+    monkeypatch.setattr(
+        TimeSeriesHttpAPI,
+        "_pre_check_data_size",
+        MagicMock(return_value=True),
+    )
     monkeypatch.setattr(
         DBNStore,
         "from_bytes",
