Skip to content

Commit 4db386b

Browse files
committed
core: Remove error return from IndexingContext.revert_data_sources
The unnecessary return makes callers look like they could fail
1 parent 102d998 commit 4db386b

File tree

2 files changed

+24
-35
lines changed

2 files changed

+24
-35
lines changed

core/src/subgraph/context/mod.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,12 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
114114
/// `process_trigger`.
115115
///
116116
/// File data sources that have been marked not done during this process will get re-queued
117-
pub fn revert_data_sources(&mut self, reverted_block: BlockNumber) -> Result<(), Error> {
117+
pub fn revert_data_sources(&mut self, reverted_block: BlockNumber) {
118118
let removed = self.instance.revert_data_sources(reverted_block);
119119

120120
removed
121121
.into_iter()
122-
.try_for_each(|source| self.offchain_monitor.add_source(source))
122+
.for_each(|source| self.offchain_monitor.add_source(source))
123123
}
124124

125125
pub fn add_dynamic_data_source(
@@ -136,7 +136,7 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
136136
if let Some((source, is_processed)) = offchain_fields {
137137
// monitor data source only if it has not yet been processed.
138138
if !is_processed {
139-
self.offchain_monitor.add_source(source)?;
139+
self.offchain_monitor.add_source(source);
140140
}
141141
}
142142
}
@@ -212,15 +212,14 @@ impl OffchainMonitor {
212212
}
213213
}
214214

215-
fn add_source(&mut self, source: offchain::Source) -> Result<(), Error> {
215+
fn add_source(&mut self, source: offchain::Source) {
216216
match source {
217217
offchain::Source::Ipfs(path) => self.ipfs_monitor.monitor(IpfsRequest {
218218
ctx: IpfsContext::new(&self.deployment_hash, &self.logger),
219219
path,
220220
}),
221221
offchain::Source::Arweave(base64) => self.arweave_monitor.monitor(base64),
222222
};
223-
Ok(())
224223
}
225224

226225
pub fn ready_offchain_events(&mut self) -> Result<Vec<offchain::TriggerData>, Error> {

core/src/subgraph/runner/mod.rs

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,13 @@ where
124124
/// be removed. The same thing also applies to the block cache.
125125
/// This function must be called before continuing to process in order to avoid
126126
/// duplicated host insertion and POI issues with dirty entity changes.
127-
fn revert_state_to(&mut self, block_number: BlockNumber) -> Result<(), Error> {
127+
fn revert_state_to(&mut self, block_number: BlockNumber) {
128128
self.state.entity_lfu_cache = LfuCache::new();
129129

130130
// 1. Revert all hosts(created by DDS) at a block higher than `block_number`.
131131
// 2. Unmark any offchain data sources that were marked done on the blocks being removed.
132132
// When no offchain datasources are present, 2. should be a noop.
133-
self.ctx.revert_data_sources(block_number + 1)?;
134-
Ok(())
133+
self.ctx.revert_data_sources(block_number + 1);
135134
}
136135

137136
#[cfg(debug_assertions)]
@@ -420,7 +419,7 @@ where
420419
let store = self.inputs.store.cheap_clone();
421420
if let Some(store) = store.restart().await? {
422421
let last_good_block = store.block_ptr().map(|ptr| ptr.number).unwrap_or(0);
423-
self.revert_state_to(last_good_block)?;
422+
self.revert_state_to(last_good_block);
424423
self.inputs = Arc::new(self.inputs.with_store(store));
425424
}
426425
}
@@ -522,10 +521,7 @@ where
522521
block_stream,
523522
to_ptr,
524523
cursor,
525-
} => {
526-
self.handle_revert_state(block_stream, to_ptr, cursor)
527-
.await?
528-
}
524+
} => self.handle_revert_state(block_stream, to_ptr, cursor).await,
529525

530526
RunnerState::Stopped { reason } => {
531527
return self.finalize(reason).await;
@@ -610,20 +606,20 @@ where
610606
block_stream: Cancelable<Box<dyn BlockStream<C>>>,
611607
revert_to_ptr: BlockPtr,
612608
cursor: FirehoseCursor,
613-
) -> Result<RunnerState<C>, SubgraphRunnerError> {
609+
) -> RunnerState<C> {
614610
let stopwatch = &self.metrics.stream.stopwatch;
615611
let _section = stopwatch.start_section(HANDLE_REVERT_SECTION_NAME);
616612

617-
let action = self.handle_revert(revert_to_ptr, cursor).await?;
613+
let action = self.handle_revert(revert_to_ptr, cursor).await;
618614

619615
match action {
620-
Action::Continue => Ok(RunnerState::AwaitingBlock { block_stream }),
621-
Action::Restart => Ok(RunnerState::Restarting {
616+
Action::Continue => RunnerState::AwaitingBlock { block_stream },
617+
Action::Restart => RunnerState::Restarting {
622618
reason: RestartReason::StoreError,
623-
}),
624-
Action::Stop => Ok(RunnerState::Stopped {
619+
},
620+
Action::Stop => RunnerState::Stopped {
625621
reason: StopReason::Canceled,
626-
}),
622+
},
627623
}
628624
}
629625

@@ -1355,7 +1351,7 @@ where
13551351
.block_ptr()
13561352
.map(|ptr| ptr.number)
13571353
.unwrap_or(0);
1358-
self.revert_state_to(last_good_block)?;
1354+
self.revert_state_to(last_good_block);
13591355

13601356
Ok(Action::Restart)
13611357
}
@@ -1369,7 +1365,7 @@ where
13691365
.block_ptr()
13701366
.map(|ptr| ptr.number)
13711367
.unwrap_or(0);
1372-
self.revert_state_to(last_good_block)?;
1368+
self.revert_state_to(last_good_block);
13731369

13741370
let message = format!("{:#}", e).replace('\n', "\t");
13751371
let err = anyhow!("{}, code: {}", message, LogCode::SubgraphSyncingFailure);
@@ -1388,7 +1384,7 @@ where
13881384
.block_ptr()
13891385
.map(|ptr| ptr.number)
13901386
.unwrap_or(0);
1391-
self.revert_state_to(last_good_block)?;
1387+
self.revert_state_to(last_good_block);
13921388

13931389
let message = format!("{:#}", e).replace('\n', "\t");
13941390

@@ -1612,19 +1608,15 @@ where
16121608
C: Blockchain,
16131609
T: RuntimeHostBuilder<C>,
16141610
{
1615-
async fn handle_revert(
1616-
&mut self,
1617-
revert_to_ptr: BlockPtr,
1618-
cursor: FirehoseCursor,
1619-
) -> Result<Action, Error> {
1611+
async fn handle_revert(&mut self, revert_to_ptr: BlockPtr, cursor: FirehoseCursor) -> Action {
16201612
// Current deployment head in the database / WritableAgent Mutex cache.
16211613
//
16221614
// Safe unwrap because in a Revert event we're sure the subgraph has
16231615
// advanced at least once.
16241616
let subgraph_ptr = self.inputs.store.block_ptr().unwrap();
16251617
if revert_to_ptr.number >= subgraph_ptr.number {
16261618
info!(&self.logger, "Block to revert is higher than subgraph pointer, nothing to do"; "subgraph_ptr" => &subgraph_ptr, "revert_to_ptr" => &revert_to_ptr);
1627-
return Ok(Action::Continue);
1619+
return Action::Continue;
16281620
}
16291621

16301622
info!(&self.logger, "Reverting block to get back to main chain"; "subgraph_ptr" => &subgraph_ptr, "revert_to_ptr" => &revert_to_ptr);
@@ -1638,7 +1630,7 @@ where
16381630
error!(&self.logger, "Could not revert block. Retrying"; "error" => %e);
16391631

16401632
// Exit inner block stream consumption loop and go up to loop that restarts subgraph
1641-
return Ok(Action::Restart);
1633+
return Action::Restart;
16421634
}
16431635

16441636
self.metrics
@@ -1650,17 +1642,15 @@ where
16501642
.deployment_head
16511643
.set(subgraph_ptr.number as f64);
16521644

1653-
self.revert_state_to(revert_to_ptr.number)?;
1645+
self.revert_state_to(revert_to_ptr.number);
16541646

16551647
let needs_restart: bool = self.needs_restart(revert_to_ptr, subgraph_ptr);
16561648

1657-
let action = if needs_restart {
1649+
if needs_restart {
16581650
Action::Restart
16591651
} else {
16601652
Action::Continue
1661-
};
1662-
1663-
Ok(action)
1653+
}
16641654
}
16651655

16661656
/// Determines if the subgraph needs to be restarted.

0 commit comments

Comments
 (0)