Skip to content

Commit ccc1835

Browse files
committed
core: Lazily produce data streams with stream::unfold
Replace the eager loop — which built all StreamAggregator instances upfront and chained them together — with stream::unfold + flatten. Each block-range iteration's stream is now produced on demand, only when the previous one is exhausted, avoiding a large upfront chain structure when there are many block ranges to cover.
1 parent fa272ef commit ccc1835

2 files changed

Lines changed: 54 additions & 61 deletions

File tree

core/src/amp_subgraph/runner/data_stream.rs

Lines changed: 52 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{collections::HashMap, ops::RangeInclusive, sync::Arc};
33
use alloy::primitives::BlockNumber;
44
use anyhow::anyhow;
55
use futures::{
6-
stream::{empty, BoxStream},
6+
stream::{self, BoxStream},
77
StreamExt, TryStreamExt,
88
};
99
use graph::{
@@ -28,74 +28,67 @@ pub(super) fn new_data_stream<AC>(
2828
latest_block: BlockNumber,
2929
) -> BoxStream<'static, Result<(RecordBatchGroups, Arc<[TablePtr]>), Error>>
3030
where
31-
AC: Client,
31+
AC: Client + Send + Sync + 'static,
3232
{
3333
let logger = cx.logger.new(slog::o!("process" => "new_data_stream"));
34-
35-
let mut total_queries_to_execute = 0;
36-
let mut data_streams = Vec::new();
37-
let mut latest_queried_block = cx.latest_synced_block();
38-
let mut max_end_block = BlockNumber::MIN;
34+
let client = cx.client.cheap_clone();
35+
let manifest = cx.manifest.clone();
36+
let max_buffer_size = cx.max_buffer_size;
37+
let max_block_range = cx.max_block_range;
38+
let stopwatch = cx.metrics.stopwatch.cheap_clone();
3939

4040
debug!(logger, "Creating data stream";
41-
"from_block" => latest_queried_block.unwrap_or(BlockNumber::MIN),
41+
"from_block" => cx.latest_synced_block().unwrap_or(BlockNumber::MIN),
4242
"to_block" => latest_block,
43-
"min_start_block" => cx.min_start_block(),
44-
"max_block_range" => cx.max_block_range,
43+
"start_block" => cx.min_start_block(),
44+
"max_block_range" => max_block_range,
4545
);
4646

47-
loop {
48-
let block_ranges = next_block_ranges(
49-
&cx.manifest.data_sources,
50-
cx.max_block_range,
51-
latest_queried_block,
52-
latest_block,
53-
);
54-
55-
if block_ranges.is_empty() {
56-
if data_streams.is_empty() {
57-
warn!(logger, "There are no unprocessed block ranges");
58-
}
59-
break;
60-
}
61-
62-
let min_start_block = block_ranges.values().map(|r| *r.start()).min().unwrap();
63-
max_end_block =
64-
max_end_block.max(block_ranges.values().map(|r| *r.end()).max().unwrap());
65-
66-
let (query_streams, table_ptrs) =
67-
build_query_streams(&*cx.client, &logger, &cx.manifest.data_sources, &block_ranges);
68-
total_queries_to_execute += query_streams.len();
69-
70-
data_streams.push(build_data_stream(
71-
&logger,
72-
query_streams,
73-
table_ptrs,
74-
cx.max_buffer_size,
75-
&cx.metrics.stopwatch,
76-
min_start_block,
77-
));
78-
79-
if max_end_block >= latest_block {
80-
break;
81-
}
82-
83-
latest_queried_block = Some(max_end_block);
84-
}
47+
// State: (latest_queried_block, max_end_block, is_first)
48+
let initial_state = (cx.latest_synced_block(), BlockNumber::MIN, true);
8549

86-
debug!(logger, "Created aggregated data streams";
87-
"total_data_streams" => data_streams.len(),
88-
"total_queries_to_execute" => total_queries_to_execute
89-
);
90-
91-
let mut iter = data_streams.into_iter();
92-
let mut merged_data_stream = iter.next().unwrap_or_else(|| empty().boxed());
50+
stream::unfold(
51+
initial_state,
52+
move |(latest_queried_block, mut end_block, is_first)| {
53+
let block_ranges = next_block_ranges(
54+
&manifest.data_sources,
55+
max_block_range,
56+
latest_queried_block,
57+
latest_block,
58+
);
9359

94-
for data_stream in iter {
95-
merged_data_stream = merged_data_stream.chain(data_stream).boxed();
96-
}
60+
if block_ranges.is_empty() {
61+
if is_first {
62+
warn!(logger, "There are no unprocessed block ranges");
63+
}
64+
return futures::future::ready(None);
65+
}
9766

98-
merged_data_stream
67+
let start_block = block_ranges.values().map(|r| *r.start()).min().unwrap();
68+
end_block = end_block.max(block_ranges.values().map(|r| *r.end()).max().unwrap());
69+
70+
let (query_streams, table_ptrs) =
71+
build_query_streams(&*client, &logger, &manifest.data_sources, &block_ranges);
72+
73+
let data_stream = build_data_stream(
74+
&logger,
75+
query_streams,
76+
table_ptrs,
77+
max_buffer_size,
78+
&stopwatch,
79+
start_block,
80+
);
81+
82+
debug!(logger, "Created a new data stream";
83+
"latest_queried_block" => latest_queried_block,
84+
"start_block" => start_block,
85+
"end_block" => end_block,
86+
);
87+
futures::future::ready(Some((data_stream, (Some(end_block), end_block, false))))
88+
},
89+
)
90+
.flatten()
91+
.boxed()
9992
}
10093

10194
fn build_query_streams<AC: Client>(

core/src/amp_subgraph/runner/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ where
6767

6868
async fn run_indexing<AC>(cx: &mut Context<AC>) -> Result<(), Error>
6969
where
70-
AC: Client,
70+
AC: Client + Send + Sync + 'static,
7171
{
7272
cx.metrics.deployment_status.starting();
7373

@@ -152,7 +152,7 @@ where
152152

153153
async fn run_indexing_with_retries<AC>(cx: &mut Context<AC>) -> Result<()>
154154
where
155-
AC: Client,
155+
AC: Client + Send + Sync + 'static,
156156
{
157157
loop {
158158
match run_indexing(cx).await {

0 commit comments

Comments (0)