Skip to content

Commit 5769260

Browse files
authored
Merge pull request #12 from MITLibraries/TIMX-424-reorder-partition-columns
TIMX 424 - reorder partition columns
2 parents b6043f4 + ddbd5fd commit 5769260

5 files changed

Lines changed: 36 additions & 15 deletions

File tree

tests/test_dataset_write.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,14 +169,14 @@ def test_dataset_write_schema_partitions_correctly_ordered(
169169
"source": "alma",
170170
"run_date": "2024-12-01",
171171
"run_type": "daily",
172-
"action": "index",
173172
"run_id": "000-111-aaa-bbb",
173+
"action": "index",
174174
},
175175
)
176176
file = written_files[0]
177177
assert (
178178
"/source=alma/run_date=2024-12-01/run_type=daily"
179-
"/action=index/run_id=000-111-aaa-bbb" in file.path
179+
"/run_id=000-111-aaa-bbb/action=index/" in file.path
180180
)
181181

182182

tests/utils.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,18 +43,17 @@ def generate_sample_records_with_simulated_partitions(
4343
run_dates = ["2024-01-01", "2024-06-15", "2024-12-31"]
4444
run_types = ["full", "daily"]
4545
actions = ["index", "delete"]
46-
run_ids = [str(uuid.uuid4()) for x in range(num_run_ids)]
4746

4847
records_remaining = num_records
4948
while records_remaining > 0:
5049
batch_size = random.randint(1, min(100, records_remaining))
50+
source = random.choice(sources)
5151
yield from generate_sample_records(
5252
num_records=batch_size,
53-
timdex_record_id_prefix=random.choice(sources),
54-
source=random.choice(sources),
53+
timdex_record_id_prefix=source,
54+
source=source,
5555
run_date=random.choice(run_dates),
5656
run_type=random.choice(run_types),
5757
action=random.choice(actions),
58-
run_id=random.choice(run_ids),
5958
)
6059
records_remaining -= batch_size

timdex_dataset_api/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from timdex_dataset_api.dataset import TIMDEXDataset
44
from timdex_dataset_api.record import DatasetRecord
55

6-
__version__ = "0.1.0"
6+
__version__ = "0.2.0"
77

88
__all__ = [
99
"DatasetRecord",

timdex_dataset_api/dataset.py

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,17 @@
2828
pa.field("source", pa.string()),
2929
pa.field("run_date", pa.date32()),
3030
pa.field("run_type", pa.string()),
31-
pa.field("action", pa.string()),
3231
pa.field("run_id", pa.string()),
32+
pa.field("action", pa.string()),
3333
)
3434
)
3535

3636
# Ordered partition columns for the dataset; order determines the directory
# hierarchy produced on write (source first, action last). This commit moves
# "run_id" before "action" (TIMX-424).
TIMDEX_DATASET_PARTITION_COLUMNS = [
    "source",
    "run_date",
    "run_type",
    "run_id",
    "action",
]
4343

4444
DEFAULT_BATCH_SIZE = 1_000
@@ -54,6 +54,7 @@ def __init__(self, location: str | list[str]):
5454
self.dataset: ds.Dataset = None # type: ignore[assignment]
5555
self.schema = TIMDEX_DATASET_SCHEMA
5656
self.partition_columns = TIMDEX_DATASET_PARTITION_COLUMNS
57+
self._written_files: list[ds.WrittenFile] = None # type: ignore[assignment]
5758

5859
@classmethod
5960
def load(cls, location: str) -> "TIMDEXDataset":
@@ -197,6 +198,7 @@ def write(
197198
- use_threads: boolean if threads should be used for writing
198199
"""
199200
start_time = time.perf_counter()
201+
self._written_files = []
200202

201203
if isinstance(self.source, list):
202204
raise TypeError(
@@ -209,14 +211,13 @@ def write(
209211
batch_size=batch_size,
210212
)
211213

212-
written_files = []
213214
ds.write_dataset(
214215
record_batches_iter,
215216
base_dir=self.source,
216217
basename_template="%s-{i}.parquet" % (str(uuid.uuid4())), # noqa: UP031
217218
existing_data_behavior="delete_matching",
218219
filesystem=self.filesystem,
219-
file_visitor=lambda written_file: written_files.append(written_file),
220+
file_visitor=lambda written_file: self._written_files.append(written_file), # type: ignore[arg-type]
220221
format="parquet",
221222
max_open_files=500,
222223
max_rows_per_file=MAX_ROWS_PER_FILE,
@@ -227,8 +228,8 @@ def write(
227228
use_threads=use_threads,
228229
)
229230

230-
logger.info(f"write elapsed: {round(time.perf_counter()-start_time, 2)}s")
231-
return written_files # type: ignore[return-value]
231+
self.log_write_statistics(start_time)
232+
return self._written_files # type: ignore[return-value]
232233

233234
def get_dataset_record_batches(
234235
self,
@@ -266,3 +267,24 @@ def get_dataset_record_batches(
266267
f"elapsed: {round(time.perf_counter()-batch_start_time, 6)}s"
267268
)
268269
yield batch
270+
271+
def log_write_statistics(self, start_time: float) -> None:
    """Summarize and log statistics for the files produced by write().

    Args:
        start_time: a time.perf_counter() value captured when the write
            began; used to compute elapsed seconds.

    Reads self._written_files (populated via the ds.write_dataset
    file_visitor callback) and logs elapsed time, file count, total rows,
    and total byte size.
    """
    # _written_files is None until write() runs; treat that as "no files"
    # rather than raising a TypeError from len()/iteration.
    written_files = self._written_files or []
    total_time = round(time.perf_counter() - start_time, 2)
    total_files = len(written_files)
    # Generator expressions avoid materializing intermediate lists.
    total_rows = sum(
        wf.metadata.num_rows  # type: ignore[attr-defined]
        for wf in written_files
    )
    total_size = sum(
        wf.size for wf in written_files  # type: ignore[attr-defined]
    )
    logger.info(
        f"Dataset write complete - elapsed: {total_time}s, "
        f"total files: {total_files}, "
        f"total rows: {total_rows}, "
        f"total size: {total_size}"
    )

timdex_dataset_api/record.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ class DatasetRecord:
2424
source: str | None = None
2525
run_date: str | datetime.datetime | None = None
2626
run_type: str | None = None
27-
action: str | None = None
2827
run_id: str | None = None
28+
action: str | None = None
2929

3030
def to_dict(
3131
self,
@@ -46,7 +46,7 @@ def validate(self) -> None:
4646
# ensure all partition columns are set
4747
missing_partition_values = [
4848
field
49-
for field in ["source", "run_date", "run_type", "action", "run_id"]
49+
for field in ["source", "run_date", "run_type", "run_id", "action"]
5050
if getattr(self, field) is None
5151
]
5252
if missing_partition_values:

0 commit comments

Comments (0)