Skip to content

Commit bd3b937

Browse files
committed
Reorder methods and expand docstrings
1 parent f584a1d commit bd3b937

3 files changed

Lines changed: 83 additions & 66 deletions

File tree

tests/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ def dataset_with_runs_location(tmp_path) -> str:
120120
]
121121
)
122122

123-
# simulate ETL runs for 'alma'
123+
# simulate ETL runs for 'dspace'
124124
run_params.extend(
125125
[
126126
(30, "dspace", "2024-12-02", "full", "index", "run-8"),

tests/test_runs.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def test_timdex_run_manager_parse_single_parquet_file_success(timdex_run_manager
2525
"""Parse run metadata from first parquet file in fixture dataset. We know the details
2626
of this ETL run in advance given the deterministic fixture that generated it."""
2727
parquet_filepath = timdex_run_manager.timdex_dataset.dataset.files[0]
28-
run_metadata = timdex_run_manager.parse_run_metadata_from_parquet_file(
28+
run_metadata = timdex_run_manager._parse_run_metadata_from_parquet_file(
2929
parquet_filepath
3030
)
3131
assert run_metadata["source"] == "alma"
@@ -37,7 +37,7 @@ def test_timdex_run_manager_parse_single_parquet_file_success(timdex_run_manager
3737

3838

3939
def test_timdex_run_manager_parse_multiple_parquet_files(timdex_run_manager):
40-
parquet_metadata_df = timdex_run_manager.get_parquet_files_run_metadata()
40+
parquet_metadata_df = timdex_run_manager._get_parquet_files_run_metadata()
4141

4242
# assert 16 rows for this per-file dataframe, despite only 14 distinct ETL "runs"
4343
assert len(parquet_metadata_df) == 16
@@ -75,7 +75,7 @@ def test_timdex_run_manager_caches_runs_dataframe(timdex_run_manager):
7575
assert timdex_run_manager._runs_metadata_cache is not None
7676

7777
with patch.object(
78-
timdex_run_manager, "get_parquet_files_run_metadata"
78+
timdex_run_manager, "_get_parquet_files_run_metadata"
7979
) as mocked_intermediate_method:
8080
mocked_intermediate_method.side_effect = Exception(
8181
"I am not reached, cache is used."

timdex_dataset_api/run.py

Lines changed: 79 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -27,70 +27,17 @@ def __init__(self, timdex_dataset: "TIMDEXDataset"):
2727
def clear_cache(self) -> None:
2828
self._runs_metadata_cache = None
2929

30-
def parse_run_metadata_from_parquet_file(self, parquet_filepath: str) -> dict:
31-
"""Parse source, run_date, run_type, and run_id from a single Parquet file.
32-
33-
Args:
34-
parquet_filepath: Path to the parquet file
35-
"""
36-
parquet_file = pq.ParquetFile(
37-
parquet_filepath,
38-
filesystem=self.timdex_dataset.filesystem, # type: ignore[union-attr]
39-
)
40-
file_meta = parquet_file.metadata.to_dict()
41-
num_rows = file_meta["num_rows"]
42-
columns_meta = file_meta["row_groups"][0]["columns"] # type: ignore[typeddict-item]
43-
source = columns_meta[3]["statistics"]["max"]
44-
run_date = columns_meta[4]["statistics"]["max"]
45-
run_type = columns_meta[5]["statistics"]["max"]
46-
run_id = columns_meta[7]["statistics"]["max"]
47-
48-
return {
49-
"source": source,
50-
"run_date": run_date,
51-
"run_type": run_type,
52-
"run_id": run_id,
53-
"num_rows": num_rows,
54-
"filename": parquet_filepath,
55-
}
56-
57-
def get_parquet_files_run_metadata(self, max_workers: int = 250) -> pd.DataFrame:
58-
"""Retrieve run metadata from parquet file(s) in dataset.
59-
60-
A single ETL run may still be spread across multiple Parquet files making this
61-
data ungrouped by run.
62-
63-
Args:
64-
max_workers: Maximum number of parallel workers for processing
65-
- a high number is generally safe given the lightweight nature of the
66-
thread's work, just reading a few parquet file header bytes
67-
"""
68-
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
69-
futures = []
70-
for parquet_filepath in self.timdex_dataset.dataset.files: # type: ignore[attr-defined]
71-
future = executor.submit(
72-
self.parse_run_metadata_from_parquet_file,
73-
parquet_filepath,
74-
)
75-
futures.append(future)
76-
77-
done, not_done = concurrent.futures.wait(
78-
futures, return_when=concurrent.futures.ALL_COMPLETED
79-
)
80-
81-
results = []
82-
for future in done:
83-
try:
84-
if result := future.result():
85-
results.append(result)
86-
except Exception:
87-
logger.exception("Error reading run metadata from parquet file.")
88-
89-
return pd.DataFrame(results) if results else pd.DataFrame()
90-
9130
def get_runs_metadata(self, *, refresh: bool = False) -> pd.DataFrame:
9231
"""Get metadata for all runs in dataset, grouped by run_id.
9332
33+
The dataframe returned includes the following columns:
34+
- source
35+
- run_date
36+
- run_type
37+
- run_id
38+
- num_rows: total number of records for that run_id
39+
- parquet_files: list of parquet file(s) that are associated with that run
40+
9441
Args:
9542
refresh: If True, force refresh of cached metadata
9643
"""
@@ -99,7 +46,7 @@ def get_runs_metadata(self, *, refresh: bool = False) -> pd.DataFrame:
9946
if self._runs_metadata_cache is not None and not refresh:
10047
return self._runs_metadata_cache
10148

102-
ungrouped_runs_df = self.get_parquet_files_run_metadata()
49+
ungrouped_runs_df = self._get_parquet_files_run_metadata()
10350
if ungrouped_runs_df.empty:
10451
return ungrouped_runs_df
10552

@@ -167,3 +114,73 @@ def get_current_source_parquet_files(self, source: str) -> list[str]:
167114
ordered_parquet_files.extend(last_full_run.parquet_files)
168115

169116
return ordered_parquet_files
117+
118+
def _get_parquet_files_run_metadata(self, max_workers: int = 250) -> pd.DataFrame:
119+
"""Retrieve run metadata from parquet file(s) in dataset.
120+
121+
A single ETL run may still be spread across multiple Parquet files making this
122+
data ungrouped by run.
123+
124+
Args:
125+
max_workers: Maximum number of parallel workers for processing
126+
- a high number is generally safe given the lightweight nature of the
127+
thread's work, just reading a few parquet file header bytes
128+
"""
129+
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
130+
futures = []
131+
for parquet_filepath in self.timdex_dataset.dataset.files: # type: ignore[attr-defined]
132+
future = executor.submit(
133+
self._parse_run_metadata_from_parquet_file,
134+
parquet_filepath,
135+
)
136+
futures.append(future)
137+
138+
done, not_done = concurrent.futures.wait(
139+
futures, return_when=concurrent.futures.ALL_COMPLETED
140+
)
141+
142+
results = []
143+
for future in done:
144+
try:
145+
if result := future.result():
146+
results.append(result)
147+
except Exception:
148+
logger.exception("Error reading run metadata from parquet file.")
149+
150+
return pd.DataFrame(results) if results else pd.DataFrame()
151+
152+
def _parse_run_metadata_from_parquet_file(self, parquet_filepath: str) -> dict:
153+
"""Parse source, run_date, run_type, and run_id from a single Parquet file.
154+
155+
The TIMDEX parquet dataset has a characteristic that we can use for extracting
156+
run information from a single row in a parquet file: all rows in the parquet file
157+
share the column values source, run_date, run_type, and run_id.
158+
159+
Taking this a step further, we can extract these values without even touching a
160+
single proper row from the parquet file, but from reading the parquet file
161+
column statistics. In this way, we can extract run information from a parquet
162+
file by only reading the lightweight parquet file metadata.
163+
164+
Args:
165+
parquet_filepath: Path to the parquet file
166+
"""
167+
parquet_file = pq.ParquetFile(
168+
parquet_filepath,
169+
filesystem=self.timdex_dataset.filesystem, # type: ignore[union-attr]
170+
)
171+
file_meta = parquet_file.metadata.to_dict()
172+
num_rows = file_meta["num_rows"]
173+
columns_meta = file_meta["row_groups"][0]["columns"] # type: ignore[typeddict-item]
174+
source = columns_meta[3]["statistics"]["max"]
175+
run_date = columns_meta[4]["statistics"]["max"]
176+
run_type = columns_meta[5]["statistics"]["max"]
177+
run_id = columns_meta[7]["statistics"]["max"]
178+
179+
return {
180+
"source": source,
181+
"run_date": run_date,
182+
"run_type": run_type,
183+
"run_id": run_id,
184+
"num_rows": num_rows,
185+
"filename": parquet_filepath,
186+
}

0 commit comments

Comments
 (0)