From c0ec6ca0312aa2ee7831bdd031e162f487297561 Mon Sep 17 00:00:00 2001 From: fabianlindfors Date: Tue, 11 Jan 2022 23:17:40 +0100 Subject: [PATCH] Use one transaction for each migration when completing We previously used one transaction across all the migrations being completed, which is not ideal from a locking perspective. Preferably we want to keep the transactions as short-lived as possible to avoid interfering with other queries. The next step will be to use one transaction for each action. To achieve this, we need to introduce a new intermediate state called `Completing` which tracks which migrations have been completed so far. --- src/lib.rs | 71 +++++++++++++++++++++++++++++++++++++++------------- src/state.rs | 26 ++++++++++++++++--- 2 files changed, 76 insertions(+), 21 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d40545c..1c25e9b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,11 +58,18 @@ impl Reshape { self.state = State::load(&mut self.db); // Make sure no migration is in progress - if let state::Status::InProgress { migrations: _ } = &self.state.status { + if let state::Status::InProgress { .. } = &self.state.status { println!("Migration already in progress, please complete using 'reshape complete'"); return Ok(()); } + if let state::Status::Completing { .. } = &self.state.status { + println!( + "Migration already in progress and has started completion, please finish using 'reshape complete'" + ); + return Ok(()); + } + // Determine which migrations need to be applied by comparing the provided migrations // with the already applied ones stored in the state. This will throw an error if the // two sets of migrations don't agree, for example if a new migration has been added @@ -175,8 +182,16 @@ impl Reshape { pub fn complete_migration(&mut self) -> anyhow::Result<()> { // Make sure a migration is in progress - let remaining_migrations = match &self.state.status { - state::Status::InProgress { migrations } => migrations, + let (remaining_migrations, starting_migration_index) = match self.state.status.clone() { + state::Status::InProgress { migrations } => { + // Move into the Completing state. Once in this state, + // the migration can't be aborted. + self.state.completing(migrations.clone(), 0); + self.state.save(&mut self.db).context("failed to save state")?; + + (migrations, 0) + }, + state::Status::Completing { migrations, current_migration_index } => (migrations, current_migration_index), state::Status::Applying { migrations: _ } => { return Err(anyhow!("a previous migration unexpectedly failed. Please run `reshape migrate` to try applying the migration again.")) } @@ -186,17 +201,9 @@ impl Reshape { } }; - helpers::tear_down_helpers(&mut self.db).context("failed to tear down helpers")?; - - // Run all the completion changes as a transaction to avoid incomplete updates - let mut transaction = self - .db - .transaction() - .context("failed to start transaction")?; - // Remove previous migration's schema if let Some(current_migration) = &self.state.current_migration { - transaction + self.db .run(&format!( "DROP SCHEMA IF EXISTS {} CASCADE", schema_name_for_migration(current_migration) @@ -205,6 +212,18 @@ impl Reshape { } for (migration_index, migration) in remaining_migrations.iter().enumerate() { + // Skip all the migrations that have already been completed + if migration_index < starting_migration_index { + continue; + } + + // Run each completion as a separate transaction. We need atomicity + // to ensure the migration completion changes are run only once. + let mut transaction = self + .db + .transaction() + .context("failed to start transaction")?; + println!("Completing '{}':", migration.name); for (action_index, action) in migration.actions.iter().enumerate() { @@ -225,20 +244,32 @@ impl Reshape { } } + // Update state with which migrations have been completed. By running this + // in a transaction, we guarantee that a migration is only completed once. + // We want to use a single transaction for each migration to keep the length + // of the transaction as short as possible. + self.state + .completing(remaining_migrations.clone(), migration_index + 1); + self.state + .save(&mut transaction) + .context("failed to save state after completing migration")?; + transaction + .commit() + .context("failed to commit transaction")?; + println!(); } + // Remove helpers which are no longer in use + helpers::tear_down_helpers(&mut self.db).context("failed to tear down helpers")?; + self.state .complete() .context("failed to update state as completed")?; self.state - .save(&mut transaction) + .save(&mut self.db) .context("failed to save state after setting as completed")?; - transaction - .commit() - .context("failed to apply transaction")?; - Ok(()) } @@ -327,6 +358,12 @@ impl Reshape { let remaining_migrations = match &self.state.status { Status::InProgress { migrations } => migrations, Status::Applying { migrations } => migrations, + Status::Completing { + migrations: _, + current_migration_index: _, + } => { + return Err(anyhow!("Migration completion has already been started. Please run `reshape complete` again to finish it.")); + } Status::Idle => { println!("No migration is in progress"); return Ok(()); diff --git a/src/state.rs b/src/state.rs index a8b3073..d06e2cf 100644 --- a/src/state.rs +++ b/src/state.rs @@ -12,7 +12,7 @@ pub struct State { pub migrations: Vec, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug)] #[serde(tag = "status")] pub enum Status { #[serde(rename = "idle")] @@ -23,6 +23,12 @@ pub enum Status { #[serde(rename = "in_progress")] InProgress { migrations: Vec }, + + #[serde(rename = "completing")] + Completing { + migrations: Vec, + current_migration_index: usize, + }, } impl State { @@ -64,12 +70,15 @@ impl State { Ok(()) } - // Complete will change the status from InProgress to Idle + // Complete will change the status from Completing to Idle pub fn complete(&mut self) -> anyhow::Result<()> { let current_status = std::mem::replace(&mut self.status, Status::Idle); match current_status { - Status::InProgress { mut migrations } => { + Status::Completing { + mut migrations, + current_migration_index: _, + } => { let target_migration = migrations.last().unwrap().name.to_string(); self.migrations.append(&mut migrations); self.current_migration = Some(target_migration); @@ -77,7 +86,9 @@ impl State { _ => { // Move old status back self.status = current_status; - return Err(anyhow!("status ")); + return Err(anyhow!( + "couldn't update state to be completed, not in Completing state" + )); } } @@ -96,6 +107,13 @@ impl State { }; } + pub fn completing(&mut self, migrations: Vec, current_migration_index: usize) { + self.status = Status::Completing { + migrations, + current_migration_index, + } + } + pub fn get_remaining_migrations( &self, new_migrations: impl IntoIterator,