Skip to content

Commit f9aaa38

Browse files
committed
Create 'records' and 'current_records' metadata views
Why these changes are being introduced: Much of the refactor work has been building to provide metadata views for all records and the current version of a given TIMDEX record, views we had previously but calculated on demand each time. How this addresses that need: When setting up the DuckDB context for TIMDEXDatasetMetadata, we create views that build from a) the static metadata database file and b) the append deltas, providing a projection over all metadata records. Two primary views are added: 'records': all records in the ETL parquet dataset 'current_records': filter to the most recent version of any timdex_record_id from 'records' These views will provide the metadata for future work that (re)implements filtering to current records during read. Side effects of this change: * Views are created on TIMDEXDatasetMetadata initialization Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/TIMX-526
1 parent 269b489 commit f9aaa38

4 files changed

Lines changed: 343 additions & 7 deletions

File tree

tests/conftest.py

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ def local_dataset_location(tmp_path):
3333
def local_dataset(local_dataset_location):
3434
timdex_dataset = TIMDEXDataset(local_dataset_location)
3535
timdex_dataset.write(
36-
generate_sample_records_with_simulated_partitions(num_records=5_000)
36+
generate_sample_records_with_simulated_partitions(num_records=5_000),
37+
write_append_deltas=False,
3738
)
3839
timdex_dataset.load()
3940
return timdex_dataset
@@ -65,7 +66,8 @@ def fixed_local_dataset(tmp_path) -> TIMDEXDataset:
6566
source=source,
6667
run_date="2024-12-01",
6768
run_id=run_id,
68-
)
69+
),
70+
write_append_deltas=False,
6971
)
7072
timdex_dataset.load()
7173
return timdex_dataset
@@ -131,7 +133,7 @@ def dataset_with_runs_location(tmp_path) -> str:
131133
action=action,
132134
run_id=run_id,
133135
)
134-
timdex_dataset.write(records)
136+
timdex_dataset.write(records, write_append_deltas=False)
135137

136138
return location
137139

@@ -187,7 +189,7 @@ def dataset_with_same_day_runs(tmp_path) -> TIMDEXDataset:
187189
run_id=run_id,
188190
run_timestamp=run_timestamp,
189191
)
190-
timdex_dataset.write(records)
192+
timdex_dataset.write(records, write_append_deltas=False)
191193

192194
# reload after writes
193195
timdex_dataset.load()
@@ -218,3 +220,24 @@ def timdex_dataset_metadata(dataset_with_runs_location):
218220
tdm = TIMDEXDatasetMetadata(dataset_with_runs_location)
219221
tdm.recreate_static_database_file()
220222
return tdm
223+
224+
225+
@pytest.fixture
226+
def timdex_dataset_metadata_with_deltas(
227+
dataset_with_runs_location, timdex_dataset_metadata
228+
):
229+
td = TIMDEXDataset(dataset_with_runs_location)
230+
231+
# perform an ETL write of 50 records
232+
# results in 1 append delta, with 50 rows contained
233+
records = generate_sample_records(
234+
num_records=50,
235+
source="alma",
236+
run_date="2025-01-10",
237+
run_type="daily",
238+
action="index",
239+
run_id="run-delta-1",
240+
)
241+
td.write(records)
242+
243+
return TIMDEXDatasetMetadata(dataset_with_runs_location)

tests/test_metadata.py

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,193 @@ def test_dataset_metadata_structure_is_idempotent(timdex_dataset_metadata):
5858
assert os.path.exists(timdex_dataset_metadata.metadata_root)
5959
end_file_count = glob.glob(f"{timdex_dataset_metadata.metadata_root}/**/*")
6060
assert start_file_count == end_file_count
61+
62+
63+
def test_tdm_views_created_on_init(timdex_dataset_metadata):
64+
views = timdex_dataset_metadata.conn.query(
65+
"""select table_name from information_schema.tables where table_type = 'VIEW';"""
66+
).to_df()
67+
68+
expected_views = {"append_deltas", "records", "current_records"}
69+
actual_views = set(views.table_name)
70+
assert expected_views <= actual_views
71+
72+
73+
def test_tdm_records_view_structure(timdex_dataset_metadata):
74+
records_df = timdex_dataset_metadata.conn.query(
75+
"""select * from records limit 1;"""
76+
).to_df()
77+
expected_columns = {
78+
"timdex_record_id",
79+
"source",
80+
"run_date",
81+
"run_type",
82+
"action",
83+
"run_id",
84+
"run_record_offset",
85+
"run_timestamp",
86+
"filename",
87+
}
88+
assert set(records_df.columns) == expected_columns
89+
90+
91+
def test_tdm_current_records_view_structure(timdex_dataset_metadata):
92+
current_records_df = timdex_dataset_metadata.conn.query(
93+
"""select * from current_records limit 1;"""
94+
).to_df()
95+
expected_columns = {
96+
"timdex_record_id",
97+
"source",
98+
"run_date",
99+
"run_type",
100+
"action",
101+
"run_id",
102+
"run_record_offset",
103+
"run_timestamp",
104+
"filename",
105+
}
106+
assert set(current_records_df.columns) == expected_columns
107+
108+
109+
def test_tdm_append_deltas_view_empty_structure(timdex_dataset_metadata):
110+
append_deltas_df = timdex_dataset_metadata.conn.query(
111+
"""select * from append_deltas;"""
112+
).to_df()
113+
expected_columns = {
114+
"timdex_record_id",
115+
"source",
116+
"run_date",
117+
"run_type",
118+
"action",
119+
"run_id",
120+
"run_record_offset",
121+
"run_timestamp",
122+
"filename",
123+
}
124+
assert set(append_deltas_df.columns) == expected_columns
125+
assert len(append_deltas_df) == 0
126+
127+
128+
def test_tdm_records_count_property(timdex_dataset_metadata):
129+
assert timdex_dataset_metadata.records_count > 0
130+
131+
manual_count = timdex_dataset_metadata.conn.query(
132+
"""select count(*) from records;"""
133+
).fetchone()[0]
134+
assert timdex_dataset_metadata.records_count == manual_count
135+
136+
137+
def test_tdm_current_records_count_property(timdex_dataset_metadata):
138+
assert timdex_dataset_metadata.current_records_count > 0
139+
140+
manual_count = timdex_dataset_metadata.conn.query(
141+
"""select count(*) from current_records;"""
142+
).fetchone()[0]
143+
assert timdex_dataset_metadata.current_records_count == manual_count
144+
145+
146+
def test_tdm_append_deltas_count_property_empty(timdex_dataset_metadata):
147+
assert timdex_dataset_metadata.append_deltas_count == 0
148+
149+
150+
def test_tdm_records_equals_static_without_deltas(timdex_dataset_metadata):
151+
static_count = timdex_dataset_metadata.conn.query(
152+
"""select count(*) from static_db.records;"""
153+
).fetchone()[0]
154+
records_count = timdex_dataset_metadata.conn.query(
155+
"""select count(*) from records;"""
156+
).fetchone()[0]
157+
assert static_count == records_count
158+
159+
160+
def test_tdm_current_records_filtering_logic(timdex_dataset_metadata):
161+
current_count = timdex_dataset_metadata.current_records_count
162+
total_count = timdex_dataset_metadata.records_count
163+
164+
assert current_count <= total_count
165+
assert current_count > 0
166+
167+
168+
def test_tdm_views_with_append_deltas(timdex_dataset_metadata_with_deltas):
169+
views = timdex_dataset_metadata_with_deltas.conn.query(
170+
"""select table_name from information_schema.tables where table_type = 'VIEW';"""
171+
).to_df()
172+
173+
expected_views = {"append_deltas", "records", "current_records"}
174+
actual_views = set(views.table_name)
175+
assert expected_views.issubset(actual_views)
176+
177+
178+
def test_tdm_append_deltas_view_has_data(timdex_dataset_metadata_with_deltas):
179+
append_deltas_count = timdex_dataset_metadata_with_deltas.append_deltas_count
180+
assert append_deltas_count > 0
181+
182+
183+
def test_tdm_records_includes_deltas(timdex_dataset_metadata_with_deltas):
184+
static_count = timdex_dataset_metadata_with_deltas.conn.query(
185+
"""select count(*) from static_db.records;"""
186+
).fetchone()[0]
187+
deltas_count = timdex_dataset_metadata_with_deltas.append_deltas_count
188+
records_count = timdex_dataset_metadata_with_deltas.records_count
189+
190+
assert records_count == static_count + deltas_count
191+
assert records_count > static_count
192+
193+
194+
def test_tdm_current_records_with_deltas_logic(timdex_dataset_metadata_with_deltas):
195+
current_count = timdex_dataset_metadata_with_deltas.current_records_count
196+
total_count = timdex_dataset_metadata_with_deltas.records_count
197+
198+
assert current_count <= total_count
199+
assert current_count > 0
200+
201+
# verify current records view returns unique timdex_record_id values
202+
current_records_df = timdex_dataset_metadata_with_deltas.conn.query(
203+
"""select timdex_record_id from current_records;"""
204+
).to_df()
205+
206+
unique_count = len(current_records_df.timdex_record_id.unique())
207+
assert unique_count == current_count
208+
209+
210+
def test_tdm_current_records_most_recent_version(timdex_dataset_metadata_with_deltas):
211+
# check that for records with multiple versions, only the most recent is returned
212+
multi_version_records = timdex_dataset_metadata_with_deltas.conn.query(
213+
"""
214+
select timdex_record_id, count(*) as version_count
215+
from records
216+
group by timdex_record_id
217+
having count(*) > 1
218+
limit 1;
219+
"""
220+
).to_df()
221+
222+
if len(multi_version_records) > 0:
223+
record_id = multi_version_records.iloc[0]["timdex_record_id"]
224+
225+
# get most recent timestamp for this record
226+
most_recent = timdex_dataset_metadata_with_deltas.conn.query(
227+
f"""
228+
select run_timestamp, run_id
229+
from records
230+
where timdex_record_id = '{record_id}'
231+
order by run_timestamp desc
232+
limit 1;
233+
"""
234+
).to_df()
235+
236+
# verify current_records contains this version
237+
current_version = timdex_dataset_metadata_with_deltas.conn.query(
238+
f"""
239+
select run_timestamp, run_id
240+
from current_records
241+
where timdex_record_id = '{record_id}';
242+
"""
243+
).to_df()
244+
245+
assert len(current_version) == 1
246+
assert (
247+
current_version.iloc[0]["run_timestamp"]
248+
== most_recent.iloc[0]["run_timestamp"]
249+
)
250+
assert current_version.iloc[0]["run_id"] == most_recent.iloc[0]["run_id"]

timdex_dataset_api/dataset.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@ def write(
431431
if write_append_deltas:
432432
for written_file in written_files:
433433
self.metadata.write_append_delta_duckdb(written_file.path) # type: ignore[attr-defined]
434+
self.metadata.refresh()
434435

435436
self.log_write_statistics(start_time, written_files)
436437

0 commit comments

Comments
 (0)