Skip to content

Commit 472726c

Browse files
committed
Setup DuckDB context on TIMDEXDataset
Why these changes are being introduced: TIMDEXDatasetMetadata (TDM) has a DuckDB context for metadata attachments and views. TIMDEXDataset (TD) will need one for DuckDB queries that return actual ETL data, not just metadata. How this addresses that need: * TD reuses the TDM.conn DuckDB connection and builds upon it * TDM DuckDB connection builds metadata related views in a "metadata" schema * TD will build views in a "data" schema Side effects of this change: * All TDM metadata views are under a "metadata" schema Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/TIMX-529
1 parent e63d29d commit 472726c

4 files changed

Lines changed: 99 additions & 21 deletions

File tree

tests/test_dataset.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import pyarrow as pa
1010
import pytest
11+
from duckdb.duckdb import DuckDBPyConnection
1112
from pyarrow import fs
1213

1314
from timdex_dataset_api.dataset import (
@@ -291,3 +292,21 @@ def test_dataset_records_data_structure_is_idempotent(timdex_dataset_with_runs):
291292
assert os.path.exists(timdex_dataset_with_runs.data_records_root)
292293
end_file_count = glob.glob(f"{timdex_dataset_with_runs.data_records_root}/**/*")
293294
assert start_file_count == end_file_count
295+
296+
297+
def test_dataset_duckdb_context_crated_on_init(timdex_dataset):
298+
assert isinstance(timdex_dataset.conn, DuckDBPyConnection)
299+
300+
301+
def test_dataset_duckdb_context_creates_data_schema(timdex_dataset):
302+
assert (
303+
timdex_dataset.conn.query(
304+
"""
305+
select count(*)
306+
from information_schema.schemata
307+
where catalog_name = 'memory'
308+
and schema_name = 'data';
309+
"""
310+
).fetchone()[0]
311+
== 1
312+
)

tests/test_metadata.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,20 @@ def test_tdm_init_metadata_file_found_success(timdex_metadata):
3636
assert isinstance(timdex_metadata.conn, DuckDBPyConnection)
3737

3838

39+
def test_tdm_duckdb_context_creates_metadata_schema(timdex_metadata):
40+
assert (
41+
timdex_metadata.conn.query(
42+
"""
43+
select count(*)
44+
from information_schema.schemata
45+
where catalog_name = 'memory'
46+
and schema_name = 'metadata';
47+
"""
48+
).fetchone()[0]
49+
== 1
50+
)
51+
52+
3953
def test_tdm_connection_has_static_database_attached(timdex_metadata):
4054
assert set(
4155
timdex_metadata.conn.query("""show databases;""").to_df().database_name
@@ -71,7 +85,9 @@ def test_tdm_views_created_on_init(timdex_metadata):
7185

7286

7387
def test_tdm_records_view_structure(timdex_metadata):
74-
records_df = timdex_metadata.conn.query("""select * from records limit 1;""").to_df()
88+
records_df = timdex_metadata.conn.query(
89+
"""select * from metadata.records limit 1;"""
90+
).to_df()
7591
expected_columns = {
7692
"timdex_record_id",
7793
"source",
@@ -88,7 +104,7 @@ def test_tdm_records_view_structure(timdex_metadata):
88104

89105
def test_tdm_current_records_view_structure(timdex_metadata):
90106
current_records_df = timdex_metadata.conn.query(
91-
"""select * from current_records limit 1;"""
107+
"""select * from metadata.current_records limit 1;"""
92108
).to_df()
93109
expected_columns = {
94110
"timdex_record_id",
@@ -106,7 +122,7 @@ def test_tdm_current_records_view_structure(timdex_metadata):
106122

107123
def test_tdm_append_deltas_view_empty_structure(timdex_metadata):
108124
append_deltas_df = timdex_metadata.conn.query(
109-
"""select * from append_deltas;"""
125+
"""select * from metadata.append_deltas;"""
110126
).to_df()
111127
expected_columns = {
112128
"timdex_record_id",
@@ -127,7 +143,7 @@ def test_tdm_records_count_property(timdex_metadata):
127143
assert timdex_metadata.records_count > 0
128144

129145
manual_count = timdex_metadata.conn.query(
130-
"""select count(*) from records;"""
146+
"""select count(*) from metadata.records;"""
131147
).fetchone()[0]
132148
assert timdex_metadata.records_count == manual_count
133149

@@ -136,7 +152,7 @@ def test_tdm_current_records_count_property(timdex_metadata):
136152
assert timdex_metadata.current_records_count > 0
137153

138154
manual_count = timdex_metadata.conn.query(
139-
"""select count(*) from current_records;"""
155+
"""select count(*) from metadata.current_records;"""
140156
).fetchone()[0]
141157
assert timdex_metadata.current_records_count == manual_count
142158

@@ -150,7 +166,7 @@ def test_tdm_records_equals_static_without_deltas(timdex_metadata):
150166
"""select count(*) from static_db.records;"""
151167
).fetchone()[0]
152168
records_count = timdex_metadata.conn.query(
153-
"""select count(*) from records;"""
169+
"""select count(*) from metadata.records;"""
154170
).fetchone()[0]
155171
assert static_count == records_count
156172

@@ -198,7 +214,7 @@ def test_tdm_current_records_with_deltas_logic(timdex_metadata_with_deltas):
198214

199215
# verify current records view returns unique timdex_record_id values
200216
current_records_df = timdex_metadata_with_deltas.conn.query(
201-
"""select timdex_record_id from current_records;"""
217+
"""select timdex_record_id from metadata.current_records;"""
202218
).to_df()
203219

204220
unique_count = len(current_records_df.timdex_record_id.unique())
@@ -210,7 +226,7 @@ def test_tdm_current_records_most_recent_version(timdex_metadata_with_deltas):
210226
multi_version_records = timdex_metadata_with_deltas.conn.query(
211227
"""
212228
select timdex_record_id, count(*) as version_count
213-
from records
229+
from metadata.records
214230
group by timdex_record_id
215231
having count(*) > 1
216232
limit 1;
@@ -224,7 +240,7 @@ def test_tdm_current_records_most_recent_version(timdex_metadata_with_deltas):
224240
most_recent = timdex_metadata_with_deltas.conn.query(
225241
f"""
226242
select run_timestamp, run_id
227-
from records
243+
from metadata.records
228244
where timdex_record_id = '{record_id}'
229245
order by run_timestamp desc
230246
limit 1;
@@ -235,7 +251,7 @@ def test_tdm_current_records_most_recent_version(timdex_metadata_with_deltas):
235251
current_version = timdex_metadata_with_deltas.conn.query(
236252
f"""
237253
select run_timestamp, run_id
238-
from current_records
254+
from metadata.current_records
239255
where timdex_record_id = '{record_id}';
240256
"""
241257
).to_df()

timdex_dataset_api/dataset.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import pandas as pd
1919
import pyarrow as pa
2020
import pyarrow.dataset as ds
21+
from duckdb import DuckDBPyConnection
2122
from pyarrow import fs
2223

2324
from timdex_dataset_api.config import configure_logger
@@ -128,6 +129,9 @@ def __init__(
128129
# dataset metadata
129130
self.metadata = TIMDEXDatasetMetadata(location)
130131

132+
# DuckDB context
133+
self.conn = self.setup_duckdb_context()
134+
131135
@property
132136
def location_scheme(self) -> Literal["file", "s3"]:
133137
scheme = urlparse(self.location).scheme
@@ -221,6 +225,24 @@ def get_s3_filesystem() -> fs.FileSystem:
221225
session_token=credentials.token,
222226
)
223227

228+
def setup_duckdb_context(self) -> DuckDBPyConnection:
229+
"""Create a DuckDB connection that metadata and data query and retrieval.
230+
231+
This relies on TIMDEXDatasetMetadata.setup_duckdb_context() to produce a DuckDB
232+
connection that has all metadata already created.
233+
"""
234+
start_time = time.perf_counter()
235+
236+
conn = self.metadata.conn
237+
238+
# create data schema
239+
conn.execute("""create schema data;""")
240+
241+
logger.debug(
242+
f"DuckDB data context created, {round(time.perf_counter()-start_time,2)}s"
243+
)
244+
return conn
245+
224246
def write(
225247
self,
226248
records_iter: Iterator["DatasetRecord"],

timdex_dataset_api/metadata.py

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,17 +91,35 @@ def append_deltas_path(self) -> str:
9191
@property
9292
def records_count(self) -> int:
9393
"""Count of all records in dataset."""
94-
return self.conn.query("""select count(*) from records;""").fetchone()[0] # type: ignore[index]
94+
return self.conn.query(
95+
"""
96+
select count(*) from metadata.records;
97+
"""
98+
).fetchone()[
99+
0
100+
] # type: ignore[index]
95101

96102
@property
97103
def current_records_count(self) -> int:
98104
"""Count of all current records in dataset."""
99-
return self.conn.query("""select count(*) from current_records;""").fetchone()[0] # type: ignore[index]
105+
return self.conn.query(
106+
"""
107+
select count(*) from metadata.current_records;
108+
"""
109+
).fetchone()[
110+
0
111+
] # type: ignore[index]
100112

101113
@property
102114
def append_deltas_count(self) -> int:
103115
"""Count of all append deltas."""
104-
return self.conn.query("""select count(*) from append_deltas;""").fetchone()[0] # type: ignore[index]
116+
return self.conn.query(
117+
"""
118+
select count(*) from metadata.append_deltas;
119+
"""
120+
).fetchone()[
121+
0
122+
] # type: ignore[index]
105123

106124
def create_metadata_structure(self) -> None:
107125
"""Ensure metadata structure exists in TIDMEX dataset.."""
@@ -280,13 +298,16 @@ def setup_duckdb_context(self) -> DuckDBPyConnection:
280298
)
281299
return conn
282300

301+
# create metadata schema
302+
conn.execute("create schema metadata;")
303+
283304
self._attach_database_file(conn)
284305
self._create_append_deltas_view(conn)
285306
self._create_records_union_view(conn)
286307
self._create_current_records_view(conn)
287308

288309
logger.debug(
289-
f"DuckDB context created, {round(time.perf_counter()-start_time,2)}s"
310+
f"DuckDB metadata context created, {round(time.perf_counter()-start_time,2)}s"
290311
)
291312
return conn
292313

@@ -327,7 +348,7 @@ def _create_append_deltas_view(self, conn: DuckDBPyConnection) -> None:
327348
# if deltas, create view projecting over those parquet files
328349
if append_delta_count > 0:
329350
query = f"""
330-
create view append_deltas as (
351+
create or replace view metadata.append_deltas as (
331352
select *
332353
from read_parquet(
333354
'{self.append_deltas_path}/*.parquet'
@@ -338,7 +359,7 @@ def _create_append_deltas_view(self, conn: DuckDBPyConnection) -> None:
338359
# if not, create a view that mirrors the structure of static_db.records
339360
else:
340361
query = """
341-
create view append_deltas as (
362+
create or replace view metadata.append_deltas as (
342363
select *
343364
from static_db.records
344365
where 1 = 0
@@ -350,13 +371,13 @@ def _create_records_union_view(self, conn: DuckDBPyConnection) -> None:
350371
logger.debug("creating view of unioned records")
351372
conn.execute(
352373
"""
353-
create view records as
374+
create or replace view metadata.records as
354375
(
355376
select *
356377
from static_db.records
357378
union all
358379
select *
359-
from append_deltas
380+
from metadata.append_deltas
360381
);
361382
"""
362383
)
@@ -374,18 +395,18 @@ def _create_current_records_view(self, conn: DuckDBPyConnection) -> None:
374395
logger.info("creating view of current records metadata")
375396

376397
query = f"""
377-
create or replace view current_records as
398+
create or replace view metadata.current_records as
378399
with ranked_records as (
379400
select
380401
r.*,
381402
row_number() over (
382403
partition by r.timdex_record_id
383404
order by r.run_timestamp desc
384405
) as rn
385-
from records r
406+
from metadata.records r
386407
where r.run_timestamp >= (
387408
select max(r2.run_timestamp)
388-
from records r2
409+
from metadata.records r2
389410
where r2.source = r.source
390411
and r2.run_type = 'full'
391412
)

0 commit comments

Comments
 (0)