Skip to content

Commit ed1cf59

Browse files
authored
Merge pull request #153 from MITLibraries/TIMX-512-row-group-sizes
TIMX 512 - row group size data migration
2 parents c143f55 + d5d2bdb commit ed1cf59

1 file changed

Lines changed: 131 additions & 0 deletions

File tree

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
# ruff: noqa: BLE001, D212, RUF002, RUF001, TRY400
2+
3+
"""
4+
Date: 2025-07-03
5+
6+
Description:
7+
8+
Re‑pack any Parquet file in a TIMDEX dataset that still has **one** oversized row‑group
9+
so that no group exceeds the configured `TDA_MAX_ROWS_PER_GROUP` (defaults to 1k).
10+
11+
Usage
12+
-----
13+
PYTHONPATH=. \
14+
pipenv run python migrations/003_2025_07_03_fix_row_group_sizes.py \
15+
<DATASET_LOCATION> \
16+
--dry-run
17+
"""
18+
19+
import argparse
20+
import logging
21+
import time
22+
from math import ceil
23+
24+
import pyarrow.dataset as ds
25+
import pyarrow.parquet as pq
26+
from pyarrow import fs
27+
28+
from timdex_dataset_api.config import configure_dev_logger
29+
from timdex_dataset_api.dataset import (
30+
TIMDEXDataset,
31+
TIMDEXDatasetConfig,
32+
)
33+
34+
# Enable the package's development logger configuration (console output).
configure_dev_logger()

# Target number of rows per Parquet row-group, sourced from the dataset's
# configured maximum (defaults to 1k per the module docstring).
DEFAULT_ROWS_PER_GROUP: int = TIMDEXDatasetConfig().max_rows_per_group
# NOTE(review): defined but not referenced in this file — rewrite_file
# preserves each file's detected codec instead; confirm whether still needed.
DEFAULT_COMPRESSION: str | None = "snappy"

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(message)s")
41+
42+
43+
def needs_rewrite(pf: pq.ParquetFile, rows_per_group: int) -> bool:
    """Return True if *pf* is a single row-group file larger than *rows_per_group*.

    Only files with exactly one row-group are candidates: the migration targets
    files written before row-group size limits were enforced.
    """
    meta = pf.metadata
    if meta.num_row_groups != 1:
        return False
    return meta.num_rows > rows_per_group
45+
46+
47+
def rewrite_file(
    path: str,
    filesystem: fs.FileSystem,
    *,
    dry_run: bool = False,
) -> None:
    """Rewrite a single Parquet file in place with bounded row-groups.

    The file is fully read into memory, then written back to the same path with
    ``row_group_size=DEFAULT_ROWS_PER_GROUP``, preserving the file's original
    compression codec.

    Args:
        path: Path of the Parquet file within ``filesystem``.
        filesystem: pyarrow filesystem the file lives on.
        dry_run: If True, only log what would change; do not write.
    """
    pf = pq.ParquetFile(path, filesystem=filesystem)

    # Preserve whatever codec the file was originally written with.  pyarrow
    # reports "UNCOMPRESSED" for uncompressed files, which maps to None for
    # pq.write_table's `compression` argument.
    codec = pf.metadata.row_group(0).column(0).compression
    compression = None if codec.upper() == "UNCOMPRESSED" else codec.lower()

    # Hoist values used by both log messages.
    num_rows = pf.metadata.num_rows
    num_groups = ceil(num_rows / DEFAULT_ROWS_PER_GROUP)

    if dry_run:
        log.info(
            f"[DRY‑RUN] {path} — would split {num_rows:,} rows "
            f"into {num_groups:,} row‑groups"
        )
        return

    # Safe to overwrite the source path: the table is fully materialized in
    # memory before the write begins.
    table = pf.read()

    pq.write_table(
        table,
        path,
        filesystem=filesystem,
        row_group_size=DEFAULT_ROWS_PER_GROUP,
        compression=compression,  # type: ignore[arg-type]
    )

    # FIX: the original message had no separator between the path and the row
    # count (f"[OK] {path}{num_rows:,} ..."); use the same "—" separator as
    # the dry-run message above.
    log.info(
        f"[OK] {path} — {num_rows:,} rows now split into "
        f"{num_groups:,} row‑groups"
    )
79+
80+
81+
def fix_row_groups(
    location: str,
    *,
    dry_run: bool = False,
) -> None:
    """Scan every Parquet file in the dataset, re-packing oversized ones.

    Files whose single row-group exceeds DEFAULT_ROWS_PER_GROUP are rewritten
    in place; all others are skipped.  Per-file failures are logged and counted
    but never abort the migration.  A summary line is logged at the end.
    """
    started = time.perf_counter()

    filesystem, dataset_root = TIMDEXDataset.parse_location(location)
    if isinstance(dataset_root, list):
        raise TypeError("Expected a single dataset root path, not a list.")

    dataset = ds.dataset(dataset_root, format="parquet", filesystem=filesystem)

    counts = {"rewritten": 0, "skipped": 0, "errors": 0}
    for file_path in dataset.files:
        try:
            pf = pq.ParquetFile(file_path, filesystem=filesystem)
            if not needs_rewrite(pf, DEFAULT_ROWS_PER_GROUP):
                counts["skipped"] += 1
                continue
            rewrite_file(file_path, filesystem, dry_run=dry_run)
            counts["rewritten"] += 1
        except Exception as exc:
            # Best-effort migration: record the failure and move on.
            log.error(f"[ERR] {file_path}: {exc}")
            counts["errors"] += 1

    elapsed = time.perf_counter() - started
    log.info(
        f"Done in {elapsed:.1f}s — rewritten: {counts['rewritten']}, "
        f"skipped: {counts['skipped']}, errors: {counts['errors']}"
    )
116+
117+
118+
def _parse_args() -> tuple[str, bool]:
119+
p = argparse.ArgumentParser(
120+
description="Re‑pack Parquet files so no row‑group exceeds the configured size."
121+
)
122+
p.add_argument("dataset_location")
123+
p.add_argument("--dry-run", action="store_true")
124+
125+
a = p.parse_args()
126+
return a.dataset_location, a.dry_run
127+
128+
129+
if __name__ == "__main__":
    # Script entry point: parse CLI args and run the migration.
    dataset_location, is_dry_run = _parse_args()
    fix_row_groups(dataset_location, dry_run=is_dry_run)

0 commit comments

Comments
 (0)