Skip to content

Commit 0004a65

Browse files
authored
Merge pull request #143 from MITLibraries/TIMX-494-run-metadata
TIMX 494 - TIMDEXRunManager for producing ETL run metadata
2 parents 3f97353 + bd3b937 commit 0004a65

3 files changed

Lines changed: 328 additions & 0 deletions

File tree

tests/conftest.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
# ruff: noqa: D205, D209
44

5+
import os
56

67
import pytest
78

@@ -10,6 +11,7 @@
1011
generate_sample_records_with_simulated_partitions,
1112
)
1213
from timdex_dataset_api import TIMDEXDataset
14+
from timdex_dataset_api.dataset import TIMDEXDatasetConfig
1315

1416

1517
@pytest.fixture(autouse=True)
@@ -90,3 +92,58 @@ def _records_iter(num_records):
9092
)
9193

9294
return _records_iter
95+
96+
97+
@pytest.fixture
def dataset_with_runs_location(tmp_path) -> str:
    """Fixture to simulate a dataset with multiple full and daily ETL runs."""
    dataset_path = str(tmp_path / "dataset_with_runs")
    os.mkdir(dataset_path)

    # small row-group / file sizes so large runs span multiple parquet files
    td = TIMDEXDataset(
        dataset_path,
        config=TIMDEXDatasetConfig(max_rows_per_group=75, max_rows_per_file=75),
    )
    td.load()

    # each tuple: (num_records, source, run_date, run_type, action, run_id)
    alma_runs = [
        (40, "alma", "2024-12-01", "full", "index", "run-1"),
        (20, "alma", "2024-12-15", "daily", "index", "run-2"),
        (100, "alma", "2025-01-01", "full", "index", "run-3"),
        (50, "alma", "2025-01-02", "daily", "index", "run-4"),
        (25, "alma", "2025-01-03", "daily", "index", "run-5"),
        (10, "alma", "2025-01-04", "daily", "delete", "run-6"),
        (9, "alma", "2025-01-05", "daily", "index", "run-7"),
    ]
    dspace_runs = [
        (30, "dspace", "2024-12-02", "full", "index", "run-8"),
        (10, "dspace", "2024-12-16", "daily", "index", "run-9"),
        (90, "dspace", "2025-02-01", "full", "index", "run-10"),
        (40, "dspace", "2025-02-02", "daily", "index", "run-11"),
        (15, "dspace", "2025-02-03", "daily", "index", "run-12"),
        (5, "dspace", "2025-02-04", "daily", "delete", "run-13"),
        (4, "dspace", "2025-02-05", "daily", "index", "run-14"),
    ]

    # write each simulated ETL run to the dataset
    for num_records, source, run_date, run_type, action, run_id in alma_runs + dspace_runs:
        td.write(
            generate_sample_records(
                num_records,
                source=source,
                run_date=run_date,
                run_type=run_type,
                action=action,
                run_id=run_id,
            )
        )

    return dataset_path

tests/test_runs.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# ruff: noqa: SLF001, D205, D209, PLR2004
2+
3+
import datetime
4+
from unittest.mock import patch
5+
6+
import pytest
7+
8+
from timdex_dataset_api import TIMDEXDataset
9+
from timdex_dataset_api.run import TIMDEXRunManager
10+
11+
12+
@pytest.fixture
def timdex_run_manager(dataset_with_runs_location):
    """TIMDEXRunManager wired to the simulated multi-run dataset fixture."""
    return TIMDEXRunManager(
        timdex_dataset=TIMDEXDataset(dataset_with_runs_location)
    )
16+
17+
18+
def test_timdex_run_manager_init(dataset_with_runs_location):
    """A freshly constructed manager starts with an empty runs metadata cache."""
    manager = TIMDEXRunManager(
        timdex_dataset=TIMDEXDataset(dataset_with_runs_location)
    )
    assert manager._runs_metadata_cache is None
22+
23+
24+
def test_timdex_run_manager_parse_single_parquet_file_success(timdex_run_manager):
    """Parse run metadata from first parquet file in fixture dataset. We know the details
    of this ETL run in advance given the deterministic fixture that generated it."""
    first_file = timdex_run_manager.timdex_dataset.dataset.files[0]
    run_metadata = timdex_run_manager._parse_run_metadata_from_parquet_file(first_file)

    expected = {
        "source": "alma",
        "run_date": datetime.date(2024, 12, 1),
        "run_type": "full",
        "run_id": "run-1",
        "num_rows": 40,
        "filename": first_file,
    }
    for key, value in expected.items():
        assert run_metadata[key] == value
37+
38+
39+
def test_timdex_run_manager_parse_multiple_parquet_files(timdex_run_manager):
    """Per-file metadata has one row per parquet file, not per ETL run."""
    files_df = timdex_run_manager._get_parquet_files_run_metadata()

    # 16 rows for this per-file dataframe, despite only 14 distinct ETL "runs"
    assert len(files_df) == 16

    # each source has metadata for 8 parquet files
    source_counts = files_df.source.value_counts().to_dict()
    assert source_counts == {"alma": 8, "dspace": 8}
47+
48+
49+
def test_timdex_run_manager_get_runs_df(timdex_run_manager):
    """Runs metadata is grouped by run_id, collapsing multi-file runs to one row."""
    runs_df = timdex_run_manager.get_runs_metadata()

    # two "large" runs have multiple parquet files
    multi_file_runs = runs_df[runs_df.parquet_files_count > 1]
    assert len(multi_file_runs) == 2

    # 7 distinct runs per source, despite more parquet files
    assert runs_df.source.value_counts().to_dict() == {"alma": 7, "dspace": 7}
57+
58+
59+
def test_timdex_run_manager_get_source_current_run_parquet_files_success(
    timdex_run_manager,
):
    """Current files for a source are the last 'full' run plus all 'daily' runs since."""
    ordered_parquet_files = timdex_run_manager.get_current_source_parquet_files("alma")

    # assert 6 parquet files, despite being 8 total for alma
    # this represents the last full run (run-3: 100 rows / 75-row files = 2 files)
    # and all daily runs since (4 files)
    # NOTE: previously asserted only `len(...)` truthiness, which could never
    # fail on a wrong count; now pinned to the exact expected value
    assert len(ordered_parquet_files) == 6

    # assert sorted reverse chronologically
    assert "year=2025/month=01/day=05" in ordered_parquet_files[0]
    assert "year=2025/month=01/day=01" in ordered_parquet_files[-1]
71+
72+
73+
def test_timdex_run_manager_caches_runs_dataframe(timdex_run_manager):
    """A second call to get_runs_metadata() is served from cache, not re-parsed."""
    first_df = timdex_run_manager.get_runs_metadata()
    assert timdex_run_manager._runs_metadata_cache is not None

    # if the cache were bypassed, the patched method would raise
    with patch.object(
        timdex_run_manager, "_get_parquet_files_run_metadata"
    ) as mocked_method:
        mocked_method.side_effect = Exception("I am not reached, cache is used.")
        second_df = timdex_run_manager.get_runs_metadata()

    assert first_df.equals(second_df)

timdex_dataset_api/run.py

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
"""timdex_dataset_api/run.py"""
2+
3+
import concurrent.futures
4+
import logging
5+
import time
6+
from typing import TYPE_CHECKING
7+
8+
import pandas as pd
9+
import pyarrow.parquet as pq
10+
11+
if TYPE_CHECKING:
12+
from timdex_dataset_api.dataset import TIMDEXDataset
13+
14+
logger = logging.getLogger(__name__)
15+
16+
17+
class TIMDEXRunManager:
    """Manages and provides access to ETL run metadata from the TIMDEX parquet dataset."""

    def __init__(self, timdex_dataset: "TIMDEXDataset"):
        """Init TIMDEXRunManager, loading the underlying dataset if not yet loaded.

        Args:
            timdex_dataset: TIMDEXDataset instance providing the parquet dataset
        """
        self.timdex_dataset: TIMDEXDataset = timdex_dataset
        if self.timdex_dataset.dataset is None:
            self.timdex_dataset.load()

        # grouped-by-run dataframe cache, populated by get_runs_metadata()
        self._runs_metadata_cache: pd.DataFrame | None = None

    def clear_cache(self) -> None:
        """Clear cached runs metadata, forcing a re-read on next access."""
        self._runs_metadata_cache = None

    def get_runs_metadata(self, *, refresh: bool = False) -> pd.DataFrame:
        """Get metadata for all runs in dataset, grouped by run_id.

        The dataframe returned includes the following columns:
            - source
            - run_date
            - run_type
            - run_id
            - num_rows: total number of records for that run_id
            - parquet_files: list of parquet file(s) that are associated with that run
            - parquet_files_count: number of parquet files for that run

        Args:
            refresh: If True, force refresh of cached metadata
        """
        start_time = time.perf_counter()

        if self._runs_metadata_cache is not None and not refresh:
            return self._runs_metadata_cache

        ungrouped_runs_df = self._get_parquet_files_run_metadata()
        if ungrouped_runs_df.empty:
            return ungrouped_runs_df

        # group per-file rows by run_id; a run spread across multiple parquet
        # files collapses to one row with its filenames collected into a list
        grouped_runs_df = (
            ungrouped_runs_df.groupby("run_id")
            .agg(
                {
                    "source": "first",
                    "run_date": "first",
                    "run_type": "first",
                    "num_rows": "sum",
                    "filename": list,
                }
            )
            .reset_index()
        )

        # add additional metadata
        grouped_runs_df = grouped_runs_df.rename(columns={"filename": "parquet_files"})
        grouped_runs_df["parquet_files_count"] = grouped_runs_df["parquet_files"].apply(
            len
        )

        # sort by run date and source, most recent first
        grouped_runs_df = grouped_runs_df.sort_values(
            ["run_date", "source"], ascending=False
        )

        # cache the result
        self._runs_metadata_cache = grouped_runs_df

        logger.info(
            f"Dataset runs metadata retrieved, elapsed: "
            f"{round(time.perf_counter() - start_time, 2)}s, runs: {len(grouped_runs_df)}"
        )
        return grouped_runs_df

    def get_current_source_parquet_files(self, source: str) -> list[str]:
        """Get reverse chronological list of current parquet files for a source.

        "Current" means the most recent 'full' run plus every 'daily' run on or
        after that full run's date.

        Args:
            source: The source identifier to filter runs

        Raises:
            RuntimeError: if the source has no 'full' run in the dataset
        """
        runs_df = self.get_runs_metadata()
        source_runs_df = runs_df[runs_df.source == source].copy()

        # get last "full" run (rows are sorted most-recent-first, so iloc[0])
        full_runs_df = source_runs_df[source_runs_df.run_type == "full"]
        if len(full_runs_df) == 0:
            raise RuntimeError(
                f"Could not find the most recent 'full' run for source: '{source}'"
            )
        last_full_run = full_runs_df.iloc[0]

        # get all "daily" runs since
        daily_runs_df = source_runs_df[
            (source_runs_df.run_type == "daily")
            & (source_runs_df.run_date >= last_full_run.run_date)
        ]

        # daily files first (reverse chronological), then the full run's files
        ordered_parquet_files = []
        for _, daily_run in daily_runs_df.iterrows():
            ordered_parquet_files.extend(daily_run.parquet_files)
        ordered_parquet_files.extend(last_full_run.parquet_files)

        return ordered_parquet_files

    def _get_parquet_files_run_metadata(self, max_workers: int = 250) -> pd.DataFrame:
        """Retrieve run metadata from parquet file(s) in dataset.

        A single ETL run may still be spread across multiple Parquet files making this
        data ungrouped by run.

        Args:
            max_workers: Maximum number of parallel workers for processing
                - a high number is generally safe given the lightweight nature of the
                thread's work, just reading a few parquet file header bytes
        """
        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(self._parse_run_metadata_from_parquet_file, filepath)
                for filepath in self.timdex_dataset.dataset.files  # type: ignore[attr-defined]
            ]
            # as_completed yields each future as it finishes; result order is
            # irrelevant since rows are later grouped and sorted
            for future in concurrent.futures.as_completed(futures):
                try:
                    if result := future.result():
                        results.append(result)
                except Exception:
                    logger.exception("Error reading run metadata from parquet file.")

        return pd.DataFrame(results) if results else pd.DataFrame()

    def _parse_run_metadata_from_parquet_file(self, parquet_filepath: str) -> dict:
        """Parse source, run_date, run_type, and run_id from a single Parquet file.

        The TIMDEX parquet dataset has a characteristic that we can use for extracting
        run information from a single row in a parquet file: all rows in the parquet file
        share the column values source, run_date, run_type, and run_id.

        Taking this a step further, we can extract these values without even touching a
        single proper row from the parquet file, but from reading the parquet file
        column statistics. In this way, we can extract run information from a parquet
        file by only reading the lightweight parquet file metadata.

        Args:
            parquet_filepath: Path to the parquet file
        """
        parquet_file = pq.ParquetFile(
            parquet_filepath,
            filesystem=self.timdex_dataset.filesystem,  # type: ignore[union-attr]
        )
        file_meta = parquet_file.metadata.to_dict()
        columns_meta = file_meta["row_groups"][0]["columns"]  # type: ignore[typeddict-item]

        # map column name -> max statistic; looking columns up by 'path_in_schema'
        # is robust to column reordering, unlike hard-coded positional indices
        column_stats = {
            column["path_in_schema"]: column["statistics"]["max"]
            for column in columns_meta
            if column.get("statistics")
        }

        return {
            "source": column_stats["source"],
            "run_date": column_stats["run_date"],
            "run_type": column_stats["run_type"],
            "run_id": column_stats["run_id"],
            "num_rows": file_meta["num_rows"],
            "filename": parquet_filepath,
        }

0 commit comments

Comments
 (0)