Skip to content

Commit 115e657

Browse files
committed
store: Link restored deployment to subgraph name
Replace the `assign_subgraph` call in `restore()` with `create_subgraph_version`, which creates the `subgraph_version` record, updates `subgraph.current_version`, and creates the assignment. This ensures the restored deployment is properly connected to its subgraph name rather than being left orphaned. Move `restore()` from `impl Inner` to `impl SubgraphStore` so it can use `self.cheap_clone()` for the `exists_and_synced` closure, matching the pattern used in `create_deployment_internal`.
1 parent a978be6 commit 115e657

2 files changed

Lines changed: 143 additions & 125 deletions

File tree

node/src/manager/commands/restore.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::time::Instant;
33

44
use console::style;
55
use graph::components::store::RestoreReporter;
6+
use graph::env::ENV_VARS;
67
use graph::{bail, prelude::anyhow::Result};
78
use indicatif::ProgressBar;
89

@@ -213,9 +214,18 @@ pub async fn run(
213214

214215
let shard = shard.map(Shard::new).transpose()?;
215216

217+
let version_switching_mode = ENV_VARS.subgraph_version_switching_mode;
218+
216219
let mut reporter = Box::new(RestoreProgress::new());
217220
subgraph_store
218-
.restore(&directory, shard, name, mode, &mut *reporter)
221+
.restore(
222+
&directory,
223+
shard,
224+
name,
225+
mode,
226+
version_switching_mode,
227+
&mut *reporter,
228+
)
219229
.await?;
220230
println!(
221231
"Restored {} into {} in shard {}",

store/postgres/src/subgraph_store.rs

Lines changed: 132 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,138 @@ impl SubgraphStore {
486486
self.create_deployment_internal(name, schema, deployment, node_id, network_name, mode, true)
487487
.await
488488
}
489+
490+
pub async fn restore(
491+
&self,
492+
dir: &std::path::Path,
493+
shard: Option<Shard>,
494+
name: Option<SubgraphName>,
495+
mode: RestoreMode,
496+
version_switching_mode: SubgraphVersionSwitchingMode,
497+
reporter: &mut dyn RestoreReporter,
498+
) -> Result<(), StoreError> {
499+
use crate::relational::dump::Metadata;
500+
501+
let metadata_path = dir.join("metadata.json");
502+
let metadata = Metadata::from_file(&metadata_path)?;
503+
504+
// Resolve the subgraph name for deployment rule matching. If not
505+
// supplied, look up an existing name from the DB; error if none.
506+
let name = match name {
507+
Some(name) => {
508+
// Validate that the named subgraph exists. Without this
509+
// check the deployment would be created but never linked
510+
// to a name, leaving it orphaned.
511+
if !self.mirror.subgraph_exists(&name).await? {
512+
return Err(StoreError::Input(format!(
513+
"subgraph `{name}` does not exist; \
514+
create it first with `graphman create {name}`"
515+
)));
516+
}
517+
name
518+
}
519+
None => {
520+
let names = self
521+
.mirror
522+
.subgraphs_by_deployment_hash(metadata.deployment.as_str())
523+
.await?;
524+
let (name, _) = names.into_iter().next().ok_or_else(|| {
525+
StoreError::Input(
526+
"no subgraph name found for this deployment; use --name to specify one"
527+
.into(),
528+
)
529+
})?;
530+
SubgraphName::new(name).map_err(|n| {
531+
StoreError::InternalError(format!("invalid subgraph name `{n}` in database"))
532+
})?
533+
}
534+
};
535+
536+
// Use deployment rules to determine which node should index this
537+
// deployment and how to place it.
538+
let (placed_shard, node) = self
539+
.place(&name, &metadata.network, DEFAULT_NODE_ID.clone())
540+
.await?;
541+
let shard = shard.unwrap_or(placed_shard);
542+
543+
// Validate that the target shard exists before making any DB changes
544+
self.stores
545+
.get(&shard)
546+
.ok_or_else(|| StoreError::UnknownShard(shard.to_string()))?;
547+
548+
let mut pconn = self.primary_conn().await?;
549+
let action = pconn
550+
.plan_restore(&shard, &metadata.deployment, &mode)
551+
.await?;
552+
553+
// Determine schema_version the same way allocate_site does
554+
let schema_version = match metadata.graft_base.as_ref() {
555+
Some(graft_base) => {
556+
let base_site = pconn.find_active_site(graft_base).await?.ok_or_else(|| {
557+
StoreError::DeploymentNotFound("graft_base not found".to_string())
558+
})?;
559+
base_site.schema_version
560+
}
561+
None => DeploymentSchemaVersion::LATEST,
562+
};
563+
564+
let site = match action {
565+
RestoreAction::Create { active } => {
566+
pconn
567+
.create_site(
568+
shard,
569+
metadata.deployment.clone(),
570+
metadata.network.clone(),
571+
schema_version,
572+
active,
573+
)
574+
.await?
575+
}
576+
RestoreAction::Replace { existing } => {
577+
let was_active = existing.active;
578+
let existing = Arc::new(existing);
579+
let store = self.for_site(&existing)?;
580+
store.drop_deployment(&existing).await?;
581+
pconn.drop_site(&existing).await?;
582+
// Drop and re-acquire the primary connection to avoid pool
583+
// deadlock: drop_deployment above used a separate connection
584+
// from the same pool, and create_site below needs one too.
585+
drop(pconn);
586+
let mut pconn = self.primary_conn().await?;
587+
pconn
588+
.create_site(
589+
shard,
590+
metadata.deployment.clone(),
591+
metadata.network.clone(),
592+
schema_version,
593+
was_active,
594+
)
595+
.await?
596+
}
597+
};
598+
599+
let site = Arc::new(site);
600+
let store = self.for_site(&site)?;
601+
store
602+
.restore(site.cheap_clone(), dir, &metadata, &mut *reporter)
603+
.await?;
604+
605+
// Link the restored deployment to the subgraph name and assign it
606+
// to the node determined by deployment rules
607+
let mut pconn = self.primary_conn().await?;
608+
let subgraph_store = self.cheap_clone();
609+
let exists_and_synced = async move |id: &DeploymentHash| {
610+
let (store, _) = subgraph_store.store(id).await?;
611+
store.deployment_exists_and_synced(id).await
612+
};
613+
let changes = pconn
614+
.create_subgraph_version(name, &site, node, version_switching_mode, exists_and_synced)
615+
.await?;
616+
let event = StoreEvent::new(changes);
617+
pconn.send_store_event(&self.sender, &event).await?;
618+
619+
Ok(())
620+
}
489621
}
490622

491623
impl std::ops::Deref for SubgraphStore {
@@ -1451,130 +1583,6 @@ impl Inner {
14511583

14521584
store.dump(site, directory, reporter).await
14531585
}
1454-
1455-
pub async fn restore(
1456-
&self,
1457-
dir: &std::path::Path,
1458-
shard: Option<Shard>,
1459-
name: Option<SubgraphName>,
1460-
mode: RestoreMode,
1461-
reporter: &mut dyn RestoreReporter,
1462-
) -> Result<(), StoreError> {
1463-
use crate::relational::dump::Metadata;
1464-
1465-
let metadata_path = dir.join("metadata.json");
1466-
let metadata = Metadata::from_file(&metadata_path)?;
1467-
1468-
// Resolve the subgraph name for deployment rule matching. If not
1469-
// supplied, look up an existing name from the DB; error if none.
1470-
let name = match name {
1471-
Some(name) => {
1472-
// Validate that the named subgraph exists. Without this
1473-
// check the deployment would be created but never linked
1474-
// to a name, leaving it orphaned.
1475-
if !self.mirror.subgraph_exists(&name).await? {
1476-
return Err(StoreError::Input(format!(
1477-
"subgraph `{name}` does not exist; \
1478-
create it first with `graphman create {name}`"
1479-
)));
1480-
}
1481-
name
1482-
}
1483-
None => {
1484-
let names = self
1485-
.mirror
1486-
.subgraphs_by_deployment_hash(metadata.deployment.as_str())
1487-
.await?;
1488-
let (name, _) = names.into_iter().next().ok_or_else(|| {
1489-
StoreError::Input(
1490-
"no subgraph name found for this deployment; use --name to specify one"
1491-
.into(),
1492-
)
1493-
})?;
1494-
SubgraphName::new(name).map_err(|n| {
1495-
StoreError::InternalError(format!("invalid subgraph name `{n}` in database"))
1496-
})?
1497-
}
1498-
};
1499-
1500-
// Use deployment rules to determine which node should index this
1501-
// deployment and how to place it.
1502-
let (placed_shard, node) = self
1503-
.place(&name, &metadata.network, DEFAULT_NODE_ID.clone())
1504-
.await?;
1505-
let shard = shard.unwrap_or(placed_shard);
1506-
1507-
// Validate that the target shard exists before making any DB changes
1508-
self.stores
1509-
.get(&shard)
1510-
.ok_or_else(|| StoreError::UnknownShard(shard.to_string()))?;
1511-
1512-
let mut pconn = self.primary_conn().await?;
1513-
let action = pconn
1514-
.plan_restore(&shard, &metadata.deployment, &mode)
1515-
.await?;
1516-
1517-
// Determine schema_version the same way allocate_site does
1518-
let schema_version = match metadata.graft_base.as_ref() {
1519-
Some(graft_base) => {
1520-
let base_site = pconn.find_active_site(graft_base).await?.ok_or_else(|| {
1521-
StoreError::DeploymentNotFound("graft_base not found".to_string())
1522-
})?;
1523-
base_site.schema_version
1524-
}
1525-
None => DeploymentSchemaVersion::LATEST,
1526-
};
1527-
1528-
let site = match action {
1529-
RestoreAction::Create { active } => {
1530-
pconn
1531-
.create_site(
1532-
shard,
1533-
metadata.deployment.clone(),
1534-
metadata.network.clone(),
1535-
schema_version,
1536-
active,
1537-
)
1538-
.await?
1539-
}
1540-
RestoreAction::Replace { existing } => {
1541-
let was_active = existing.active;
1542-
let existing = Arc::new(existing);
1543-
let store = self.for_site(&existing)?;
1544-
store.drop_deployment(&existing).await?;
1545-
pconn.drop_site(&existing).await?;
1546-
// Drop and re-acquire the primary connection to avoid pool
1547-
// deadlock: drop_deployment above used a separate connection
1548-
// from the same pool, and create_site below needs one too.
1549-
drop(pconn);
1550-
let mut pconn = self.primary_conn().await?;
1551-
pconn
1552-
.create_site(
1553-
shard,
1554-
metadata.deployment.clone(),
1555-
metadata.network.clone(),
1556-
schema_version,
1557-
was_active,
1558-
)
1559-
.await?
1560-
}
1561-
};
1562-
1563-
let site = Arc::new(site);
1564-
let store = self.for_site(&site)?;
1565-
store
1566-
.restore(site.cheap_clone(), dir, &metadata, &mut *reporter)
1567-
.await?;
1568-
1569-
// Assign the restored deployment to the node determined by
1570-
// deployment rules
1571-
let mut pconn = self.primary_conn().await?;
1572-
let changes = pconn.assign_subgraph(&site, &node).await?;
1573-
let event = StoreEvent::new(changes);
1574-
pconn.send_store_event(&self.sender, &event).await?;
1575-
1576-
Ok(())
1577-
}
15781586
}
15791587

15801588
const STATE_ENS_NOT_CHECKED: u8 = 0;

0 commit comments

Comments
 (0)