
Commit 97df92d

Add migrations folder and run_timestamp migration
Why these changes are being introduced:

We need to perform a one-time bulk edit of the parquet dataset, which is very similar to a SQL database migration: addition of a column and backfilling of data. While we may not require the strictness of SQL schema migrations, which often can be "replayed" in order to upgrade or in reverse order to downgrade, we would nonetheless benefit from some structure and code-as-documentation for bulk operations performed on the actual parquet dataset.

How this addresses that need:

* Creates a new root-level migrations/ folder
* Includes a README with a proposed simple structure for migrations (mostly naming conventions)
* Includes our first migration to backfill all pre-existing parquet files with a new 'run_timestamp' column
* The run_timestamp is pulled from the creation date of the parquet file in S3, which is close enough for our purposes

Side effects of this change:

* TDA becomes the location for storing code related to "migrations" for the parquet dataset
* All pre-existing parquet files will now have a run_timestamp column

Relevant ticket(s):

* https://mitlibraries.atlassian.net/browse/TIMX-496
1 parent 926d630 commit 97df92d
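
For context on the side effect noted in the commit message (every pre-existing parquet file gains a run_timestamp column), here is a minimal, illustrative sketch of what the backfilled column enables; the dataset location and cutoff date are placeholder assumptions, not values from this commit:

```python
# Illustrative only, not part of this commit: once all parquet files carry a
# run_timestamp column, the dataset can be filtered on it directly.
from datetime import UTC, datetime

import pyarrow.compute as pc
import pyarrow.dataset as ds

# Placeholder location; the real dataset path is passed to the migration script.
dataset = ds.dataset("s3://example-bucket/dataset", format="parquet")

# Select only rows whose files were written (per the backfilled timestamp)
# on or after an arbitrary cutoff date.
recent = dataset.to_table(
    filter=pc.field("run_timestamp") >= datetime(2025, 1, 1, tzinfo=UTC)
)
print(f"{recent.num_rows} rows with run_timestamp on or after 2025-01-01")
```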

3 files changed

Lines changed: 261 additions & 0 deletions

File tree

- migrations/001_2025_05_30_backfill_run_timestamp_column.py
- migrations/README.md
- migrations/__init__.py

migrations/001_2025_05_30_backfill_run_timestamp_column.py

Lines changed: 204 additions & 0 deletions
@@ -0,0 +1,204 @@
# ruff: noqa: BLE001, D212, TRY300, TRY400
"""
Date: 2025-05-30

Description:

After the creation of a new run_timestamp column as part of Jira ticket TIMX-496, there
was a need to backfill a run timestamp for all parquet files in the dataset.

This migration performs the following:
1. retrieves all parquet files from the dataset
2. for each parquet file:
    a. if the run_timestamp column already exists, skip
    b. retrieve the file creation date of the parquet file; this becomes the run_timestamp
    c. rewrite the parquet file with a new run_timestamp column

Usage:

pipenv run python migrations/001_2025_05_30_backfill_run_timestamp_column.py \
<DATASET_LOCATION> \
--dry-run
"""

import argparse
import json
import time
from datetime import UTC, datetime

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from pyarrow import fs

from timdex_dataset_api.config import configure_dev_logger, configure_logger
from timdex_dataset_api.dataset import TIMDEX_DATASET_SCHEMA, TIMDEXDataset

configure_dev_logger()

logger = configure_logger(__name__)


def backfill_dataset(location: str, *, dry_run: bool = False) -> None:
    """Main entrypoint for backfill script.

    Loop through all parquet files in the dataset and, if the run_timestamp column does
    not exist, create it using the S3 object creation date.
    """
    start_time = time.perf_counter()
    td = TIMDEXDataset(location)
    td.load()

    parquet_files = td.dataset.files  # type: ignore[attr-defined]
    logger.info(f"Found {len(parquet_files)} parquet files in dataset.")

    success_count = 0
    skip_count = 0
    error_count = 0

    for i, parquet_file in enumerate(parquet_files):
        logger.info(
            f"Working on parquet file {i + 1}/{len(parquet_files)}: {parquet_file}"
        )

        success, result = backfill_parquet_file(parquet_file, td.dataset, dry_run=dry_run)

        if success:
            if result and "skipped" in result:
                skip_count += 1
            else:
                success_count += 1
        else:
            error_count += 1

        logger.info(json.dumps(result))

    logger.info(
        f"Backfill complete. Elapsed: {time.perf_counter() - start_time}, "
        f"Success: {success_count}, Skipped: {skip_count}, Errors: {error_count}"
    )


def get_s3_object_creation_date(file_path: str, filesystem: fs.FileSystem) -> datetime:
    """Get the creation date of an S3 object.

    Args:
        file_path: Path to the S3 object
        filesystem: PyArrow S3 filesystem instance

    Returns:
        datetime: Creation date of the S3 object in UTC
    """
    try:
        # Get creation date of S3 object
        file_info = filesystem.get_file_info(file_path)
        creation_date: datetime = file_info.mtime  # type: ignore[assignment]

        # Ensure it's timezone-aware and in UTC
        if creation_date.tzinfo is None:
            creation_date = creation_date.replace(tzinfo=UTC)
        elif creation_date.tzinfo != UTC:
            creation_date = creation_date.astimezone(UTC)

        return creation_date

    except Exception as e:
        logger.error(f"Error getting S3 object creation date for {file_path}: {e}")
        raise


def backfill_parquet_file(
    parquet_filepath: str,
    dataset: ds.Dataset,
    *,
    dry_run: bool = False,
) -> tuple[bool, dict]:
    """Backfill a single parquet file with run_timestamp column.

    Args:
        parquet_filepath: Path to the parquet file
        dataset: PyArrow dataset instance
        dry_run: If True, don't actually write changes

    Returns:
        Tuple of (success: bool, result: dict)
    """
    start_time = time.perf_counter()
    try:
        parquet_file = pq.ParquetFile(parquet_filepath, filesystem=dataset.filesystem)  # type: ignore[attr-defined]

        # Check if run_timestamp column already exists
        if "run_timestamp" in parquet_file.schema.names:
            logger.info(
                f"Parquet already has 'run_timestamp', skipping: {parquet_filepath}"
            )
            return True, {"file_path": parquet_filepath, "skipped": True}

        # Read all rows from the parquet file into a pyarrow Table
        # NOTE: memory intensive for very large parquet files, though suitable for one-time
        # migration work.
        table = parquet_file.read()

        # Get S3 object creation date
        creation_date = get_s3_object_creation_date(parquet_filepath, dataset.filesystem)  # type: ignore[attr-defined]

        # Create run_timestamp column using the exact schema definition
        num_rows = len(table)
        run_timestamp_field = TIMDEX_DATASET_SCHEMA.field("run_timestamp")
        run_timestamp_array = pa.array(
            [creation_date] * num_rows, type=run_timestamp_field.type
        )

        # Add the run_timestamp column to the table
        table_with_timestamp = table.append_column("run_timestamp", run_timestamp_array)

        # Write the updated table back to the same file
        if not dry_run:
            pq.write_table(
                table_with_timestamp,  # type: ignore[attr-defined]
                parquet_filepath,
                filesystem=dataset.filesystem,  # type: ignore[attr-defined]
            )
            logger.info(f"Successfully updated file: {parquet_filepath}")
        else:
            logger.info(f"DRY RUN: Would update file: {parquet_filepath}")

        update_details = {
            "file_path": parquet_filepath,
            "rows_updated": num_rows,
            "run_timestamp_added": creation_date.isoformat(),
            "elapsed": time.perf_counter() - start_time,
            "dry_run": dry_run,
        }

        return True, update_details

    except Exception as e:
        logger.error(f"Error processing parquet file {parquet_filepath}: {e}")
        return False, {
            "file_path": parquet_filepath,
            "error": str(e),
            "elapsed": time.perf_counter() - start_time,
            "dry_run": dry_run,
        }


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=(
            "Backfill run_timestamp column in TIMDEX parquet files "
            "using S3 creation dates"
        )
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Scan files and report what would be done without making changes",
    )
    parser.add_argument(
        "dataset_location", help="Path to the dataset (local path or s3://bucket/path)"
    )

    args = parser.parse_args()

    backfill_dataset(args.dataset_location, dry_run=args.dry_run)

migrations/README.md

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
# TIMDEX Dataset Migrations

This directory includes manual, bulk migrations of data and schema in the TIMDEX parquet dataset. Consider it like migrations for a SQL database, except a bit more unstructured and ad hoc.

## Structure

Each migration is either a single python file or a dedicated directory that follows the naming convention:

- `###_`: incrementing migration sequence number (keeps migrations sorted in run order; see the sketch below)
- `YYYY_MM_DD_`: approximate date of migration creation and run
- `short_name.py` (file) or `short_name` (directory): short migration name

Examples:

- `001_2025_05_30_backfill_run_timestamp_column.py` --> single file
- `002_2025_06_15_remove_errant_parquet_files` --> directory that contains 1+ files
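
Because the sequence number is zero-padded, a plain lexicographic sort of the directory already lists migrations in the order they were created and run. A small, hypothetical helper (not part of this commit) illustrating that:

```python
# Hypothetical helper, not part of this commit: list migrations in run order
# by relying on the zero-padded sequence number prefix.
from pathlib import Path

migration_names = sorted(
    path.name
    for path in Path("migrations").iterdir()
    if path.name[:3].isdigit()
)
print(migration_names)
# e.g. ['001_2025_05_30_backfill_run_timestamp_column.py',
#       '002_2025_06_15_remove_errant_parquet_files']
```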
The entrypoint for each migration should contain a docstring at the root of the file with a structure like:

```python
"""
Date: YYYY-MM-DD

Description:

Description here about the nature of the migration...

Usage:

Explanation here for how to run it...
"""
```

Example:

```python
"""
Date: 2025-05-30

Description:

After the creation of a new run_timestamp column as part of Jira ticket TIMX-496, there
was a need to backfill a run timestamp for all parquet files in the dataset.

This migration performs the following:
1. retrieves all parquet files from the dataset
2. for each parquet file:
    a. if the run_timestamp column already exists, skip
    b. retrieve the file creation date of the parquet file; this becomes the run_timestamp
    c. rewrite the parquet file with a new run_timestamp column

Usage:

PYTHONPATH=. \
pipenv run python migrations/001_2025_05_30_backfill_run_timestamp_column.py \
<DATASET_LOCATION> \
--dry-run
"""
```
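
A migration like this can also be sanity-checked after it runs. The sketch below is illustrative and not part of the committed files; the dataset location is a placeholder, and it simply confirms that every parquet file now exposes the new column:

```python
# Illustrative post-migration check, not part of this commit: confirm that
# every parquet file in the dataset now has the run_timestamp column.
import pyarrow.dataset as ds
import pyarrow.parquet as pq

dataset = ds.dataset("s3://example-bucket/dataset", format="parquet")  # placeholder
missing = [
    path
    for path in dataset.files
    if "run_timestamp"
    not in pq.ParquetFile(path, filesystem=dataset.filesystem).schema.names
]
print(f"{len(missing)} file(s) still missing run_timestamp")
```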

migrations/__init__.py

Whitespace-only changes.

0 commit comments
