Skip to content

Commit 4e8f7bf

Browse files
committed
store: Add shard-aware restore planning with RestoreMode and RestoreAction
1 parent ff0292c commit 4e8f7bf

3 files changed

Lines changed: 81 additions & 3 deletions

File tree

graph/src/components/store/err.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ pub enum StoreError {
7474
StatementTimeout,
7575
#[error("database constraint violated: {0}")]
7676
ConstraintViolation(String),
77+
/// The input to some operation, usually user input, makes it impossible
78+
/// to complete the operation. This must be a deterministic error
79+
#[error("{0}")]
80+
Input(String),
7781
}
7882

7983
// Convenience to report an internal error
@@ -132,6 +136,7 @@ impl Clone for StoreError {
132136
}
133137
Self::StatementTimeout => Self::StatementTimeout,
134138
Self::ConstraintViolation(arg0) => Self::ConstraintViolation(arg0.clone()),
139+
Self::Input(arg0) => Self::Input(arg0.clone()),
135140
}
136141
}
137142
}
@@ -187,7 +192,8 @@ impl StoreError {
187192
| UnknownAttribute(_, _)
188193
| InvalidIdentifier(_)
189194
| UnsupportedFilter(_, _)
190-
| ConstraintViolation(_) => true,
195+
| ConstraintViolation(_)
196+
| Input(_) => true,
191197

192198
// non-deterministic errors
193199
Unknown(_)

store/postgres/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ pub use self::notification_listener::NotificationSender;
6565
pub use self::pool::{
6666
AsyncPgConnection, ConnectionPool, ForeignServer, PoolCoordinator, PoolRole, ScopedFutureExt,
6767
};
68-
pub use self::primary::{db_version, UnusedDeployment};
68+
pub use self::primary::{db_version, RestoreMode, UnusedDeployment};
6969
pub use self::store::Store;
7070
pub use self::store_events::SubscriptionManager;
7171
pub use self::subgraph_store::{unused, DeploymentPlacer, Shard, SubgraphStore, PRIMARY_SHARD};

store/postgres/src/primary.rs

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -776,6 +776,29 @@ mod queries {
776776
}
777777
}
778778

779+
/// How to handle conflicts when restoring a deployment.
780+
/// Constructed from CLI flags --replace, --add, --force.
781+
pub enum RestoreMode {
782+
/// No conflict flags given — error if deployment exists anywhere
783+
Default,
784+
/// --replace: drop and recreate in the target shard
785+
Replace,
786+
/// --add: create a copy in a shard that doesn't have the deployment
787+
Add,
788+
/// --force: restore no matter what (replace if in shard, add if not)
789+
Force,
790+
}
791+
792+
/// The action `plan_restore` decided on based on `RestoreMode` and current
793+
/// state.
794+
pub enum RestoreAction {
795+
/// Create a new site (active=true if fresh, active=false if copy
796+
/// exists elsewhere)
797+
Create { active: bool },
798+
/// Drop existing site in target shard, then recreate
799+
Replace { existing: Site },
800+
}
801+
779802
/// A wrapper for a database connection that provides access to functionality
780803
/// that works only on the primary database
781804
pub struct Connection {
@@ -1406,7 +1429,7 @@ impl Connection {
14061429
shard,
14071430
namespace,
14081431
network,
1409-
active: true,
1432+
active,
14101433
schema_version,
14111434
_creation_disallowed: (),
14121435
})
@@ -1546,6 +1569,55 @@ impl Connection {
15461569
.await
15471570
}
15481571

1572+
/// Determine what action to take when restoring `subgraph` into `shard`
1573+
/// based on the `mode` and the current state of the deployment.
1574+
pub async fn plan_restore(
1575+
&mut self,
1576+
shard: &Shard,
1577+
subgraph: &DeploymentHash,
1578+
mode: &RestoreMode,
1579+
) -> Result<RestoreAction, StoreError> {
1580+
let conn = &mut self.conn;
1581+
let in_shard = queries::find_site_in_shard(conn, subgraph, shard).await?;
1582+
let active = queries::find_active_site(conn, subgraph).await?;
1583+
1584+
match (in_shard, active, mode) {
1585+
// Deployment exists in target shard
1586+
(Some(existing), _, RestoreMode::Replace | RestoreMode::Force) => {
1587+
Ok(RestoreAction::Replace { existing })
1588+
}
1589+
(Some(_), _, RestoreMode::Default | RestoreMode::Add) => {
1590+
Err(StoreError::Input(format!(
1591+
"deployment {} already exists in shard {}; use --replace or --force",
1592+
subgraph,
1593+
shard.as_str()
1594+
)))
1595+
}
1596+
// Deployment does not exist in target shard but exists elsewhere
1597+
(None, Some(ref active_site), RestoreMode::Add | RestoreMode::Force) => {
1598+
let _ = active_site;
1599+
Ok(RestoreAction::Create { active: false })
1600+
}
1601+
(None, Some(active_site), RestoreMode::Default) => Err(StoreError::Input(format!(
1602+
"deployment {} already exists in shard {}; use --add --shard {} or --force",
1603+
subgraph,
1604+
active_site.shard.as_str(),
1605+
shard.as_str()
1606+
))),
1607+
(None, Some(_), RestoreMode::Replace) => Err(StoreError::Input(format!(
1608+
"deployment {} is not in shard {}; nothing to replace",
1609+
subgraph,
1610+
shard.as_str()
1611+
))),
1612+
// Deployment does not exist anywhere
1613+
(None, None, RestoreMode::Replace) => Err(StoreError::Input(format!(
1614+
"deployment {} does not exist; nothing to replace",
1615+
subgraph
1616+
))),
1617+
(None, None, _) => Ok(RestoreAction::Create { active: true }),
1618+
}
1619+
}
1620+
15491621
pub async fn locate_site(
15501622
&mut self,
15511623
locator: DeploymentLocator,

0 commit comments

Comments
 (0)