
Commit f584a1d

TIMDEXRunManager for producing ETL run metadata
Why these changes are being introduced:

One of the challenges the architecture of the TIMDEX parquet dataset presents is getting quick and easy metadata about ETL "runs" in the dataset. The year/month/day partitioning structure is very efficient for accessing a run if you know the date, where only a few parquet files are scanned, but it is not geared towards quickly isolating the runs (parquet files) associated with a given source. Having metadata about runs provides a map for efficiently accessing meaningful subsets of data. One example would be fully refreshing a source in OpenSearch: to do so, you'd want to access all runs for a given source since, and including, the last run_type=full run, as those runs represent the current state of the source in TIMDEX. Unfortunately, this is not terribly efficient to perform naively with pyarrow or DuckDB, where potentially thousands of parquet files are touched. Similar to how Apache Iceberg (a parquet dataset architecture) works, we need some metadata about each "run" in the dataset that correlates to parquet file(s).

How this addresses that need:

A new class, TIMDEXRunManager, provides this functionality. It produces a pandas dataframe of metadata about all runs in the dataset, including the explicit parquet filepath(s) each run is associated with, in a highly efficient and parallelized way. This is achieved by:

1. Getting a list of all parquet files from the dataset.
2. Reading only each file's metadata footer, whose column statistics carry metadata about the run that produced the file.
3. Aggregating the results and grouping by "run_id".

The result is a dataframe that provides a precise map of run metadata to parquet files in the dataset. With those parquet files identified, this unblocks further functionality for this library, like "replaying" the runs for a given source in chronological order to refresh it in OpenSearch.

Side effects of this change:

* None. No changes are made to pre-existing functionality, just the addition of this new information-gathering class.

Relevant ticket(s):

* https://mitlibraries.atlassian.net/browse/TIMX-493
* https://mitlibraries.atlassian.net/browse/TIMX-494
1 parent 3f97353 commit f584a1d
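
For orientation, a minimal usage sketch of the class this commit adds (class and method names are from the diff below; the dataset location is a placeholder):

from timdex_dataset_api import TIMDEXDataset
from timdex_dataset_api.run import TIMDEXRunManager

# placeholder location; any local path or S3 URI the library supports
timdex_dataset = TIMDEXDataset("path/to/dataset")
run_manager = TIMDEXRunManager(timdex_dataset=timdex_dataset)

# one row per ETL run, including the parquet files that run produced
runs_df = run_manager.get_runs_metadata()

# parquet files representing the current state of a source:
# the last 'full' run plus all 'daily' runs since, newest first
current_files = run_manager.get_current_source_parquet_files("alma")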

3 files changed

Lines changed: 311 additions & 0 deletions


tests/conftest.py

Lines changed: 57 additions & 0 deletions
@@ -2,6 +2,7 @@
 
 # ruff: noqa: D205, D209
 
+import os
 
 import pytest
 
@@ -10,6 +11,7 @@
     generate_sample_records_with_simulated_partitions,
 )
 from timdex_dataset_api import TIMDEXDataset
+from timdex_dataset_api.dataset import TIMDEXDatasetConfig
 
 
 @pytest.fixture(autouse=True)
@@ -90,3 +92,58 @@ def _records_iter(num_records):
     )
 
     return _records_iter
+
+
+@pytest.fixture
+def dataset_with_runs_location(tmp_path) -> str:
+    """Fixture to simulate a dataset with multiple full and daily ETL runs."""
+    location = str(tmp_path / "dataset_with_runs")
+    os.mkdir(location)
+
+    # max_rows_per_file=75 forces runs larger than 75 records to span multiple files
+    timdex_dataset = TIMDEXDataset(
+        location, config=TIMDEXDatasetConfig(max_rows_per_group=75, max_rows_per_file=75)
+    )
+    timdex_dataset.load()
+
+    run_params = []
+
+    # simulate ETL runs for 'alma'
+    run_params.extend(
+        [
+            (40, "alma", "2024-12-01", "full", "index", "run-1"),
+            (20, "alma", "2024-12-15", "daily", "index", "run-2"),
+            (100, "alma", "2025-01-01", "full", "index", "run-3"),
+            (50, "alma", "2025-01-02", "daily", "index", "run-4"),
+            (25, "alma", "2025-01-03", "daily", "index", "run-5"),
+            (10, "alma", "2025-01-04", "daily", "delete", "run-6"),
+            (9, "alma", "2025-01-05", "daily", "index", "run-7"),
+        ]
+    )
+
+    # simulate ETL runs for 'dspace'
+    run_params.extend(
+        [
+            (30, "dspace", "2024-12-02", "full", "index", "run-8"),
+            (10, "dspace", "2024-12-16", "daily", "index", "run-9"),
+            (90, "dspace", "2025-02-01", "full", "index", "run-10"),
+            (40, "dspace", "2025-02-02", "daily", "index", "run-11"),
+            (15, "dspace", "2025-02-03", "daily", "index", "run-12"),
+            (5, "dspace", "2025-02-04", "daily", "delete", "run-13"),
+            (4, "dspace", "2025-02-05", "daily", "index", "run-14"),
+        ]
+    )
+
+    # write each simulated run to the dataset
+    for params in run_params:
+        num_records, source, run_date, run_type, action, run_id = params
+        records = generate_sample_records(
+            num_records,
+            source=source,
+            run_date=run_date,
+            run_type=run_type,
+            action=action,
+            run_id=run_id,
+        )
+        timdex_dataset.write(records)
+
+    return location
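
Derived from the run parameters above: run-3 (100 records) and run-10 (90 records) each exceed max_rows_per_file=75, so the fixture yields 16 parquet files for 14 runs. A quick way to confirm (dataset_with_runs_location is the path this fixture returns, supplied by pytest):

from timdex_dataset_api import TIMDEXDataset

timdex_dataset = TIMDEXDataset(dataset_with_runs_location)  # value from the fixture
timdex_dataset.load()
assert len(timdex_dataset.dataset.files) == 16  # 16 files for 14 runs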

tests/test_runs.py

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
# ruff: noqa: SLF001, D205, D209, PLR2004

import datetime
from unittest.mock import patch

import pytest

from timdex_dataset_api import TIMDEXDataset
from timdex_dataset_api.run import TIMDEXRunManager


@pytest.fixture
def timdex_run_manager(dataset_with_runs_location):
    timdex_dataset = TIMDEXDataset(dataset_with_runs_location)
    return TIMDEXRunManager(timdex_dataset=timdex_dataset)


def test_timdex_run_manager_init(dataset_with_runs_location):
    timdex_dataset = TIMDEXDataset(dataset_with_runs_location)
    timdex_run_manager = TIMDEXRunManager(timdex_dataset=timdex_dataset)
    assert timdex_run_manager._runs_metadata_cache is None


def test_timdex_run_manager_parse_single_parquet_file_success(timdex_run_manager):
    """Parse run metadata from first parquet file in fixture dataset. We know the details
    of this ETL run in advance given the deterministic fixture that generated it."""
    parquet_filepath = timdex_run_manager.timdex_dataset.dataset.files[0]
    run_metadata = timdex_run_manager.parse_run_metadata_from_parquet_file(
        parquet_filepath
    )
    assert run_metadata["source"] == "alma"
    assert run_metadata["run_date"] == datetime.date(2024, 12, 1)
    assert run_metadata["run_type"] == "full"
    assert run_metadata["run_id"] == "run-1"
    assert run_metadata["num_rows"] == 40
    assert run_metadata["filename"] == parquet_filepath


def test_timdex_run_manager_parse_multiple_parquet_files(timdex_run_manager):
    parquet_metadata_df = timdex_run_manager.get_parquet_files_run_metadata()

    # assert 16 rows for this per-file dataframe, despite only 14 distinct ETL "runs"
    assert len(parquet_metadata_df) == 16

    # assert each source has metadata for 8 parquet files
    assert parquet_metadata_df.source.value_counts().to_dict() == {"alma": 8, "dspace": 8}


def test_timdex_run_manager_get_runs_df(timdex_run_manager):
    runs_df = timdex_run_manager.get_runs_metadata()

    # assert two "large" runs have multiple parquet files
    assert len(runs_df[runs_df.parquet_files_count > 1]) == 2

    # assert 7 distinct runs per source, despite more parquet files
    assert runs_df.source.value_counts().to_dict() == {"alma": 7, "dspace": 7}


def test_timdex_run_manager_get_source_current_run_parquet_files_success(
    timdex_run_manager,
):
    ordered_parquet_files = timdex_run_manager.get_current_source_parquet_files("alma")

    # assert 6 parquet files, despite 8 total for alma:
    # this represents the last full run and all daily runs since
    assert len(ordered_parquet_files) == 6

    # assert sorted reverse chronologically
    assert "year=2025/month=01/day=05" in ordered_parquet_files[0]
    assert "year=2025/month=01/day=01" in ordered_parquet_files[-1]


def test_timdex_run_manager_caches_runs_dataframe(timdex_run_manager):
    runs_df = timdex_run_manager.get_runs_metadata()
    assert timdex_run_manager._runs_metadata_cache is not None

    with patch.object(
        timdex_run_manager, "get_parquet_files_run_metadata"
    ) as mocked_intermediate_method:
        mocked_intermediate_method.side_effect = Exception(
            "I am not reached, cache is used."
        )
        runs_df_2 = timdex_run_manager.get_runs_metadata()

    assert runs_df.equals(runs_df_2)
timdex_dataset_api/run.py

Lines changed: 169 additions & 0 deletions
@@ -0,0 +1,169 @@
"""timdex_dataset_api/run.py"""

import concurrent.futures
import logging
import time
from typing import TYPE_CHECKING

import pandas as pd
import pyarrow.parquet as pq

if TYPE_CHECKING:
    from timdex_dataset_api.dataset import TIMDEXDataset

logger = logging.getLogger(__name__)


class TIMDEXRunManager:
    """Manages and provides access to ETL run metadata from the TIMDEX parquet dataset."""

    def __init__(self, timdex_dataset: "TIMDEXDataset"):
        self.timdex_dataset: TIMDEXDataset = timdex_dataset
        if self.timdex_dataset.dataset is None:
            self.timdex_dataset.load()

        self._runs_metadata_cache: pd.DataFrame | None = None

    def clear_cache(self) -> None:
        self._runs_metadata_cache = None

    def parse_run_metadata_from_parquet_file(self, parquet_filepath: str) -> dict:
        """Parse source, run_date, run_type, and run_id from a single parquet file.

        Args:
            parquet_filepath: path to the parquet file
        """
        # only the file's metadata footer is read here, never the row data itself
        parquet_file = pq.ParquetFile(
            parquet_filepath,
            filesystem=self.timdex_dataset.filesystem,  # type: ignore[union-attr]
        )
        file_meta = parquet_file.metadata.to_dict()
        num_rows = file_meta["num_rows"]

        # run columns are constant within a file, so the first row group's column
        # statistics (accessed positionally, per the dataset's fixed column order)
        # carry the run metadata
        columns_meta = file_meta["row_groups"][0]["columns"]  # type: ignore[typeddict-item]
        source = columns_meta[3]["statistics"]["max"]
        run_date = columns_meta[4]["statistics"]["max"]
        run_type = columns_meta[5]["statistics"]["max"]
        run_id = columns_meta[7]["statistics"]["max"]

        return {
            "source": source,
            "run_date": run_date,
            "run_type": run_type,
            "run_id": run_id,
            "num_rows": num_rows,
            "filename": parquet_filepath,
        }
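
To make the positional statistics lookup above concrete, this is roughly the dictionary shape pyarrow returns from a footer-only read (illustrative values; the exact column order is defined by the dataset schema and is assumed here):

import pyarrow.parquet as pq

# illustrative: inspect one file's footer without reading any row data
file_meta = pq.ParquetFile("records.parquet").metadata.to_dict()
print(file_meta["num_rows"])  # e.g. 40
# file_meta["row_groups"][0]["columns"][3] looks roughly like:
#   {"path_in_schema": "source",
#    "statistics": {"min": "alma", "max": "alma", ...},
#    ...}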
    def get_parquet_files_run_metadata(self, max_workers: int = 250) -> pd.DataFrame:
        """Retrieve run metadata from parquet file(s) in dataset.

        A single ETL run may be spread across multiple parquet files, so this
        data remains ungrouped by run.

        Args:
            max_workers: maximum number of parallel workers for processing
                - a high number is generally safe given the lightweight nature of
                each thread's work: just reading a few parquet footer bytes
        """
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for parquet_filepath in self.timdex_dataset.dataset.files:  # type: ignore[attr-defined]
                future = executor.submit(
                    self.parse_run_metadata_from_parquet_file,
                    parquet_filepath,
                )
                futures.append(future)

            done, not_done = concurrent.futures.wait(
                futures, return_when=concurrent.futures.ALL_COMPLETED
            )

        results = []
        for future in done:
            try:
                if result := future.result():
                    results.append(result)
            except Exception:
                logger.exception("Error reading run metadata from parquet file.")

        return pd.DataFrame(results) if results else pd.DataFrame()
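
A direct call returns one row per parquet file, not per run; a sketch against the fixture dataset (the location is a placeholder, and the smaller worker pool is arbitrary):

from timdex_dataset_api import TIMDEXDataset
from timdex_dataset_api.run import TIMDEXRunManager

run_manager = TIMDEXRunManager(timdex_dataset=TIMDEXDataset("path/to/dataset_with_runs"))

# ungrouped, per-file metadata; a run that spans multiple parquet files
# appears here once per file
per_file_df = run_manager.get_parquet_files_run_metadata(max_workers=50)
print(per_file_df.columns.tolist())
# -> ['source', 'run_date', 'run_type', 'run_id', 'num_rows', 'filename']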
    def get_runs_metadata(self, *, refresh: bool = False) -> pd.DataFrame:
        """Get metadata for all runs in dataset, grouped by run_id.

        Args:
            refresh: if True, force refresh of cached metadata
        """
        start_time = time.perf_counter()

        if self._runs_metadata_cache is not None and not refresh:
            return self._runs_metadata_cache

        ungrouped_runs_df = self.get_parquet_files_run_metadata()
        if ungrouped_runs_df.empty:
            return ungrouped_runs_df

        # group by run_id
        grouped_runs_df = (
            ungrouped_runs_df.groupby("run_id")
            .agg(
                {
                    "source": "first",
                    "run_date": "first",
                    "run_type": "first",
                    "num_rows": "sum",
                    "filename": list,
                }
            )
            .reset_index()
        )

        # add additional metadata
        grouped_runs_df = grouped_runs_df.rename(columns={"filename": "parquet_files"})
        grouped_runs_df["parquet_files_count"] = grouped_runs_df["parquet_files"].apply(
            len
        )

        # sort by run date and source
        grouped_runs_df = grouped_runs_df.sort_values(
            ["run_date", "source"], ascending=False
        )

        # cache the result
        self._runs_metadata_cache = grouped_runs_df

        logger.info(
            f"Dataset runs metadata retrieved, elapsed: "
            f"{round(time.perf_counter() - start_time, 2)}s, runs: {len(grouped_runs_df)}"
        )
        return grouped_runs_df
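
Against the fixture dataset, the grouped dataframe has 14 rows, one per run, sorted reverse chronologically; the two runs larger than max_rows_per_file=75 aggregate multiple files. Illustrative first and last rows, derived from the fixture parameters (reusing run_manager from the sketch above; file paths abbreviated):

runs_df = run_manager.get_runs_metadata()
# run_id  source  run_date    run_type  num_rows  parquet_files  parquet_files_count
# run-14  dspace  2025-02-05  daily     4         [...]          1
# run-13  dspace  2025-02-04  daily     5         [...]          1
# ...
# run-3   alma    2025-01-01  full      100       [..., ...]     2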
    def get_current_source_parquet_files(self, source: str) -> list[str]:
        """Get reverse chronological list of current parquet files for a source.

        Args:
            source: the source identifier to filter runs
        """
        runs_df = self.get_runs_metadata()
        source_runs_df = runs_df[runs_df.source == source].copy()

        # get last "full" run (runs_df is sorted reverse chronologically)
        full_runs_df = source_runs_df[source_runs_df.run_type == "full"]
        if len(full_runs_df) == 0:
            raise RuntimeError(
                f"Could not find the most recent 'full' run for source: '{source}'"
            )
        last_full_run = full_runs_df.iloc[0]

        # get all "daily" runs since
        daily_runs_df = source_runs_df[
            (source_runs_df.run_type == "daily")
            & (source_runs_df.run_date >= last_full_run.run_date)
        ]

        ordered_parquet_files = []
        for _, daily_run in daily_runs_df.iterrows():
            ordered_parquet_files.extend(daily_run.parquet_files)
        ordered_parquet_files.extend(last_full_run.parquet_files)

        return ordered_parquet_files
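
Putting it together, a sketch of the "replay" use case from the commit message (the location is a placeholder; expected counts follow from the fixture dataset):

from timdex_dataset_api import TIMDEXDataset
from timdex_dataset_api.run import TIMDEXRunManager

run_manager = TIMDEXRunManager(timdex_dataset=TIMDEXDataset("path/to/dataset_with_runs"))

# for 'alma': the last full run (run-3, 2025-01-01, two parquet files) plus all
# daily runs since (run-4 through run-7), newest first -> 6 files total
current_files = run_manager.get_current_source_parquet_files("alma")

# to replay the source oldest-to-newest, e.g. when refreshing OpenSearch,
# iterate the list in reverse
for parquet_file in reversed(current_files):
    ...  # read records from parquet_file and apply index/delete actions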
