Skip to content

Commit 314ba71

Browse files
committed
graph, node, store: Add incremental dump support
When a dump directory already contains a previous dump (metadata.json), subsequent dumps only write new rows (VID-append) and generate clamp files for mutable tables whose block_range was closed since the last dump. On restore, clamps are loaded into memory and applied to rows before DB insertion.

Key changes:
- Add clamp Arrow schema (vid + block_range_end)
- Add #[serde(default)] clamps field to TableInfo for backward compat
- Make dump_entity_table and dump_data_sources incremental via prev param
- Layout::dump detects previous dumps, validates deployment match and guards against reorgs
- Add load_clamps() to read clamp parquet files into a vid->end map
- Apply clamps during restore in import_entity_table and import_data_sources
- Add DumpReporter::start_clamps/finish_clamps with CLI progress output
1 parent b712185 commit 314ba71

6 files changed

Lines changed: 739 additions & 35 deletions

File tree

graph/src/components/store/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,6 +1001,12 @@ pub trait DumpReporter: Send + 'static {
10011001
/// Called after data_sources$ has been dumped.
10021002
fn finish_data_sources(&mut self, rows: usize) {}
10031003

1004+
/// Called before dumping clamps for a table.
///
/// `_rows_approx` is an estimate of how many clamp rows will be
/// written (reporters may display it as "~N rows"). The default
/// implementation is a no-op so existing reporters keep compiling.
fn start_clamps(&mut self, _table: &str, _rows_approx: usize) {}

/// Called after clamps have been dumped for a table.
///
/// `_rows` is the number of clamp rows actually written. The default
/// implementation is a no-op so existing reporters keep compiling.
fn finish_clamps(&mut self, _table: &str, _rows: usize) {}
1009+
10041010
/// Called when the entire dump has completed.
10051011
fn finish(&mut self) {}
10061012
}

node/src/manager/commands/dump.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,26 @@ impl DumpReporter for DumpProgress {
8787
self.spinner.suspend(|| println!("{line}"));
8888
}
8989

90+
fn start_clamps(&mut self, table: &str, rows_approx: usize) {
91+
self.spinner.set_message(format!(
92+
"{:<32} clamps ~{} rows",
93+
table,
94+
format_count(rows_approx),
95+
));
96+
}
97+
98+
fn finish_clamps(&mut self, table: &str, rows: usize) {
99+
if rows > 0 {
100+
let line = format!(
101+
" {} {:<32} {:>10} clamps",
102+
style("\u{2714}").green(),
103+
table,
104+
format_count(rows),
105+
);
106+
self.spinner.suspend(|| println!("{line}"));
107+
}
108+
}
109+
90110
fn finish(&mut self) {
91111
let elapsed = self.start.elapsed().as_secs();
92112
self.spinner.finish_with_message(format!(

store/postgres/src/parquet/convert.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::HashMap;
2+
use std::path::Path;
13
use std::str::FromStr;
24
use std::sync::Arc;
35

@@ -412,6 +414,31 @@ pub fn record_batch_to_data_source_rows(
412414
Ok(rows)
413415
}
414416

417+
/// Load clamp files and build a `vid → block_range_end` map.
418+
///
419+
/// Each clamp parquet file contains rows with `(vid: i64, block_range_end: i32)`.
420+
/// Later clamp files overwrite earlier ones for the same vid (only the
421+
/// latest value matters).
422+
pub fn load_clamps(
423+
dir: &Path,
424+
clamps: &[crate::parquet::writer::ChunkInfo],
425+
) -> Result<HashMap<i64, i32>, StoreError> {
426+
let mut map = HashMap::new();
427+
for chunk_info in clamps {
428+
let path = dir.join(&chunk_info.file);
429+
let batches = crate::parquet::reader::read_batches(&path)?;
430+
for batch in batches {
431+
let batch = batch?;
432+
let vid_arr = downcast_i64(batch.column(0), "vid")?;
433+
let bre_arr = downcast_i32(batch.column(1), "block_range_end")?;
434+
for row in 0..batch.num_rows() {
435+
map.insert(vid_arr.value(row), bre_arr.value(row));
436+
}
437+
}
438+
}
439+
Ok(map)
440+
}
441+
415442
// -- Downcasting helpers --
416443

417444
fn downcast_i64<'a>(array: &'a ArrayRef, name: &str) -> Result<&'a Int64Array, StoreError> {
@@ -1134,4 +1161,66 @@ mod tests {
11341161
let restore_rows = record_batch_to_restore_rows(&batch, table).unwrap();
11351162
assert!(restore_rows.is_empty());
11361163
}
1164+
1165+
/// Round-trip test for `load_clamps`: writes two clamp parquet files
/// and checks that later files overwrite earlier ones for the same vid.
#[test]
fn load_clamps_roundtrip() {
    use crate::parquet::schema::clamp_arrow_schema;
    use crate::parquet::writer::ParquetChunkWriter;

    // Scratch directory; the pid suffix keeps concurrent test binaries apart.
    let tmp_dir =
        std::env::temp_dir().join(format!("graph_node_clamp_test_{}", std::process::id()));
    std::fs::create_dir_all(&tmp_dir).unwrap();

    let schema = clamp_arrow_schema();

    // Clamp file 1: vids 10 and 20.
    let path1 = tmp_dir.join("clamp_000000.parquet");
    let mut writer1 =
        ParquetChunkWriter::new(path1, "clamp_000000.parquet".into(), &schema).unwrap();
    let batch1 = arrow::array::RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(Int64Array::from(vec![10, 20])),
            Arc::new(Int32Array::from(vec![500, 600])),
        ],
    )
    .unwrap();
    writer1.write_batch(&batch1, 10, 20).unwrap();
    let chunk1 = writer1.finish().unwrap();

    // Clamp file 2: re-clamps vid 20 with a different end and adds vid 30.
    // Since it comes later, its value for vid 20 must win.
    let path2 = tmp_dir.join("clamp_000001.parquet");
    let mut writer2 =
        ParquetChunkWriter::new(path2, "clamp_000001.parquet".into(), &schema).unwrap();
    let batch2 = arrow::array::RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(Int64Array::from(vec![20, 30])),
            Arc::new(Int32Array::from(vec![700, 800])),
        ],
    )
    .unwrap();
    writer2.write_batch(&batch2, 20, 30).unwrap();
    let chunk2 = writer2.finish().unwrap();

    // The finished chunks can be used directly; rebuilding them with
    // `ChunkInfo { file: chunk.file, ..chunk }` was an identity no-op.
    let clamps = vec![chunk1, chunk2];

    let map = load_clamps(&tmp_dir, &clamps).unwrap();
    assert_eq!(map.len(), 3);
    assert_eq!(map[&10], 500);
    assert_eq!(map[&20], 700); // overwritten by second clamp file
    assert_eq!(map[&30], 800);

    // Cleanup is best-effort; a leftover dir must not fail the test.
    let _ = std::fs::remove_dir_all(&tmp_dir);
}
11371226
}

store/postgres/src/parquet/schema.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,15 @@ pub fn data_sources_arrow_schema() -> Schema {
6262
])
6363
}
6464

65+
/// Arrow schema for clamp files: records which rows had their
66+
/// `block_range` upper bound set (clamped) since a previous dump.
67+
pub fn clamp_arrow_schema() -> Schema {
68+
Schema::new(vec![
69+
Field::new("vid", DataType::Int64, false),
70+
Field::new("block_range_end", DataType::Int32, false),
71+
])
72+
}
73+
6574
fn column_type_to_arrow(ct: &ColumnType) -> DataType {
6675
match ct {
6776
ColumnType::Boolean => DataType::Boolean,
@@ -361,6 +370,31 @@ mod tests {
361370
);
362371
}
363372

373+
/// Verifies the clamp schema: field names, order, types, and
/// non-nullability of both columns.
#[test]
fn clamp_schema() {
    let schema = clamp_arrow_schema();

    let names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
    assert_eq!(names, &["vid", "block_range_end"]);

    let vid = schema.field_with_name("vid").unwrap();
    assert_eq!(vid.data_type(), &DataType::Int64);
    assert!(!vid.is_nullable());

    let end = schema.field_with_name("block_range_end").unwrap();
    assert_eq!(end.data_type(), &DataType::Int32);
    assert!(!end.is_nullable());
}
397+
364398
#[test]
365399
fn enum_columns_map_to_utf8() {
366400
let layout = test_layout(

0 commit comments

Comments
 (0)