Skip to content

Commit 9b4d918

Browse files
committed
core: Rename methods to get start and end block for amp subgraphs
1 parent 8619036 commit 9b4d918

3 files changed

Lines changed: 12 additions & 12 deletions

File tree

core/src/amp_subgraph/runner/context.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl<AC> Context<AC> {
7777
.map(|block_ptr| (block_ptr.number.compat(), block_ptr.hash.compat()))
7878
}
7979

80-
pub(super) fn min_start_block(&self) -> BlockNumber {
80+
pub(super) fn start_block(&self) -> BlockNumber {
8181
self.manifest
8282
.data_sources
8383
.iter()
@@ -86,7 +86,7 @@ impl<AC> Context<AC> {
8686
.unwrap()
8787
}
8888

89-
pub(super) fn max_end_block(&self) -> BlockNumber {
89+
pub(super) fn end_block(&self) -> BlockNumber {
9090
self.manifest
9191
.data_sources
9292
.iter()

core/src/amp_subgraph/runner/data_stream.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@ where
4040
debug!(logger, "Creating data stream";
4141
"from_block" => cx.latest_synced_block().unwrap_or(BlockNumber::MIN),
4242
"to_block" => latest_block,
43-
"start_block" => cx.min_start_block(),
43+
"start_block" => cx.start_block(),
4444
"max_block_range" => max_block_range,
4545
);
4646

47-
// State: (latest_queried_block, max_end_block, is_first)
47+
// State: (latest_queried_block, end_block, is_first)
4848
let initial_state = (cx.latest_synced_block(), BlockNumber::MIN, true);
4949

5050
stream::unfold(
@@ -132,12 +132,12 @@ fn build_data_stream<E>(
132132
table_ptrs: Arc<[TablePtr]>,
133133
buffer_size: usize,
134134
stopwatch: &StopwatchMetrics,
135-
min_start_block: BlockNumber,
135+
start_block: BlockNumber,
136136
) -> BoxStream<'static, Result<(RecordBatchGroups, Arc<[TablePtr]>), Error>>
137137
where
138138
E: std::error::Error + IsDeterministic + Send + Sync + 'static,
139139
{
140-
let mut min_start_block_checked = false;
140+
let mut start_block_checked = false;
141141
let mut load_first_record_batch_group_section =
142142
Some(stopwatch.start_section("load_first_record_batch_group"));
143143

@@ -151,14 +151,14 @@ where
151151

152152
match result {
153153
Ok(response) => {
154-
if !min_start_block_checked {
154+
if !start_block_checked {
155155
if let Some(((first_block, _), _)) = response.0.first_key_value() {
156-
if *first_block < min_start_block {
156+
if *first_block < start_block {
157157
return Err(Error::NonDeterministic(anyhow!("chain reorg")));
158158
}
159159
}
160160

161-
min_start_block_checked = true;
161+
start_block_checked = true;
162162
}
163163

164164
Ok(response)

core/src/amp_subgraph/runner/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ where
101101

102102
cx.metrics
103103
.deployment_target
104-
.update(latest_block.min(cx.max_end_block()));
104+
.update(latest_block.min(cx.end_block()));
105105

106106
let mut deployment_is_failed = cx.store.health().await?.is_failed();
107107
let mut entity_cache = EntityCache::new(cx.store.cheap_clone());
@@ -131,12 +131,12 @@ where
131131
// source's endBlock. This handles the case where endBlock has no entity
132132
// data — the persisted block pointer never advances to endBlock, but the
133133
// server's latest block confirms all queries have been served.
134-
if latest_block >= cx.max_end_block() {
134+
if latest_block >= cx.end_block() {
135135
cx.metrics.deployment_synced.record(true);
136136

137137
debug!(cx.logger, "Indexing completed; endBlock reached via server latest block";
138138
"latest_block" => latest_block,
139-
"max_end_block" => cx.max_end_block()
139+
"end_block" => cx.end_block()
140140
);
141141
return Ok(());
142142
}

0 commit comments

Comments
 (0)