Skip to content

Commit 27bf875

Browse files
store: Add rebuild_storage to ChainStore and BlockStore
Adds ChainStore::rebuild_storage, which drops the chain's schema (if present), upserts the ethereum_networks row to reset head tracking columns, and recreates the schema with empty tables — all in a single transaction. Adds BlockStore::has_namespace and BlockStore::rebuild_chain_storage as the public entry points used by the graphman command.
1 parent f031138 commit 27bf875

File tree

2 files changed

+74
-2
lines changed

2 files changed

+74
-2
lines changed

store/postgres/src/block_store.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,29 @@ impl BlockStore {
484484
Ok(())
485485
}
486486

487+
pub async fn has_namespace(&self, chain: &primary::Chain) -> Result<bool, StoreError> {
488+
let pool = self
489+
.pools
490+
.get(&chain.shard)
491+
.ok_or_else(|| internal_error!("no pool for shard {}", chain.shard))?;
492+
let nsp = crate::primary::Namespace::special(chain.storage.to_string());
493+
let mut conn = pool.get_permitted().await?;
494+
crate::catalog::has_namespace(&mut conn, &nsp).await
495+
}
496+
497+
pub async fn rebuild_chain_storage(
498+
&self,
499+
chain: &str,
500+
ident: &ChainIdentifier,
501+
) -> Result<(), StoreError> {
502+
let chain_store = self
503+
.store(chain)
504+
.await
505+
.ok_or_else(|| internal_error!("No chain store found for {}", chain))?;
506+
507+
Ok(chain_store.rebuild_storage(ident).await?)
508+
}
509+
487510
// Helper to clone the list of chain stores to avoid holding the lock
488511
// while awaiting
489512
fn stores(&self) -> Vec<Arc<ChainStore>> {

store/postgres/src/chain_store.rs

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use async_trait::async_trait;
33
use diesel::sql_types::Text;
44
use diesel::{insert_into, update, ExpressionMethods, OptionalExtension, QueryDsl};
55
use diesel_async::AsyncConnection;
6-
use diesel_async::{scoped_futures::ScopedFutureExt, RunQueryDsl};
6+
use diesel_async::{scoped_futures::ScopedFutureExt, RunQueryDsl, SimpleAsyncConnection};
77

88
use graph::components::store::ChainHeadStore;
99
use graph::data::store::ethereum::call;
@@ -12,7 +12,7 @@ use graph::parking_lot::RwLock;
1212
use graph::prelude::alloy::primitives::B256;
1313
use graph::prelude::MetricsRegistry;
1414
use graph::prometheus::{CounterVec, GaugeVec};
15-
use graph::slog::{info, o, Logger};
15+
use graph::slog::{debug, info, o, Logger};
1616
use graph::stable_hash::crypto_stable_hash;
1717
use graph::util::herd_cache::HerdCache;
1818

@@ -2479,6 +2479,55 @@ impl ChainStore {
24792479
.await
24802480
}
24812481

2482+
/// Drop the chain's storage schema (if it exists), reset head
2483+
/// metadata in `ethereum_networks`, and rebuild the schema with
2484+
/// empty tables. If the `ethereum_networks` row is missing, it is
2485+
/// created from the provided `ident`.
2486+
pub(crate) async fn rebuild_storage(&self, ident: &ChainIdentifier) -> Result<(), Error> {
2487+
use public::ethereum_networks as n;
2488+
2489+
let nsp = self.storage.to_string();
2490+
2491+
debug!(&self.logger, "Rebuilding storage for chain"; "chain" => &self.chain, "namespace" => &nsp);
2492+
2493+
let mut conn = self.pool.get_permitted().await?;
2494+
conn.transaction(|conn| {
2495+
async {
2496+
debug!(&self.logger, "Dropping existing schema if present"; "namespace" => &nsp);
2497+
conn.batch_execute(&format!("DROP SCHEMA IF EXISTS {nsp} CASCADE"))
2498+
.await?;
2499+
2500+
debug!(&self.logger, "Upserting ethereum_networks row"; "chain" => &self.chain);
2501+
insert_into(n::table)
2502+
.values((
2503+
n::name.eq(&self.chain),
2504+
n::namespace.eq(&self.storage),
2505+
n::net_version.eq(&ident.net_version),
2506+
n::genesis_block_hash.eq(ident.genesis_block_hash.hash_hex()),
2507+
))
2508+
.on_conflict(n::name)
2509+
.do_update()
2510+
.set((
2511+
n::namespace.eq(&self.storage),
2512+
n::net_version.eq(&ident.net_version),
2513+
n::genesis_block_hash.eq(ident.genesis_block_hash.hash_hex()),
2514+
n::head_block_hash.eq(None::<String>),
2515+
n::head_block_number.eq(None::<i64>),
2516+
n::head_block_cursor.eq(None::<String>),
2517+
))
2518+
.execute(conn)
2519+
.await?;
2520+
2521+
debug!(&self.logger, "Creating storage schema and tables"; "namespace" => &nsp);
2522+
self.storage.create(conn).await?;
2523+
2524+
Ok(())
2525+
}
2526+
.scope_boxed()
2527+
})
2528+
.await
2529+
}
2530+
24822531
pub async fn chain_head_pointers(
24832532
conn: &mut AsyncPgConnection,
24842533
) -> Result<HashMap<String, BlockPtr>, StoreError> {

0 commit comments

Comments
 (0)