@@ -396,7 +396,8 @@ def _create_append_deltas_view(self, conn: DuckDBPyConnection) -> None:
396396 create or replace view metadata.append_deltas as (
397397 select *
398398 from read_parquet(
399- '{ self .append_deltas_path } /*.parquet'
399+ '{ self .append_deltas_path } /*.parquet',
400+ filename = 'append_delta_filename'
400401 )
401402 );
402403 """
@@ -414,14 +415,17 @@ def _create_append_deltas_view(self, conn: DuckDBPyConnection) -> None:
414415
415416 def _create_records_union_view (self , conn : DuckDBPyConnection ) -> None :
416417 logger .debug ("creating view of unioned records" )
418+
417419 conn .execute (
418- """
420+ f """
419421 create or replace view metadata.records as
420422 (
421- select *
423+ select
424+ { ',' .join (ORDERED_METADATA_COLUMN_NAMES )}
422425 from static_db.records
423426 union all
424- select *
427+ select
428+ { ',' .join (ORDERED_METADATA_COLUMN_NAMES )}
425429 from metadata.append_deltas
426430 );
427431 """
@@ -463,6 +467,79 @@ def _create_current_records_view(self, conn: DuckDBPyConnection) -> None:
463467 """
464468 conn .execute (query )
465469
def merge_append_deltas(self) -> None:
    """Merge append deltas into the static metadata database file.

    Steps:
        1. List the distinct append delta files exposed by the
           ``metadata.append_deltas`` view; return early when there are none.
        2. Copy (or download, for S3) the static metadata database file into a
           temporary directory.
        3. Attach the local copy as ``local_static_db``, insert the append
           delta rows into its ``records`` table, and detach it again.
        4. Overwrite the static metadata database file with the local copy.
        5. Delete the append delta files listed in step 1.

    If the insert fails, the local copy is still detached from the
    connection, the exception propagates, and neither the static database
    file nor the append deltas are modified.
    """
    logger.info("merging append deltas into static metadata database file")

    start_time = time.perf_counter()

    # get filenames of append deltas
    append_delta_filenames = (
        self.conn.query(
            """
            select distinct(append_delta_filename)
            from metadata.append_deltas
            """
        )
        .to_df()["append_delta_filename"]
        .to_list()
    )

    if not append_delta_filenames:
        logger.info("no append deltas found")
        return

    logger.debug(f"{len(append_delta_filenames)} append deltas found")

    # only build an S3 client when the dataset actually lives in S3, and only
    # once we know there is work to do
    is_s3 = self.location_scheme == "s3"
    s3_client = S3Client() if is_s3 else None

    with tempfile.TemporaryDirectory() as temp_dir:
        # create local copy of the static metadata database (static db) file
        local_db_path = str(Path(temp_dir) / self.metadata_database_filename)
        if is_s3:
            s3_client.download_file(
                s3_uri=self.metadata_database_path, local_path=local_db_path
            )
        else:
            shutil.copy(src=self.metadata_database_path, dst=local_db_path)

        # attach to local static db
        self.conn.execute(f"""attach '{local_db_path}' AS local_static_db;""")
        try:
            # insert records from append deltas to local static db
            # NOTE(review): this reads every append delta visible right now,
            # while deletion below uses the list captured earlier; a delta
            # written in between would be merged but not deleted -- confirm
            # there are no concurrent writers during a merge.
            self.conn.execute(
                f"""
                insert into local_static_db.records
                select
                    {','.join(ORDERED_METADATA_COLUMN_NAMES)}
                from metadata.append_deltas
                """
            )
        finally:
            # always detach, even when the insert fails, so the connection is
            # not left holding a database file inside the temporary directory
            self.conn.execute("""detach local_static_db;""")

        # overwrite static db file with local version
        if is_s3:
            s3_client.upload_file(
                local_db_path,
                self.metadata_database_path,
            )
        else:
            shutil.copy(src=local_db_path, dst=self.metadata_database_path)

    # delete append deltas, only after the static db was overwritten successfully
    for append_delta_filename in append_delta_filenames:
        if is_s3:
            s3_client.delete_file(s3_uri=append_delta_filename)
        else:
            os.remove(append_delta_filename)

    logger.debug(
        "append deltas merged into the static metadata database file: "
        f"{self.metadata_database_path}, {time.perf_counter()-start_time}s"
    )
542+
466543 def write_append_delta_duckdb (self , filepath : str ) -> None :
467544 """Write an append delta for an ETL parquet file.
468545
0 commit comments