
Commit fa272ef

core: Extract helpers from new_data_stream loop body
Refactor the loop body in new_data_stream into focused helper functions to improve readability and prepare for lazy stream chaining:

- build_query_streams: builds SQL queries and fires them via the client
- build_data_stream: creates StreamAggregator with metrics and reorg check (a sketch of the one-shot reorg-check pattern follows the diffs below)
- Refactor next_block_ranges/next_block_range to take specific fields instead of &Context<AC>
- Simplify min_start_block/max_end_block as one-liner aggregates (a standalone sketch of these aggregates follows the change summary below)

No behavior change.
1 parent 2e735ae commit fa272ef

2 files changed

Lines changed: 115 additions & 85 deletions
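
To make the last bullet of the commit message concrete, here is a minimal, self-contained Rust sketch of the min/max one-liner aggregates; BlockNumber is assumed to be a plain integer here, and the map contents are illustrative only, not taken from the repository.

use std::collections::HashMap;
use std::ops::RangeInclusive;

// Stand-in for the crate's BlockNumber type, assumed to be a plain integer in this sketch.
type BlockNumber = u64;

fn main() {
    // Hypothetical per-data-source block ranges, keyed by data source index.
    let block_ranges: HashMap<usize, RangeInclusive<BlockNumber>> =
        HashMap::from([(0, 100..=199), (2, 50..=149)]);

    // The one-liner aggregates that replace the manual min/max tracking the old
    // loop body carried; in the real loop, max_end_block is folded into a running
    // max across iterations. The unwraps rely on the caller having already broken
    // out of the loop when block_ranges is empty.
    let min_start_block = block_ranges.values().map(|r| *r.start()).min().unwrap();
    let max_end_block = block_ranges.values().map(|r| *r.end()).max().unwrap();

    assert_eq!(min_start_block, 50);
    assert_eq!(max_end_block, 199);
}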

core/src/amp_subgraph/runner/context.rs

Lines changed: 0 additions & 8 deletions
@@ -77,14 +77,6 @@ impl<AC> Context<AC> {
             .map(|block_ptr| (block_ptr.number.compat(), block_ptr.hash.compat()))
     }
 
-    pub(super) fn total_queries(&self) -> usize {
-        self.manifest
-            .data_sources
-            .iter()
-            .map(|data_source| data_source.transformer.tables.len())
-            .sum()
-    }
-
     pub(super) fn min_start_block(&self) -> BlockNumber {
         self.manifest
             .data_sources

core/src/amp_subgraph/runner/data_stream.rs

Lines changed: 115 additions & 77 deletions
@@ -8,11 +8,14 @@ use futures::{
 };
 use graph::{
     amp::{
+        client::ResponseBatch,
+        error::IsDeterministic,
         manifest::DataSource,
         stream_aggregator::{RecordBatchGroups, StreamAggregator},
         Client,
     },
     cheap_clone::CheapClone,
+    prelude::StopwatchMetrics,
 };
 use slog::{debug, warn};
 
@@ -29,7 +32,6 @@ where
 {
     let logger = cx.logger.new(slog::o!("process" => "new_data_stream"));
 
-    let total_queries = cx.total_queries();
     let mut total_queries_to_execute = 0;
     let mut data_streams = Vec::new();
     let mut latest_queried_block = cx.latest_synced_block();
@@ -43,82 +45,36 @@ where
     );
 
     loop {
-        let next_block_ranges = next_block_ranges(cx, latest_queried_block, latest_block);
+        let block_ranges = next_block_ranges(
+            &cx.manifest.data_sources,
+            cx.max_block_range,
+            latest_queried_block,
+            latest_block,
+        );
 
-        if next_block_ranges.is_empty() {
+        if block_ranges.is_empty() {
             if data_streams.is_empty() {
                 warn!(logger, "There are no unprocessed block ranges");
             }
             break;
         }
 
-        let mut query_streams = Vec::with_capacity(total_queries);
-        let mut query_streams_table_ptr = Vec::with_capacity(total_queries);
-        let mut min_start_block = BlockNumber::MAX;
-
-        for (i, data_source) in cx.manifest.data_sources.iter().enumerate() {
-            let Some(block_range) = next_block_ranges.get(&i) else {
-                continue;
-            };
-
-            if *block_range.start() < min_start_block {
-                min_start_block = *block_range.start();
-            }
-
-            if *block_range.end() > max_end_block {
-                max_end_block = *block_range.end();
-            }
-
-            for (j, table) in data_source.transformer.tables.iter().enumerate() {
-                let query = table.query.build_with_block_range(block_range);
-                let stream = cx.client.query(&cx.logger, query, None);
-                let stream_name = format!("{}.{}", data_source.name, table.name);
-
-                query_streams.push((stream_name, stream));
-                query_streams_table_ptr.push((i, j));
-            }
-        }
+        let min_start_block = block_ranges.values().map(|r| *r.start()).min().unwrap();
+        max_end_block =
+            max_end_block.max(block_ranges.values().map(|r| *r.end()).max().unwrap());
 
-        let query_streams_table_ptr: Arc<[TablePtr]> = query_streams_table_ptr.into();
+        let (query_streams, table_ptrs) =
+            build_query_streams(&*cx.client, &logger, &cx.manifest.data_sources, &block_ranges);
         total_queries_to_execute += query_streams.len();
 
-        let mut min_start_block_checked = false;
-        let mut load_first_record_batch_group_section = Some(
-            cx.metrics
-                .stopwatch
-                .start_section("load_first_record_batch_group"),
-        );
-
-        data_streams.push(
-            StreamAggregator::new(&cx.logger, query_streams, cx.max_buffer_size)
-                .map_ok(move |response| (response, query_streams_table_ptr.cheap_clone()))
-                .map_err(Error::from)
-                .map(move |result| {
-                    if load_first_record_batch_group_section.is_some() {
-                        let _section = load_first_record_batch_group_section.take();
-                    }
-
-                    match result {
-                        Ok(response) => {
-                            if !min_start_block_checked {
-                                if let Some(((first_block, _), _)) = response.0.first_key_value() {
-                                    if *first_block < min_start_block {
-                                        return Err(Error::NonDeterministic(anyhow!(
-                                            "chain reorg"
-                                        )));
-                                    }
-                                }
-
-                                min_start_block_checked = true;
-                            }
-
-                            Ok(response)
-                        }
-                        Err(e) => Err(e),
-                    }
-                })
-                .boxed(),
-        );
+        data_streams.push(build_data_stream(
+            &logger,
+            query_streams,
+            table_ptrs,
+            cx.max_buffer_size,
+            &cx.metrics.stopwatch,
+            min_start_block,
+        ));
 
         if max_end_block >= latest_block {
             break;
@@ -142,19 +98,101 @@ where
     merged_data_stream
 }
 
-fn next_block_ranges<AC>(
-    cx: &Context<AC>,
+fn build_query_streams<AC: Client>(
+    client: &AC,
+    logger: &slog::Logger,
+    data_sources: &[DataSource],
+    block_ranges: &HashMap<usize, RangeInclusive<BlockNumber>>,
+) -> (
+    Vec<(String, BoxStream<'static, Result<ResponseBatch, AC::Error>>)>,
+    Arc<[TablePtr]>,
+) {
+    let total_queries: usize = data_sources
+        .iter()
+        .map(|ds| ds.transformer.tables.len())
+        .sum();
+
+    let mut query_streams = Vec::with_capacity(total_queries);
+    let mut table_ptrs = Vec::with_capacity(total_queries);
+
+    for (i, data_source) in data_sources.iter().enumerate() {
+        let Some(block_range) = block_ranges.get(&i) else {
+            continue;
+        };
+
+        for (j, table) in data_source.transformer.tables.iter().enumerate() {
+            let query = table.query.build_with_block_range(block_range);
+            let stream = client.query(logger, query, None);
+            let stream_name = format!("{}.{}", data_source.name, table.name);
+
+            query_streams.push((stream_name, stream));
+            table_ptrs.push((i, j));
+        }
+    }
+
+    (query_streams, table_ptrs.into())
+}
+
+fn build_data_stream<E>(
+    logger: &slog::Logger,
+    query_streams: Vec<(String, BoxStream<'static, Result<ResponseBatch, E>>)>,
+    table_ptrs: Arc<[TablePtr]>,
+    max_buffer_size: usize,
+    stopwatch: &StopwatchMetrics,
+    min_start_block: BlockNumber,
+) -> BoxStream<'static, Result<(RecordBatchGroups, Arc<[TablePtr]>), Error>>
+where
+    E: std::error::Error + IsDeterministic + Send + Sync + 'static,
+{
+    let mut min_start_block_checked = false;
+    let mut load_first_record_batch_group_section =
+        Some(stopwatch.start_section("load_first_record_batch_group"));
+
+    StreamAggregator::new(logger, query_streams, max_buffer_size)
+        .map_ok(move |response| (response, table_ptrs.cheap_clone()))
+        .map_err(Error::from)
+        .map(move |result| {
+            if load_first_record_batch_group_section.is_some() {
+                let _section = load_first_record_batch_group_section.take();
+            }
+
+            match result {
+                Ok(response) => {
+                    if !min_start_block_checked {
+                        if let Some(((first_block, _), _)) = response.0.first_key_value() {
+                            if *first_block < min_start_block {
+                                return Err(Error::NonDeterministic(anyhow!("chain reorg")));
+                            }
+                        }
+
+                        min_start_block_checked = true;
+                    }
+
+                    Ok(response)
+                }
+                Err(e) => Err(e),
+            }
+        })
+        .boxed()
+}
+
+fn next_block_ranges(
+    data_sources: &[DataSource],
+    max_block_range: usize,
     latest_queried_block: Option<BlockNumber>,
     latest_block: BlockNumber,
 ) -> HashMap<usize, RangeInclusive<BlockNumber>> {
-    let block_ranges = cx
-        .manifest
-        .data_sources
+    let block_ranges = data_sources
         .iter()
         .enumerate()
         .filter_map(|(i, data_source)| {
-            next_block_range(cx, data_source, latest_queried_block, latest_block)
-                .map(|block_range| (i, block_range))
+            next_block_range(
+                max_block_range,
+                data_source,
+                latest_queried_block,
+                latest_block,
+            )
+            .map(|block_range| (i, block_range))
         })
         .collect::<HashMap<_, _>>();
 
@@ -172,8 +210,8 @@ fn next_block_ranges<AC>(
         .collect()
 }
 
-fn next_block_range<AC>(
-    cx: &Context<AC>,
+fn next_block_range(
+    max_block_range: usize,
     data_source: &DataSource,
     latest_queried_block: Option<BlockNumber>,
     latest_block: BlockNumber,
@@ -190,7 +228,7 @@ fn next_block_range<AC>(
     };
 
     let end_block = [
-        start_block.saturating_add(cx.max_block_range as BlockNumber),
+        start_block.saturating_add(max_block_range as BlockNumber),
        data_source.source.end_block,
        latest_block,
    ]
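
The build_data_stream hunk above folds a one-shot reorg check into the aggregated stream by keeping mutable state in the closure passed to map. Below is a minimal, self-contained sketch of that pattern, assuming only the futures crate; the integer items, error type, and names are illustrative stand-ins, not the repository's types.

use futures::{executor::block_on, stream, StreamExt};

fn main() {
    // Illustrative stand-ins: the real code inspects the first block number of each
    // aggregated record-batch group, not bare integers.
    let min_start_block: u64 = 100;
    let mut first_item_checked = false;

    let checked = stream::iter(vec![Ok::<u64, String>(120), Ok(95), Ok(130)]).map(move |item| {
        match item {
            Ok(first_block) => {
                // One-shot guard: only the first successful item is compared against
                // min_start_block; anything earlier than it signals a chain reorg.
                if !first_item_checked {
                    if first_block < min_start_block {
                        return Err("chain reorg".to_string());
                    }
                    first_item_checked = true;
                }
                Ok(first_block)
            }
            Err(e) => Err(e),
        }
    });

    // Only the first item (120) is validated; later items pass through unchecked.
    let results: Vec<_> = block_on(checked.collect::<Vec<_>>());
    assert_eq!(results, vec![Ok(120), Ok(95), Ok(130)]);
}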
