Skip to content

Commit 9298e7e

Browse files
authored
Merge pull request #151 from MITLibraries/TIMX-508-run-timestamp-data-migration
TIMX 508 - run timestamp data migration
2 parents f429b2c + a57156f commit 9298e7e

1 file changed

Lines changed: 228 additions & 0 deletions

File tree

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
# ruff: noqa: BLE001, D212, TRY300, TRY400
2+
"""
3+
Date: 2025-06-25
4+
5+
Description:
6+
7+
This migration will ensure that all rows, for all parquet files, for a given ETL run have
8+
the same run_timestamp.
9+
10+
When migration 001 was performed, it did not take into consideration that ETL runs with
11+
100k+ records -- e.g. full Alma or DSpace -- would span multiple parquet files. With
12+
multiple parquet files came multiple S3 object dates which is what was used to backfill
13+
that run_timestamp column.
14+
15+
This discovery led to TIMX-509, which ensures that a single run_timestamp can be applied
16+
to all rows / files for a given ETL run. Now that TIMX-509 is complete and deployed,
17+
ensuring all *future* rows are written correctly, this migration is needed to update
18+
a subset of run_timestamp values from migration 001.
19+
20+
The approach is fairly simple and uses the new TIMDEXDatasetMetadata class:
21+
22+
1. retrieve metadata for all records
23+
2. for a given ETL run (run_id), find the earliest run_timestamp
24+
3. apply that run_timestamp to all rows / files for that run_id
25+
26+
Usage:
27+
28+
PYTHONPATH=. \
29+
pipenv run python migrations/002_2025_06_25_consistent_run_timestamp_per_etl_run.py \
30+
<DATASET_LOCATION> \
31+
--dry-run
32+
"""
33+
34+
import argparse
35+
import json
36+
import time
37+
from datetime import datetime
38+
39+
import pandas as pd
40+
import pyarrow as pa
41+
import pyarrow.dataset as ds
42+
import pyarrow.parquet as pq
43+
44+
from timdex_dataset_api import TIMDEXDatasetMetadata
45+
from timdex_dataset_api.config import configure_dev_logger, configure_logger
46+
from timdex_dataset_api.dataset import TIMDEX_DATASET_SCHEMA, TIMDEXDataset
47+
48+
# NOTE(review): presumably enables verbose development-oriented logging for the
# timdex_dataset_api library — provided by timdex_dataset_api.config; confirm.
configure_dev_logger()

# Module-level logger for this migration script.
logger = configure_logger(__name__)
51+
52+
53+
def fix_backfilled_run_timestamps(location: str, *, dry_run: bool = False) -> None:
    """Main entrypoint for backfill script.

    Loads the TIMDEX dataset at `location`, determines which parquet files carry a
    run_timestamp later than the earliest run_timestamp observed for their run_id,
    and rewrites those files so all rows for a run share the earliest timestamp.

    Args:
        location: Dataset location (local path or s3://bucket/path).
        dry_run: If True, report what would change without writing any files.
    """
    start_time = time.perf_counter()
    td = TIMDEXDataset(location)
    td.load()

    parquet_to_run_timestamp_df = prepare_run_timestamps_for_select_parquet_files(td)

    success_count = 0
    skip_count = 0
    error_count = 0

    for idx, row in parquet_to_run_timestamp_df.iterrows():

        # Compare case-insensitively: the metadata query's status literal casing
        # ('ok' vs 'OK') must not determine whether an already-correct file is
        # skipped.  A case-sensitive `== "OK"` check against a lowercase status
        # would rewrite every file, including ones that need no change.
        if str(row.status).lower() == "ok":
            continue

        logger.info(
            f"Working on parquet file {int(idx) + 1}/{len(parquet_to_run_timestamp_df)}- "  # type: ignore[call-overload]
            f"run_id: {row.run_id}, status: {row.status}, filename: {row.filename}"
        )

        success, result = backfill_parquet_file(
            row.filename, td.dataset, row.earliest_timestamp, dry_run=dry_run
        )

        if success:
            # NOTE(review): backfill_parquet_file does not currently emit a
            # "skipped" key, so this branch is defensive and skip_count stays 0
            # unless that function changes.
            if result and "skipped" in result:
                skip_count += 1
            else:
                success_count += 1
        else:
            error_count += 1

        logger.info(json.dumps(result))

    logger.info(
        f"Backfill complete. Elapsed: {time.perf_counter()-start_time}, "
        f"Success: {success_count}, Skipped: {skip_count}, Errors: {error_count}"
    )
93+
94+
95+
def prepare_run_timestamps_for_select_parquet_files(
    timdex_dataset: TIMDEXDataset,
) -> pd.DataFrame:
    """Prepare a dataframe of parquet file to earliest + current timestamp for run_id.

    For each (run_id, run_timestamp, filename) grouping in the dataset metadata,
    computes the earliest run_timestamp for that run_id and flags the row 'OK'
    when the file's current timestamp already matches it, 'UPDATE' otherwise.

    Returns:
        pd.DataFrame

    Example row:

    mapping.loc[".../df65fb2d-a071-4d96-87cf-c1288a5e010f-1.parquet"]

    ROW:
    run_id                2b9baa22-8eb8-41ff-8108-bd247ae884bd
    earliest_timestamp               2025-02-28 13:02:07-05:00
    current_timestamp                2025-02-28 13:33:46-05:00
    record_count                                         45025
    status                                              UPDATE
    Name: .../df65fb2d-a071-4d96-87cf-c1288a5e010f-1.parquet

    """
    tdm = TIMDEXDatasetMetadata(timdex_dataset=timdex_dataset)

    # Status literals are uppercase ('OK' / 'UPDATE') to match the documented
    # example row above and the caller's status check in
    # fix_backfilled_run_timestamps.
    query = """
    with earliest_timestamps as (
        select
            run_id,
            min(run_timestamp) as earliest_timestamp
        from records
        group by run_id
    )
    select
        r.run_id,
        et.earliest_timestamp,
        r.run_timestamp as current_timestamp,
        regexp_replace(r.filename, '^s3://', '') as filename,
        count(*) as record_count,
        case
            when r.run_timestamp = et.earliest_timestamp then 'OK'
            else 'UPDATE'
        end as status
    from records r
    join earliest_timestamps et on r.run_id = et.run_id
    group by r.run_id, et.earliest_timestamp, r.run_timestamp, r.filename
    order by r.run_timestamp, r.run_id;
    """
    return tdm.conn.query(query).to_df()
142+
143+
144+
def backfill_parquet_file(
    parquet_filepath: str,
    dataset: ds.Dataset,
    new_run_timestamp: datetime,
    *,
    dry_run: bool = False,
) -> tuple[bool, dict]:
    """Backfill a single parquet file with the correct run_timestamp value for ETL run.

    Reads the entire file into memory, replaces every value in the run_timestamp
    column with `new_run_timestamp`, and writes the table back to the same path
    (unless `dry_run` is set).

    Args:
        parquet_filepath: Path to the parquet file
        dataset: PyArrow dataset instance
        new_run_timestamp: datetime
        dry_run: If True, don't actually write changes

    Returns:
        Tuple of (success: bool, result: dict)
    """
    start_time = time.perf_counter()
    try:
        source_file = pq.ParquetFile(parquet_filepath, filesystem=dataset.filesystem)  # type: ignore[attr-defined]

        # load the full file contents as a pyarrow Table
        original_table = source_file.read()

        # build a constant-valued replacement column, typed per the dataset schema
        row_count = original_table.num_rows
        timestamp_type = TIMDEX_DATASET_SCHEMA.field("run_timestamp").type
        replacement_column = pa.array(
            [new_run_timestamp] * row_count, type=timestamp_type
        )

        column_index = original_table.schema.get_field_index("run_timestamp")
        updated_table = original_table.set_column(
            column_index,
            "run_timestamp",
            replacement_column,
        )

        # persist the updated table back to the same file (skipped on dry runs)
        if dry_run:
            logger.info(f"DRY RUN: Would update file: {parquet_filepath}")
        else:
            pq.write_table(
                updated_table,  # type: ignore[attr-defined]
                parquet_filepath,
                filesystem=dataset.filesystem,  # type: ignore[attr-defined]
            )
            logger.info(f"Successfully updated file: {parquet_filepath}")

        return True, {
            "file_path": parquet_filepath,
            "rows_updated": row_count,
            "new_run_timestamp": new_run_timestamp.isoformat(),
            "elapsed": time.perf_counter() - start_time,
            "dry_run": dry_run,
        }

    except Exception as e:
        logger.error(f"Error processing parquet file {parquet_filepath}: {e}")
        return False, {
            "file_path": parquet_filepath,
            "error": str(e),
            "elapsed": time.perf_counter() - start_time,
            "dry_run": dry_run,
        }
210+
211+
212+
if __name__ == "__main__":
    # CLI entrypoint: parse the dataset location (plus optional --dry-run flag)
    # and hand off to the migration function.
    arg_parser = argparse.ArgumentParser(
        description=("Ensures each ETL run has a single run_timestamp value.")
    )
    arg_parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Scan files and report what would be done without making changes",
    )
    arg_parser.add_argument(
        "dataset_location",
        help="Path to the dataset (local path or s3://bucket/path)",
    )

    cli_args = arg_parser.parse_args()

    fix_backfilled_run_timestamps(cli_args.dataset_location, dry_run=cli_args.dry_run)

0 commit comments

Comments
 (0)