Skip to content

Commit ff2aff0

Browse files
committed
Remove current records functionality in TIMDEXDataset
Why these changes are being introduced: While the TIMDEXDatasetMetadata class is rebuilt, TIMDEXDataset itself can no longer provide "current" records from the dataset as it has no metadata to work with. This is temporary until TIMDEXDatasetMetadata is rebuilt, and TIMDEXDataset gets new functionality based on *that* new metadata. How this addresses that need: * Any reference to "current records" is removed Side effects of this change: * TIMDEXDataset cannot provide current records Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/TIMX-530
1 parent 2fd2218 commit ff2aff0

2 files changed

Lines changed: 11 additions & 80 deletions

File tree

migrations/002_2025_06_25_consistent_run_timestamp_per_etl_run.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
# ruff: noqa: BLE001, D212, TRY300, TRY400
1+
# ruff: noqa: PGH004
2+
# ruff: noqa
3+
# type: ignore
4+
25
"""
36
Date: 2025-06-25
47
@@ -29,6 +32,10 @@
2932
pipenv run python migrations/002_2025_06_25_consistent_run_timestamp_per_etl_run.py \
3033
<DATASET_LOCATION> \
3134
--dry-run
35+
36+
Update: 2025-08-04
37+
38+
This migration is no longer functional given changes to TIMDEXDataset.
3239
"""
3340

3441
import argparse

timdex_dataset_api/dataset.py

Lines changed: 3 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020

2121
from timdex_dataset_api.config import configure_logger
2222
from timdex_dataset_api.exceptions import DatasetNotLoadedError
23-
from timdex_dataset_api.metadata import TIMDEXDatasetMetadata
2423

2524
if TYPE_CHECKING:
2625
from timdex_dataset_api.record import DatasetRecord # pragma: nocover
2726

27+
2828
logger = configure_logger(__name__)
2929

3030
TIMDEX_DATASET_SCHEMA = pa.schema(
@@ -126,10 +126,6 @@ def __init__(
126126
# writing
127127
self._written_files: list[ds.WrittenFile] = None # type: ignore[assignment]
128128

129-
# reading
130-
self._current_records: bool = False
131-
self.metadata: TIMDEXDatasetMetadata = None # type: ignore[assignment]
132-
133129
@property
134130
def row_count(self) -> int:
135131
"""Get row count from loaded dataset."""
@@ -139,8 +135,6 @@ def row_count(self) -> int:
139135

140136
def load(
141137
self,
142-
*,
143-
current_records: bool = False,
144138
**filters: Unpack[DatasetFilters],
145139
) -> None:
146140
"""Lazy load a pyarrow.dataset.Dataset and set to self.dataset.
@@ -161,21 +155,12 @@ def load(
161155
- filters: kwargs typed via DatasetFilters TypedDict
162156
- Filters passed directly in method call, e.g. source="alma",
163157
run_date="2024-12-20", etc., but are typed according to DatasetFilters.
164-
- current_records: bool
165-
- if True, all records yielded from this instance will be the current
166-
version of the record in the dataset.
167158
"""
168159
start_time = time.perf_counter()
169160

170161
# reset paths from original location before load
171162
_, self.paths = self.parse_location(self.location)
172163

173-
# read dataset metadata if only current records are requested
174-
self._current_records = current_records
175-
if current_records:
176-
self.metadata = TIMDEXDatasetMetadata(timdex_dataset=self)
177-
self.paths = self.metadata.get_current_parquet_files(**filters)
178-
179164
# perform initial load of full dataset
180165
self.dataset = self._load_pyarrow_dataset()
181166

@@ -465,10 +450,6 @@ def read_batches_iter(
465450
While batch_size will limit the max rows per batch, filtering may result in some
466451
batches having less than this limit.
467452
468-
If the flag self._current_records is set, this method leans on
469-
self._yield_current_record_deduped_batches() to apply deduplication of records to
470-
ensure only current versions of the record are ever yielded.
471-
472453
Args:
473454
- columns: list[str], list of columns to return from the dataset
474455
- filters: pairs of column:value to filter the dataset
@@ -479,73 +460,16 @@ def read_batches_iter(
479460
)
480461
dataset = self._get_filtered_dataset(**filters)
481462

482-
# if current records, add required columns for deduplication
483-
if self._current_records:
484-
if not columns:
485-
columns = TIMDEX_DATASET_SCHEMA.names
486-
columns.extend(["timdex_record_id", "run_id"])
487-
columns = list(set(columns))
488-
489463
batches = dataset.to_batches(
490464
columns=columns,
491465
batch_size=self.config.read_batch_size,
492466
batch_readahead=self.config.batch_read_ahead,
493467
fragment_readahead=self.config.fragment_read_ahead,
494468
)
495469

496-
if self._current_records:
497-
yield from self._yield_current_record_batches(batches, **filters)
498-
else:
499-
for batch in batches:
500-
if len(batch) > 0:
501-
yield batch
502-
503-
def _yield_current_record_batches(
504-
self,
505-
batches: Iterator[pa.RecordBatch],
506-
**filters: Unpack[DatasetFilters],
507-
) -> Iterator[pa.RecordBatch]:
508-
"""Method to yield only the most recent version of each record.
509-
510-
When multiple versions of a record (same timdex_record_id) exist in the dataset,
511-
this method ensures only the most recent version is returned. If filtering is
512-
applied that removes this most recent version of a record, that timdex_record_id
513-
will not be yielded at all.
514-
515-
This method uses TIMDEXDatasetMetadata to provide a mapping of timdex_record_id to
516-
run_id for the current ETL run for that record. While yielding records, only when
517-
the timdex_record_id + run_id match the mapping is a record yielded.
518-
519-
Args:
520-
- batches: batches of records to actually yield from
521-
- filters: pairs of column:value to filter the dataset metadata required
522-
"""
523-
# get map of timdex_record_id to run_id for current version of that record
524-
record_to_run_map = self.metadata.get_current_record_to_run_map(**filters)
525-
526-
# loop through batches, yielding only current records
527470
for batch in batches:
528-
529-
if batch.num_rows == 0:
530-
continue
531-
532-
to_yield_indices = []
533-
534-
record_ids = batch.column("timdex_record_id").to_pylist()
535-
run_ids = batch.column("run_id").to_pylist()
536-
537-
for i, (record_id, run_id) in enumerate(
538-
zip(
539-
record_ids,
540-
run_ids,
541-
strict=True,
542-
)
543-
):
544-
if record_to_run_map.get(record_id) == run_id:
545-
to_yield_indices.append(i)
546-
547-
if to_yield_indices:
548-
yield batch.take(pa.array(to_yield_indices)) # type: ignore[arg-type]
471+
if len(batch) > 0:
472+
yield batch
549473

550474
def read_dataframes_iter(
551475
self,

0 commit comments

Comments
 (0)