From 12c91aa3731a3d6b5a385ec6e4575a05e0e1d87b Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 22 Apr 2025 12:43:49 -0700 Subject: [PATCH] core, graph, store: Pause failed subgraphs instead of unassigning Unassigning subgraphs has a number of unwanted side-effects, e.g., canceling copies. Pausing is safer alternative to avoid spending cycles on a subgraph we know will not make progress. --- core/src/subgraph/runner.rs | 2 +- graph/src/components/store/traits.rs | 2 +- store/postgres/src/writable.rs | 15 +++++++++++++-- store/test-store/tests/graph/entity_cache.rs | 2 +- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index fcd8fa30fbb..a8f3eeed441 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -527,7 +527,7 @@ where let store = &self.inputs.store; if !ENV_VARS.disable_fail_fast && !store.is_deployment_synced() { store - .unassign_subgraph() + .pause_subgraph() .map_err(|e| ProcessingError::Unknown(e.into()))?; // Use `Canceled` to avoiding setting the subgraph health to failed, an error was diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 73cb22269fe..063ca4aa010 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -395,7 +395,7 @@ pub trait WritableStore: ReadStore + DeploymentCursorTracker { /// Cheap, cached operation. fn is_deployment_synced(&self) -> bool; - fn unassign_subgraph(&self) -> Result<(), StoreError>; + fn pause_subgraph(&self) -> Result<(), StoreError>; /// Load the dynamic data sources for the given deployment async fn load_dynamic_data_sources( diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 628b1741e24..74b516433b6 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -358,6 +358,17 @@ impl SyncStore { }) } + fn pause_subgraph(&self, site: &Site) -> Result<(), StoreError> { + retry::forever(&self.logger, "unassign_subgraph", || { + let mut pconn = self.store.primary_conn()?; + pconn.transaction(|conn| -> Result<_, StoreError> { + let mut pconn = primary::Connection::new(conn); + let changes = pconn.pause_subgraph(site)?; + self.store.send_store_event(&StoreEvent::new(changes)) + }) + }) + } + async fn load_dynamic_data_sources( &self, block: BlockNumber, @@ -1722,8 +1733,8 @@ impl WritableStoreTrait for WritableStore { self.is_deployment_synced.load(Ordering::SeqCst) } - fn unassign_subgraph(&self) -> Result<(), StoreError> { - self.store.unassign_subgraph(&self.store.site) + fn pause_subgraph(&self) -> Result<(), StoreError> { + self.store.pause_subgraph(&self.store.site) } async fn load_dynamic_data_sources( diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index c4353ffb63b..cf9bc3faffa 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -141,7 +141,7 @@ impl WritableStore for MockStore { unimplemented!() } - fn unassign_subgraph(&self) -> Result<(), StoreError> { + fn pause_subgraph(&self) -> Result<(), StoreError> { unimplemented!() }