Skip to content

Commit f80864b

Browse files
committed
ADD: Add compression parameter to DBNStore.to_file
1 parent 59acae2 commit f80864b

3 files changed

Lines changed: 63 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
- Added `mode` parameter to `DBNStore.to_csv` to control the file writing mode
77
- Added `mode` parameter to `DBNStore.to_json` to control the file writing mode
88
- Added `mode` parameter to `DBNStore.to_parquet` to control the file writing mode
9+
- Added `compression` parameter to `DBNStore.to_file` which controls the output compression format
910

1011
#### Breaking changes
1112
- Changed default write mode for `DBNStore.to_csv` to overwrite ("w")

databento/common/dbnstore.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,6 +1041,7 @@ def to_file(
10411041
self,
10421042
path: PathLike[str] | str,
10431043
mode: Literal["w", "x"] = "w",
1044+
compression: Compression | str | None = None,
10441045
) -> None:
10451046
"""
10461047
Write the data to a DBN file at the given path.
@@ -1051,6 +1052,8 @@ def to_file(
10511052
The file path to write to.
10521053
mode : str, default "w"
10531054
The file write mode to use, either "x" or "w".
1055+
compression : Compression or str, optional
1056+
The compression format to write. If `None`, uses the same compression as the underlying data.
10541057
10551058
Raises
10561059
------
@@ -1062,9 +1065,35 @@ def to_file(
10621065
If path is not writable.
10631066
10641067
"""
1068+
compression = validate_maybe_enum(compression, Compression, "compression")
10651069
file_path = validate_file_write_path(path, "path", exist_ok=mode == "w")
1066-
file_path.write_bytes(self._data_source.reader.read())
1067-
self._data_source = FileDataSource(file_path)
1070+
1071+
writer: IO[bytes] | zstandard.ZstdCompressionWriter
1072+
if compression is None or compression == self.compression:
1073+
# Handle trivial case
1074+
with open(file_path, mode=f"{mode}b") as writer:
1075+
reader = self._data_source.reader
1076+
while chunk := reader.read(2**16):
1077+
writer.write(chunk)
1078+
return
1079+
1080+
if compression == Compression.ZSTD:
1081+
writer = zstandard.ZstdCompressor(
1082+
write_checksum=True,
1083+
).stream_writer(
1084+
open(file_path, mode=f"{mode}b"),
1085+
closefd=True,
1086+
)
1087+
else:
1088+
writer = open(file_path, mode=f"{mode}b")
1089+
1090+
try:
1091+
reader = self.reader
1092+
1093+
while chunk := reader.read(2**16):
1094+
writer.write(chunk)
1095+
finally:
1096+
writer.close()
10681097

10691098
def to_json(
10701099
self,

tests/test_historical_bento.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from databento.common.error import BentoError
2323
from databento.common.publishers import Dataset
2424
from databento.common.types import DBNRecord
25+
from databento_dbn import Compression
2526
from databento_dbn import MBOMsg
2627
from databento_dbn import Schema
2728
from databento_dbn import SType
@@ -243,6 +244,36 @@ def test_to_file_exclusive(
243244
dbnstore.to_file(path=dbn_path, mode="x")
244245

245246

247+
@pytest.mark.parametrize(
248+
"compression",
249+
[
250+
Compression.NONE,
251+
Compression.ZSTD,
252+
],
253+
)
254+
def test_to_file_compression(
255+
test_data: Callable[[Dataset, Schema], bytes],
256+
tmp_path: Path,
257+
compression: Compression,
258+
) -> None:
259+
"""
260+
Test that specifying a compression for DBNStore.to_file writes the desired
261+
compression mode.
262+
"""
263+
# Arrange
264+
stub_data = test_data(Dataset.GLBX_MDP3, Schema.MBO)
265+
dbnstore = DBNStore.from_bytes(data=stub_data)
266+
dbn_path = tmp_path / "my_test.dbn"
267+
dbnstore.to_file(
268+
path=dbn_path,
269+
compression=compression,
270+
)
271+
272+
# Act, Assert
273+
new_store = databento.read_dbn(dbn_path)
274+
assert new_store.compression == compression
275+
276+
246277
def test_to_csv_overwrite(
247278
test_data: Callable[[Dataset, Schema], bytes],
248279
tmp_path: Path,

0 commit comments

Comments
 (0)