Commit 76347b1
Rework dataset partitions to only year, month, day
Why these changes are being introduced:

* These changes simplify the partitioning schema for the TIMDEXDataset, allowing the app to take advantage of PyArrow's memory-efficient processes for reading and writing Parquet datasets. Furthermore, the new partitioning schema will result in a more efficient, coherent folder structure when writing datasets. For more details, see: https://mitlibraries.atlassian.net/wiki/spaces/IN/pages/4094296066/Engineering+Plan+Parquet+Datasets+for+TIMDEX+ETL#Rework-Dataset-Partitions-to-use-only-Year-%2F-Month-%2F-Day.

How this addresses that need:

* Update TIMDEX_DATASET_SCHEMA to include [year, month, day]
* Update DatasetRecord attrs to include [year, month, day] and set [source, run_date, run_type, run_id, action] as primary columns
* Add post_init method to DatasetRecord to derive partition values from 'run_date'
* Remove 'partition_values' from DatasetRecord.to_dict
* Remove 'partition_values' mixin from TIMDEXDataset.write to reduce complexity and have the write method utilize DatasetRecord partition columns instead
* Update unit tests to use new partitions and remove deprecated tests

Side effects of this change:

* The new partitioning schema introduces a 3-level folder structure within TIMDEXDataset.location (i.e. the base path of the dataset) for [year, month, day], where the leaf node will contain parquet files for every source run.

Relevant ticket(s):

* https://mitlibraries.atlassian.net/browse/TIMX-432
1 parent 5769260 commit 76347b1

6 files changed

Lines changed: 80 additions & 153 deletions
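Note: the `DatasetRecord` changes described in the commit message land in `timdex_dataset_api/record.py`, which is not among the diffs shown below. Based on the commit message and the error string asserted in the tests, a minimal sketch of the post-init derivation could look like the following; the `attrs` wiring and parsing details are assumptions, not the actual implementation:

```python
# Hypothetical sketch of DatasetRecord's run_date -> [year, month, day]
# derivation; fields and error text come from this commit's tests, the
# attrs-based implementation is assumed.
import datetime

from attrs import define


class InvalidDatasetRecordError(Exception):
    """Raised when a DatasetRecord fails validation."""


@define
class DatasetRecord:
    timdex_record_id: str
    source_record: bytes
    transformed_record: bytes
    source: str
    run_date: str
    run_type: str
    action: str
    run_id: str
    year: str | int | None = None
    month: str | int | None = None
    day: str | int | None = None

    def __attrs_post_init__(self) -> None:
        # derive partition values from run_date when not explicitly provided
        if not all([self.year, self.month, self.day]):
            try:
                parsed = datetime.date.fromisoformat(str(self.run_date))
            except ValueError as exc:
                raise InvalidDatasetRecordError(
                    "Cannot parse partition values [year, month, date] "
                    "from invalid 'run-date' string."
                ) from exc
            self.year = str(parsed.year)
            self.month = f"{parsed.month:02}"
            self.day = f"{parsed.day:02}"
```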


.gitignore

Lines changed: 3 additions & 0 deletions
@@ -156,3 +156,6 @@ cython_debug/
 
 # PyCharm
 .idea/
+
+# VSCode
+.vscode

tests/conftest.py

Lines changed: 1 addition & 6 deletions
@@ -51,12 +51,7 @@ def sample_records_iter_without_partitions():
 
     def _records_iter(num_records):
         return generate_sample_records(
-            num_records,
-            source=None,
-            run_date=None,
-            run_type=None,
-            action=None,
-            run_id=None,
+            num_records, run_date="invalid run-date", year=None, month=None, day=None
         )
 
     return _records_iter
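Note: as reworked, this fixture yields records whose `run_date` cannot be parsed and which carry no explicit partition values. A hypothetical use (not a test in this commit), assuming records are built lazily by the `generate_sample_records` generator and that `pytest` and `InvalidDatasetRecordError` are imported:

```python
# Hypothetical: constructing the records should now fail in DatasetRecord's
# post-init, since year/month/day cannot be derived from the bad run_date.
def test_invalid_run_date_fixture(sample_records_iter_without_partitions):
    with pytest.raises(InvalidDatasetRecordError):
        list(sample_records_iter_without_partitions(1))
```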

tests/test_dataset_write.py

Lines changed: 32 additions & 111 deletions
@@ -1,12 +1,13 @@
 # ruff: noqa: S105, S106, SLF001, PLR2004, PD901, D209, D205
 
-import datetime
 import math
 import os
+import re
 
 import pyarrow.dataset as ds
 import pytest
 
+from tests.utils import generate_sample_records
 from timdex_dataset_api.dataset import (
     MAX_ROWS_PER_FILE,
     TIMDEX_DATASET_SCHEMA,
@@ -17,7 +18,7 @@
 from timdex_dataset_api.record import DatasetRecord
 
 
-def test_dataset_record_serialization():
+def test_dataset_record_init():
     values = {
         "timdex_record_id": "alma:123",
         "source_record": b"<record><title>Hello World.</title></record>",
@@ -26,38 +27,38 @@ def test_dataset_record_serialization():
         "run_date": "2024-12-01",
         "run_type": "full",
         "action": "index",
-        "run_id": "abc123",
+        "run_id": "000-111-aaa-bbb",
+        "year": 2024,
+        "month": 12,
+        "day": 1,
     }
-    dataset_record = DatasetRecord(**values)
-    assert dataset_record.to_dict() == values
+    assert DatasetRecord(**values)
 
 
-def test_dataset_record_serialization_with_partition_values_provided():
-    dataset_record = DatasetRecord(
-        timdex_record_id="alma:123",
-        source_record=b"<record><title>Hello World.</title></record>",
-        transformed_record=b"""{"title":["Hello World."]}""",
-    )
-    partition_values = {
-        "source": "alma",
-        "run_date": "2024-12-01",
-        "run_type": "daily",
-        "action": "index",
-        "run_id": "000-111-aaa-bbb",
-    }
-    assert dataset_record.to_dict(partition_values=partition_values) == {
+def test_dataset_record_init_with_invalid_run_date_raise_error():
+    values = {
         "timdex_record_id": "alma:123",
         "source_record": b"<record><title>Hello World.</title></record>",
         "transformed_record": b"""{"title":["Hello World."]}""",
-        "source": "alma",
-        "run_date": "2024-12-01",
-        "run_type": "daily",
+        "source": "libguides",
+        "run_date": "-12-01",
+        "run_type": "full",
         "action": "index",
         "run_id": "000-111-aaa-bbb",
+        "year": None,
+        "month": None,
+        "day": None,
     }
+    with pytest.raises(
+        InvalidDatasetRecordError,
+        match=re.escape(
+            "Cannot parse partition values [year, month, date] from invalid 'run-date' string."  # noqa: E501
+        ),
+    ):
+        DatasetRecord(**values)
 
 
-def test_dataset_record_serialization_missing_partition_raise_error():
+def test_dataset_record_serialization():
     values = {
         "timdex_record_id": "alma:123",
         "source_record": b"<record><title>Hello World.</title></record>",
@@ -66,14 +67,13 @@ def test_dataset_record_serialization_missing_partition_raise_error():
         "run_date": "2024-12-01",
         "run_type": "full",
         "action": "index",
-        "run_id": None,  # <------ missing partition here
+        "run_id": "abc123",
+        "year": "2024",
+        "month": "12",
+        "day": "01",
     }
     dataset_record = DatasetRecord(**values)
-    with pytest.raises(
-        InvalidDatasetRecordError,
-        match="Partition values are missing: run_id",
-    ):
-        assert dataset_record.to_dict() == values
+    assert dataset_record.to_dict() == values
 
 
 def test_dataset_write_records_to_new_dataset(new_dataset, sample_records_iter):
@@ -134,52 +134,6 @@ def test_dataset_write_to_multiple_locations_raise_error(sample_records_iter):
     timdex_dataset.write(sample_records_iter(10))
 
 
-def test_dataset_write_mixin_partition_values_used(
-    new_dataset, sample_records_iter_without_partitions
-):
-    partition_values = {
-        "source": "alma",
-        "run_date": "2024-12-01",
-        "run_type": "daily",
-        "action": "index",
-        "run_id": "000-111-aaa-bbb",
-    }
-    _written_files = new_dataset.write(
-        sample_records_iter_without_partitions(10),
-        partition_values=partition_values,
-    )
-    new_dataset.reload()
-
-    # load as pandas dataframe and assert column values
-    df = new_dataset.dataset.to_table().to_pandas()
-    row = df.iloc[0]
-    assert row.source == partition_values["source"]
-    assert row.run_date == datetime.date(2024, 12, 1)
-    assert row.run_type == partition_values["run_type"]
-    assert row.action == partition_values["action"]
-    assert row.action == partition_values["action"]
-
-
-def test_dataset_write_schema_partitions_correctly_ordered(
-    new_dataset, sample_records_iter
-):
-    written_files = new_dataset.write(
-        sample_records_iter(10),
-        partition_values={
-            "source": "alma",
-            "run_date": "2024-12-01",
-            "run_type": "daily",
-            "run_id": "000-111-aaa-bbb",
-            "action": "index",
-        },
-    )
-    file = written_files[0]
-    assert (
-        "/source=alma/run_date=2024-12-01/run_type=daily"
-        "/run_id=000-111-aaa-bbb/action=index/" in file.path
-    )
-
-
 def test_dataset_write_schema_applied_to_dataset(new_dataset, sample_records_iter):
     new_dataset.write(sample_records_iter(10))
 
@@ -199,38 +153,20 @@ def test_dataset_write_partition_deleted_when_written_to_again(
 ):
     """This tests the existing_data_behavior="delete_matching" configuration when writing
     to a dataset."""
-    partition_values = {
-        "source": "alma",
-        "run_date": "2024-12-01",
-        "run_type": "daily",
-        "action": "index",
-        "run_id": "000-111-aaa-bbb",
-    }
-
     # perform FIRST write to run_date="2024-12-01"
-    written_files_1 = new_dataset.write(
-        sample_records_iter(10),
-        partition_values=partition_values,
-    )
+    written_files_1 = new_dataset.write(sample_records_iter(10))
 
     # assert that files from first write are present at this time
     assert os.path.exists(written_files_1[0].path)
 
     # perform unrelated write with new run_date to confirm this is untouched during delete
-    new_partition_values = partition_values.copy()
-    new_partition_values["run_date"] = "2024-12-15"
-    new_partition_values["run_id"] = "222-333-ccc-ddd"
     written_files_x = new_dataset.write(
-        sample_records_iter(7),
-        partition_values=new_partition_values,
+        generate_sample_records(7, run_date="2024-12-15"),
     )
 
     # perform SECOND write to run_date="2024-12-01", expecting this to delete everything
     # under this combination of partitions (i.e. the first write)
-    written_files_2 = new_dataset.write(
-        sample_records_iter(10),
-        partition_values=partition_values,
-    )
+    written_files_2 = new_dataset.write(sample_records_iter(10))
 
     new_dataset.reload()
 
@@ -243,18 +179,3 @@ def test_dataset_write_partition_deleted_when_written_to_again(
     assert not os.path.exists(written_files_1[0].path)
     assert os.path.exists(written_files_2[0].path)
     assert os.path.exists(written_files_x[0].path)
-
-
-def test_dataset_write_missing_partitions_raise_error(new_dataset, sample_records_iter):
-    missing_partition_values = {
-        "source": "libguides",
-        "run_date": None,
-        "run_type": None,
-        "action": None,
-        "run_id": None,
-    }
-    with pytest.raises(InvalidDatasetRecordError, match="Partition values are missing"):
-        _ = new_dataset.write(
-            sample_records_iter(10),
-            partition_values=missing_partition_values,
-        )
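Note: the deleted path-ordering test has no direct replacement in this commit. A hypothetical equivalent against the new three-level layout, assuming hive-style paths are retained and using the defaults from `tests/utils.py` (`year="2024"`, `month="12"`, `day="1"`), might look like:

```python
# Hypothetical test (not in this commit): written files should land under the
# new year/month/day hive partition folders.
def test_dataset_write_year_month_day_partitions_in_path(
    new_dataset, sample_records_iter
):
    written_files = new_dataset.write(sample_records_iter(10))
    assert "/year=2024/month=12/day=1/" in written_files[0].path
```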

tests/utils.py

Lines changed: 6 additions & 0 deletions
@@ -17,6 +17,9 @@ def generate_sample_records(
     run_type: str | None = "daily",
     action: str | None = "index",
     run_id: str | None = None,
+    year: str | int | None = "2024",
+    month: str | int | None = "12",
+    day: str | int | None = "1",
 ) -> Iterator[DatasetRecord]:
     """Generate sample DatasetRecords."""
     if not run_id:
@@ -32,6 +35,9 @@ def generate_sample_records(
             run_type=run_type,
             action=action,
             run_id=run_id,
+            year=year,
+            month=month,
+            day=day,
         )
 
 
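Note: a quick sketch of how the updated helper might be exercised, with the new keyword arguments overriding their defaults (`generate_sample_records` is a generator, so records materialize on iteration):

```python
# Assumed usage of the updated test helper.
records = list(generate_sample_records(2, run_date="2024-12-15", day="15"))
assert all(r.year == "2024" and r.month == "12" and r.day == "15" for r in records)
```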

timdex_dataset_api/dataset.py

Lines changed: 9 additions & 23 deletions
@@ -1,6 +1,5 @@
 """timdex_dataset_api/dataset.py"""
 
-import datetime
 import itertools
 import time
 import uuid
@@ -30,15 +29,16 @@
         pa.field("run_type", pa.string()),
         pa.field("run_id", pa.string()),
         pa.field("action", pa.string()),
+        pa.field("year", pa.string()),
+        pa.field("month", pa.string()),
+        pa.field("day", pa.string()),
     )
 )
 
 TIMDEX_DATASET_PARTITION_COLUMNS = [
-    "source",
-    "run_date",
-    "run_type",
-    "run_id",
-    "action",
+    "year",
+    "month",
+    "day",
 ]
 
 DEFAULT_BATCH_SIZE = 1_000
@@ -166,16 +166,12 @@ def write(
         self,
         records_iter: Iterator["DatasetRecord"],
         *,
-        partition_values: dict[str, str | datetime.datetime] | None = None,
         batch_size: int = DEFAULT_BATCH_SIZE,
         use_threads: bool = True,
     ) -> list[ds.WrittenFile]:
         """Write records to the TIMDEX parquet dataset.
 
-        This method expects an iterator of DatasetRecord instances, with optional
-        partition column values that will be applied to all rows written (often, these
-        are the same for all rows written, eliminating the need to repeat those values
-        in the iterator).
+        This method expects an iterator of DatasetRecord instances.
 
         This method encapsulates all dataset writing mechanics and performance
         optimizations (e.g. batching) so that the calling context can focus on yielding
@@ -192,7 +188,6 @@ def write(
 
         Args:
             - records_iter: Iterator of DatasetRecord instances
-            - partition_values: dictionary of static partition column name/value pairs
             - batch_size: size for batches to yield and write, directly affecting row
             group size in final parquet files
             - use_threads: boolean if threads should be used for writing
@@ -207,7 +202,6 @@ def write(
 
         record_batches_iter = self.get_dataset_record_batches(
             records_iter,
-            partition_values=partition_values,
            batch_size=batch_size,
         )
 
@@ -235,32 +229,24 @@ def get_dataset_record_batches(
         self,
         records_iter: Iterator["DatasetRecord"],
         *,
-        partition_values: dict[str, str | datetime.datetime] | None = None,
         batch_size: int = DEFAULT_BATCH_SIZE,
     ) -> Iterator[pa.RecordBatch]:
         """Yield pyarrow.RecordBatches for writing.
 
-        This method expects an iterator of DatasetRecord instances, with optional
-        partition column values that will be applied to all rows written (often, these
-        are the same for all rows written, eliminating the need to repeat those values
-        in the iterator).
+        This method expects an iterator of DatasetRecord instances.
 
         Each DatasetRecord is validated and serialized to a dictionary before added to a
         pyarrow.RecordBatch for writing.
 
         Args:
             - records_iter: Iterator of DatasetRecord instances
-            - partition_values: dictionary of static partition column name/value pairs
             - batch_size: size for batches to yield and write, directly affecting row
             group size in final parquet files
         """
         for i, record_batch in enumerate(itertools.batched(records_iter, batch_size)):
             batch_start_time = time.perf_counter()
             batch = pa.RecordBatch.from_pylist(
-                [
-                    record.to_dict(partition_values=partition_values)
-                    for record in record_batch
-                ]
+                [record.to_dict() for record in record_batch]
             )
             logger.debug(
                 f"Batch {i + 1} yielded for writing, "
