Skip to content

Commit b346fb3

Browse files
committed
Validate DatasetRecord during serialization
Why these changes are being introduced: When a DatasetRecord is serialized for writing, it contains the dataset partitions that the record will fall under. We do not want any records getting written to the dataset without all required partitions. Additionally, we may want to add future validations to the record, e.g. ensuring the transformed record is valid JSON. This provides a place for that. How this addresses that need: * Adds a new DatasetRecord.validate() method, and applies this method during to_dict() serialization. Side effects of this change: * Records cannot be written to the dataset without all required partitions. Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/TIMX-415
1 parent 7759a2f commit b346fb3

4 files changed

Lines changed: 77 additions & 13 deletions

File tree

tests/test_dataset_write.py

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,23 @@
1313
DatasetNotLoadedError,
1414
TIMDEXDataset,
1515
)
16+
from timdex_dataset_api.exceptions import InvalidDatasetRecordError
1617
from timdex_dataset_api.record import DatasetRecord
1718

1819

1920
def test_dataset_record_serialization():
20-
dataset_record = DatasetRecord(
21-
timdex_record_id="alma:123",
22-
source_record=b"<record><title>Hello World.</title></record>",
23-
transformed_record=b"""{"title":["Hello World."]}""",
24-
)
25-
assert dataset_record.to_dict() == {
21+
values = {
2622
"timdex_record_id": "alma:123",
2723
"source_record": b"<record><title>Hello World.</title></record>",
2824
"transformed_record": b"""{"title":["Hello World."]}""",
29-
"source": None,
30-
"run_date": None,
31-
"run_type": None,
32-
"action": None,
33-
"run_id": None,
25+
"source": "libguides",
26+
"run_date": "2024-12-01",
27+
"run_type": "full",
28+
"action": "index",
29+
"run_id": "abc123",
3430
}
31+
dataset_record = DatasetRecord(**values)
32+
assert dataset_record.to_dict() == values
3533

3634

3735
def test_dataset_record_serialization_with_partition_values_provided():
@@ -59,6 +57,25 @@ def test_dataset_record_serialization_with_partition_values_provided():
5957
}
6058

6159

60+
def test_dataset_record_serialization_missing_partition_raise_error():
61+
values = {
62+
"timdex_record_id": "alma:123",
63+
"source_record": b"<record><title>Hello World.</title></record>",
64+
"transformed_record": b"""{"title":["Hello World."]}""",
65+
"source": "libguides",
66+
"run_date": "2024-12-01",
67+
"run_type": "full",
68+
"action": "index",
69+
"run_id": None, # <------ missing partition here
70+
}
71+
dataset_record = DatasetRecord(**values)
72+
with pytest.raises(
73+
InvalidDatasetRecordError,
74+
match="Partition values are missing: run_id",
75+
):
76+
assert dataset_record.to_dict() == values
77+
78+
6279
def test_dataset_write_records_to_new_dataset(new_dataset, sample_records_iter):
6380
files_written = new_dataset.write(sample_records_iter(10_000))
6481
assert len(files_written) == 1
@@ -226,3 +243,18 @@ def test_dataset_write_partition_deleted_when_written_to_again(
226243
assert not os.path.exists(written_files_1[0].path)
227244
assert os.path.exists(written_files_2[0].path)
228245
assert os.path.exists(written_files_x[0].path)
246+
247+
248+
def test_dataset_write_missing_partitions_raise_error(new_dataset, sample_records_iter):
249+
missing_partition_values = {
250+
"source": "libguides",
251+
"run_date": None,
252+
"run_type": None,
253+
"action": None,
254+
"run_id": None,
255+
}
256+
with pytest.raises(InvalidDatasetRecordError, match="Partition values are missing"):
257+
_ = new_dataset.write(
258+
sample_records_iter(10),
259+
partition_values=missing_partition_values,
260+
)

timdex_dataset_api/dataset.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,15 @@ def write(
180180
optimizations (e.g. batching) so that the calling context can focus on yielding
181181
data.
182182
183+
For write, the configuration existing_data_behavior="delete_matching" is used.
184+
This means that during write, if any pre-existing files are found for the exact
185+
combinations of partitions for that batch, those pre-existing files will be
186+
deleted. This effectively makes a write idempotent to the TIMDEX dataset.
187+
188+
A max_open_files=500 configuration is set to avoid AWS S3 503 error "SLOW_DOWN"
189+
if too many PutObject calls are made in parallel. Testing suggests this does not
190+
substantially slow down the overall write.
191+
183192
Args:
184193
- records_iter: Iterator of DatasetRecord instances
185194
- partition_values: dictionary of static partition column name/value pairs
@@ -209,7 +218,7 @@ def write(
209218
filesystem=self.filesystem,
210219
file_visitor=lambda written_file: written_files.append(written_file),
211220
format="parquet",
212-
max_open_files=500, # avoids S3 503 "SLOW_DOWN" error for PutObject requests
221+
max_open_files=500,
213222
max_rows_per_file=MAX_ROWS_PER_FILE,
214223
max_rows_per_group=MAX_ROWS_PER_GROUP,
215224
partitioning=self.partition_columns,
@@ -235,7 +244,7 @@ def get_dataset_record_batches(
235244
are the same for all rows written, eliminating the need to repeat those values
236245
in the iterator).
237246
238-
Each DatasetRecord is serialized to a dictionary before added to a
247+
Each DatasetRecord is validated and serialized to a dictionary before added to a
239248
pyarrow.RecordBatch for writing.
240249
241250
Args:

timdex_dataset_api/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,7 @@
33

44
class DatasetNotLoadedError(Exception):
55
"""Custom exception for accessing methods requiring a loaded dataset."""
6+
7+
8+
class InvalidDatasetRecordError(Exception):
9+
"""Custom exception for invalid DatasetRecord instances."""

timdex_dataset_api/record.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import datetime
44
from dataclasses import asdict, dataclass
55

6+
from timdex_dataset_api.exceptions import InvalidDatasetRecordError
7+
68

79
@dataclass
810
class DatasetRecord:
@@ -27,10 +29,27 @@ class DatasetRecord:
2729

2830
def to_dict(
2931
self,
32+
*,
3033
partition_values: dict[str, str | datetime.datetime] | None = None,
34+
validate: bool = True,
3135
) -> dict:
3236
"""Serialize instance as dictionary, setting partition values if passed."""
3337
if partition_values:
3438
for key, value in partition_values.items():
3539
setattr(self, key, value)
40+
if validate:
41+
self.validate()
3642
return asdict(self)
43+
44+
def validate(self) -> None:
45+
"""Validate DatasetRecord for writing."""
46+
# ensure all partition columns are set
47+
missing_partition_values = [
48+
field
49+
for field in ["source", "run_date", "run_type", "action", "run_id"]
50+
if getattr(self, field) is None
51+
]
52+
if missing_partition_values:
53+
raise InvalidDatasetRecordError(
54+
f"Partition values are missing: {', '.join(missing_partition_values)}"
55+
)

0 commit comments

Comments
 (0)