Use one transaction for each migration when completing

We previously used one transaction across all the migrations being
completed, which is not ideal from a locking perspective. Preferably we
want to keep the transactions as short-lived as possible to avoid
interfering with other queries. The next step will be to use one
transaction for each action.

To achieve this, we need to introduce a new intermediate state called
`Completing` which tracks which migrations have been completed so far.
This commit is contained in:
fabianlindfors 2022-01-11 23:17:40 +01:00
parent bd7aab5675
commit c0ec6ca031
2 changed files with 76 additions and 21 deletions

View File

@ -58,11 +58,18 @@ impl Reshape {
self.state = State::load(&mut self.db);
// Make sure no migration is in progress
if let state::Status::InProgress { migrations: _ } = &self.state.status {
if let state::Status::InProgress { .. } = &self.state.status {
println!("Migration already in progress, please complete using 'reshape complete'");
return Ok(());
}
if let state::Status::Completing { .. } = &self.state.status {
println!(
"Migration already in progress and has started completion, please finish using 'reshape complete'"
);
return Ok(());
}
// Determine which migrations need to be applied by comparing the provided migrations
// with the already applied ones stored in the state. This will throw an error if the
// two sets of migrations don't agree, for example if a new migration has been added
@ -175,8 +182,16 @@ impl Reshape {
pub fn complete_migration(&mut self) -> anyhow::Result<()> {
// Make sure a migration is in progress
let remaining_migrations = match &self.state.status {
state::Status::InProgress { migrations } => migrations,
let (remaining_migrations, starting_migration_index) = match self.state.status.clone() {
state::Status::InProgress { migrations } => {
// Move into the Completing state. Once in this state,
// the migration can't be aborted.
self.state.completing(migrations.clone(), 0);
self.state.save(&mut self.db).context("failed to save state")?;
(migrations, 0)
},
state::Status::Completing { migrations, current_migration_index } => (migrations, current_migration_index),
state::Status::Applying { migrations: _ } => {
return Err(anyhow!("a previous migration unexpectedly failed. Please run `reshape migrate` to try applying the migration again."))
}
@ -186,17 +201,9 @@ impl Reshape {
}
};
helpers::tear_down_helpers(&mut self.db).context("failed to tear down helpers")?;
// Run all the completion changes as a transaction to avoid incomplete updates
let mut transaction = self
.db
.transaction()
.context("failed to start transaction")?;
// Remove previous migration's schema
if let Some(current_migration) = &self.state.current_migration {
transaction
self.db
.run(&format!(
"DROP SCHEMA IF EXISTS {} CASCADE",
schema_name_for_migration(current_migration)
@ -205,6 +212,18 @@ impl Reshape {
}
for (migration_index, migration) in remaining_migrations.iter().enumerate() {
// Skip all the migrations that have already been completed
if migration_index < starting_migration_index {
continue;
}
// Run each completion as a separate transaction. We need atomicity
// to ensure the migration completion changes are run only once.
let mut transaction = self
.db
.transaction()
.context("failed to start transaction")?;
println!("Completing '{}':", migration.name);
for (action_index, action) in migration.actions.iter().enumerate() {
@ -225,20 +244,32 @@ impl Reshape {
}
}
// Update state with which migrations have been completed. By running this
// in a transaction, we guarantee that a migration is only completed once.
// We want to use a single transaction for each migration to keep the length
// of the transaction as short as possible.
self.state
.completing(remaining_migrations.clone(), migration_index + 1);
self.state
.save(&mut transaction)
.context("failed to save state after completing migration")?;
transaction
.commit()
.context("failed to commit transaction")?;
println!();
}
// Remove helpers which are no longer in use
helpers::tear_down_helpers(&mut self.db).context("failed to tear down helpers")?;
self.state
.complete()
.context("failed to update state as completed")?;
self.state
.save(&mut transaction)
.save(&mut self.db)
.context("failed to save state after setting as completed")?;
transaction
.commit()
.context("failed to apply transaction")?;
Ok(())
}
@ -327,6 +358,12 @@ impl Reshape {
let remaining_migrations = match &self.state.status {
Status::InProgress { migrations } => migrations,
Status::Applying { migrations } => migrations,
Status::Completing {
migrations: _,
current_migration_index: _,
} => {
return Err(anyhow!("Migration completion has already been started. Please run `reshape complete` again to finish it."));
}
Status::Idle => {
println!("No migration is in progress");
return Ok(());

View File

@ -12,7 +12,7 @@ pub struct State {
pub migrations: Vec<Migration>,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "status")]
pub enum Status {
#[serde(rename = "idle")]
@ -23,6 +23,12 @@ pub enum Status {
#[serde(rename = "in_progress")]
InProgress { migrations: Vec<Migration> },
#[serde(rename = "completing")]
Completing {
migrations: Vec<Migration>,
current_migration_index: usize,
},
}
impl State {
@ -64,12 +70,15 @@ impl State {
Ok(())
}
// Complete will change the status from InProgress to Idle
// Complete will change the status from Completing to Idle
pub fn complete(&mut self) -> anyhow::Result<()> {
let current_status = std::mem::replace(&mut self.status, Status::Idle);
match current_status {
Status::InProgress { mut migrations } => {
Status::Completing {
mut migrations,
current_migration_index: _,
} => {
let target_migration = migrations.last().unwrap().name.to_string();
self.migrations.append(&mut migrations);
self.current_migration = Some(target_migration);
@ -77,7 +86,9 @@ impl State {
_ => {
// Move old status back
self.status = current_status;
return Err(anyhow!("status "));
return Err(anyhow!(
"couldn't update state to be completed, not in Completing state"
));
}
}
@ -96,6 +107,13 @@ impl State {
};
}
pub fn completing(&mut self, migrations: Vec<Migration>, current_migration_index: usize) {
self.status = Status::Completing {
migrations,
current_migration_index,
}
}
pub fn get_remaining_migrations(
&self,
new_migrations: impl IntoIterator<Item = Migration>,