Skip to content

Commit 25d4430

Browse files
committed
current_records view relies on temp table
Why these changes are being introduced: The current_records view has always been a performance and resource bottleneck. Moving to metadata and SQL has helped, but there was a little kink left in relation to using that metadata for data retrieval. We would often materialize a query to a pandas DataFrame and use it to drive data retrieval. In that moment, we do not benefit from having current_records be a view, when we're going to materialize the data anyhow. How this addresses that need: Utilizing a DuckDB temp table, we take a small performance hit on TIMDEXDatasetMetadata load, but then have near-instant current_records queries thereafter. Additionally, we remove ordering in the metadata query for data retrieval and perform this in-memory with the pandas DataFrame. Often this may be quite small, but even if large, it's more efficient here and already in Python memory. This will also set the stage for performing just-in-time metadata queries as chunks before data retrieval, versus pulling all metadata rows in one query and then chunking that in memory. Side effects of this change: * Quicker metadata queries, small performance hit on load. Appears similarly memory intensive. Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/TIMX-543
1 parent b7c3350 commit 25d4430

3 files changed

Lines changed: 66 additions & 28 deletions

File tree

tests/test_read.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ def test_read_batches_where_and_dataset_filters_are_combined(timdex_dataset_mult
125125
[
126126
"SELECT * FROM current_records WHERE source = 'libguides'",
127127
"FROM records WHERE source = 'libguides'",
128-
"source = 'libguides';",
129-
" run_date = '2024-12-01'; ",
128+
"ORDER BY timdex_record_id",
129+
"LIMIT 3",
130130
],
131131
)
132132
def test_read_batches_where_rejects_non_predicate_sql(

timdex_dataset_api/dataset.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,10 +375,13 @@ def read_batches_iter(
375375
key/value DatasetFilters
376376
- filters: simple filtering based on key/value pairs from DatasetFilters
377377
"""
378+
start_time = time.perf_counter()
379+
378380
# build and execute metadata query
379381
metadata_time = time.perf_counter()
380382
meta_query = self.metadata.build_meta_query(table, where, **filters)
381383
meta_df = self.metadata.conn.query(meta_query).to_df()
384+
meta_df = meta_df.sort_values(by=["filename", "run_record_offset"])
382385
logger.debug(
383386
f"Metadata query identified {len(meta_df)} rows, "
384387
f"across {len(meta_df.filename.unique())} parquet files, "
@@ -410,6 +413,10 @@ def read_batches_iter(
410413
f"@ {batch_rps} records/second, total yielded: {total_yield_count}"
411414
)
412415

416+
logger.debug(
417+
f"read_batches_iter() elapsed: {round(time.perf_counter()-start_time, 2)}s"
418+
)
419+
413420
def _iter_meta_chunks(self, meta_df: pd.DataFrame) -> Iterator[pd.DataFrame]:
414421
"""Utility method to yield chunks of metadata query results."""
415422
for start in range(0, len(meta_df), self.config.duckdb_join_batch_size):

timdex_dataset_api/metadata.py

Lines changed: 57 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ def setup_duckdb_context(self) -> DuckDBPyConnection:
334334
start_time = time.perf_counter()
335335

336336
conn = duckdb.connect()
337+
conn.execute("""SET enable_progress_bar = false;""")
337338
self.configure_duckdb_connection(conn)
338339

339340
if not self.database_exists():
@@ -436,36 +437,67 @@ def _create_current_records_view(self, conn: DuckDBPyConnection) -> None:
436437
437438
This view builds on the table `records`.
438439
439-
This view includes only the most current version of each record in the dataset.
440-
Because it includes the `timdex_record_id` and `run_id`, it makes yielding the
441-
current version of a record via a TIMDEXDataset instance trivial: for any given
442-
`timdex_record_id` if the `run_id` doesn't match, it's not the current version.
440+
This metadata view includes only the most current version of each record in the
441+
dataset. With the metadata provided from this view, we can streamline data
442+
retrievals in TIMDEXDataset read methods.
443443
"""
444444
logger.info("creating view of current records metadata")
445445

446-
query = f"""
447-
create or replace view metadata.current_records as
448-
with ranked_records as (
446+
conn.execute(
447+
"""
448+
set temp_directory = '/tmp';
449+
"""
450+
)
451+
452+
conn.execute(
453+
"""
454+
-- create temp table with current records using CTEs
455+
create or replace temp table temp.main.current_records as
456+
with
457+
-- CTE of run_timestamp for last source full run
458+
cr_source_last_full as (
459+
select
460+
source,
461+
max(run_timestamp) as last_full_ts
462+
from metadata.records
463+
where run_type = 'full'
464+
group by source
465+
),
466+
467+
-- CTE of all records, per source, on or after last full run
468+
cr_since_last_full as (
469+
select
470+
r.*
471+
from metadata.records r
472+
join cr_source_last_full f using (source)
473+
where r.run_timestamp >= f.last_full_ts
474+
),
475+
476+
-- CTE of records ranked by run_timestamp, with tie breaker
477+
cr_ranked_records as (
478+
select
479+
r.*,
480+
row_number() over (
481+
partition by r.source, r.timdex_record_id
482+
order by
483+
r.run_timestamp desc nulls last,
484+
r.run_id desc nulls last,
485+
r.run_record_offset desc nulls last
486+
) as rn
487+
from cr_since_last_full r
488+
)
489+
490+
-- final select for current records (rn = 1)
449491
select
450-
r.*,
451-
row_number() over (
452-
partition by r.timdex_record_id
453-
order by r.run_timestamp desc
454-
) as rn
455-
from metadata.records r
456-
where r.run_timestamp >= (
457-
select max(r2.run_timestamp)
458-
from metadata.records r2
459-
where r2.source = r.source
460-
and r2.run_type = 'full'
461-
)
492+
* exclude (rn)
493+
from cr_ranked_records
494+
where rn = 1;
495+
496+
-- create view in metadata schema
497+
create or replace view metadata.current_records as
498+
select * from temp.main.current_records;
499+
"""
462500
)
463-
select
464-
{','.join(ORDERED_METADATA_COLUMN_NAMES)}
465-
from ranked_records
466-
where rn = 1;
467-
"""
468-
conn.execute(query)
469501

470502
def merge_append_deltas(self) -> None:
471503
"""Merge append deltas into the static metadata database file."""
@@ -602,7 +634,6 @@ def build_meta_query(
602634
).select_from(sa_table)
603635
if combined is not None:
604636
stmt = stmt.where(combined)
605-
stmt = stmt.order_by(sa_table.c.filename, sa_table.c.run_record_offset)
606637

607638
# using DuckDB dialect, compile to SQL string
608639
compiled = stmt.compile(

0 commit comments

Comments
 (0)