diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 54752a3d25f..a529fad8583 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -938,7 +938,7 @@ impl Context { store } - fn pools(self) -> HashMap { + fn pools(&self) -> HashMap { let (_, pools) = self.store_and_pools(); pools } @@ -1327,7 +1327,9 @@ async fn main() -> anyhow::Result<()> { .await } Activate { deployment, shard } => { - commands::copy::activate(ctx.subgraph_store(), deployment, shard) + let pools = ctx.pools(); + let store = ctx.subgraph_store(); + commands::copy::activate(&pools, store, deployment, shard) } List => commands::copy::list(ctx.pools()), Status { dst } => commands::copy::status(ctx.pools(), &dst), diff --git a/node/src/manager/commands/copy.rs b/node/src/manager/commands/copy.rs index 620e27eef6c..38a9eb5c0e8 100644 --- a/node/src/manager/commands/copy.rs +++ b/node/src/manager/commands/copy.rs @@ -154,7 +154,12 @@ pub async fn create( Ok(()) } -pub fn activate(store: Arc, deployment: String, shard: String) -> Result<(), Error> { +pub fn activate( + pools: &HashMap, + store: Arc, + deployment: String, + shard: String, +) -> Result<(), Error> { let shard = Shard::new(shard)?; let deployment = DeploymentHash::new(deployment).map_err(|s| anyhow!("illegal deployment hash `{}`", s))?; @@ -167,8 +172,23 @@ pub fn activate(store: Arc, deployment: String, shard: String) -> shard ) })?; - store.activate(&deployment)?; - println!("activated copy {}", deployment); + + let (state, _, _) = match CopyState::find(pools, &shard, deployment.id.0)? { + Some((state, tables, on_sync)) => (state, tables, on_sync), + None => { + println!( + "copying is queued but has not started or no copy operation for {} exists", + deployment.id.0 + ); + return Ok(()); + } + }; + if state.finished_at.is_some() { + store.activate(&deployment)?; + println!("activated copy {}", deployment); + } else { + println!("copying is not finished yet"); + } Ok(()) }