Use one transaction for each action during abort

This is a continuation on the last two commits. We want to keep
transactions as short-lived as possible when performing schema changes.
This commit is contained in:
fabianlindfors 2022-01-12 00:06:54 +01:00
parent e27b5d2a23
commit 8a130490a0
2 changed files with 129 additions and 36 deletions

View File

@ -107,14 +107,17 @@ impl Reshape {
.context("failed to set up helpers")?;
let mut new_schema = Schema::new();
let mut processed_migrations: &[Migration] = &[];
let mut last_migration_index = usize::MAX;
let mut last_action_index = usize::MAX;
let mut result: anyhow::Result<()> = Ok(());
for (migration_index, migration) in remaining_migrations.iter().enumerate() {
println!("Migrating '{}':", migration.name);
processed_migrations = &remaining_migrations[..migration_index + 1];
last_migration_index = migration_index;
for (action_index, action) in migration.actions.iter().enumerate() {
last_action_index = action_index;
let description = action.describe();
print!(" + {} ", description);
@ -138,7 +141,23 @@ impl Reshape {
// If a migration failed, we abort all the migrations that were applied
if let Err(err) = result {
println!("A migration failed, aborting migrations that have already been applied");
abort_migrations(&mut self.db, processed_migrations)?;
// Set to the Aborting state. This is to ensure that the failed
// migration is fully aborted and nothing is left dangling.
// If the abort is interrupted for any reason, the user can try again
// by running `reshape abort`.
self.state.aborting(
remaining_migrations.clone(),
last_migration_index + 1,
last_action_index + 1,
);
self.state.save(&mut self.db)?;
self.abort_migrations(
&remaining_migrations,
last_migration_index + 1,
last_action_index + 1,
)?;
return Err(err);
}
@ -196,7 +215,10 @@ impl Reshape {
current_migration_index,
current_action_index
} => (migrations, current_migration_index, current_action_index),
state::Status::Applying { migrations: _ } => {
state::Status::Aborting { .. } => {
return Err(anyhow!("migration been aborted and can't be completed. Please finish using `reshape abort`."))
}
state::Status::Applying { .. } => {
return Err(anyhow!("a previous migration unexpectedly failed. Please run `reshape migrate` to try applying the migration again."))
}
state::Status::Idle => {
@ -369,9 +391,24 @@ impl Reshape {
}
pub fn abort(&mut self) -> anyhow::Result<()> {
let remaining_migrations = match &self.state.status {
Status::InProgress { migrations } => migrations,
Status::Applying { migrations } => migrations,
let (remaining_migrations, last_migration_index, last_action_index) = match self
.state
.status
.clone()
{
Status::InProgress { migrations } | Status::Applying { migrations } => {
// Set to the Aborting state. Once this is done, the migration has to
// be fully aborted and can't be completed.
self.state.aborting(migrations.clone(), 0, 0);
self.state.save(&mut self.db)?;
(migrations, usize::MAX, usize::MAX)
}
Status::Aborting {
migrations,
last_migration_index,
last_action_index,
} => (migrations, last_migration_index, last_action_index),
Status::Completing { .. } => {
return Err(anyhow!("Migration completion has already been started. Please run `reshape complete` again to finish it."));
}
@ -380,53 +417,89 @@ impl Reshape {
return Ok(());
}
};
let target_migration = remaining_migrations.last().unwrap().name.to_string();
helpers::tear_down_helpers(&mut self.db).context("failed to tear down helpers")?;
// Run all the abort changes as a transaction to avoid incomplete changes
let mut transaction = self
.db
.transaction()
.context("failed to start transaction")?;
// Remove new migration's schema
let target_migration = remaining_migrations.last().unwrap().name.to_string();
let schema_name = schema_name_for_migration(&target_migration);
transaction
self.db
.run(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema_name,))
.with_context(|| format!("failed to drop schema {}", schema_name))?;
// Abort all pending migrations
abort_migrations(&mut transaction, remaining_migrations)?;
self.abort_migrations(
&remaining_migrations,
last_migration_index,
last_action_index,
)?;
helpers::tear_down_helpers(&mut self.db).context("failed to tear down helpers")?;
self.state.status = state::Status::Idle;
self.state
.save(&mut transaction)
.save(&mut self.db)
.context("failed to save state")?;
transaction
.commit()
.context("failed to commit transaction")?;
Ok(())
}
}
fn abort_migrations(db: &mut dyn Conn, migrations: &[Migration]) -> anyhow::Result<()> {
// Abort all migrations in reverse order
for (migration_index, migration) in migrations.iter().enumerate().rev() {
print!("Aborting '{}' ", migration.name);
fn abort_migrations(
&mut self,
migrations: &[Migration],
upper_migration_index: usize,
upper_action_index: usize,
) -> anyhow::Result<()> {
// Abort all migrations in reverse order
for (migration_index, migration) in migrations.iter().enumerate().rev() {
// Skip migrations which shouldn't be aborted
// The reason can be that they have already been aborted or that
// the migration was never applied in the first place.
if migration_index >= upper_migration_index {
continue;
}
for (action_index, action) in migration.actions.iter().enumerate().rev() {
let ctx = MigrationContext::new(migration_index, action_index);
action
.abort(&ctx, db)
.with_context(|| format!("failed to abort migration {}", migration.name))
.with_context(|| format!("failed to abort action: {}", action.describe()))?;
print!("Aborting '{}' ", migration.name);
for (action_index, action) in migration.actions.iter().enumerate().rev() {
// Skip actions which shouldn't be aborted
// The reason can be that they have already been aborted or that
// the action was never applied in the first place.
if migration_index == upper_migration_index - 1
&& action_index >= upper_action_index
{
continue;
}
// Run each action abort as a separate transaction. We need atomicity
// to ensure the abort changes are run only once for each action.
let mut transaction = self
.db
.transaction()
.context("failed to start transaction")?;
let ctx = MigrationContext::new(migration_index, action_index);
action
.abort(&ctx, &mut transaction)
.with_context(|| format!("failed to abort migration {}", migration.name))
.with_context(|| format!("failed to abort action: {}", action.describe()))?;
// Update state with which migrations and actions have been aborted. By running this
// in a transaction, we guarantee that an action is only aborted once.
// We want to use a single transaction for each action to keep the length
// of the transaction as short as possible.
self.state
.aborting(migrations.to_vec(), migration_index, action_index);
self.state
.save(&mut transaction)
.context("failed to save state")?;
transaction
.commit()
.context("failed to commit transaction")?;
}
println!("{}", "done".green());
}
println!("{}", "done".green());
Ok(())
}
Ok(())
}
pub fn latest_schema_from_migrations(migrations: &[Migration]) -> Option<String> {

View File

@ -30,6 +30,13 @@ pub enum Status {
current_migration_index: usize,
current_action_index: usize,
},
#[serde(rename = "aborting")]
Aborting {
migrations: Vec<Migration>,
last_migration_index: usize,
last_action_index: usize,
},
}
impl State {
@ -118,6 +125,19 @@ impl State {
}
}
pub fn aborting(
&mut self,
migrations: Vec<Migration>,
last_migration_index: usize,
last_action_index: usize,
) {
self.status = Status::Aborting {
migrations,
last_migration_index,
last_action_index,
}
}
pub fn get_remaining_migrations(
&self,
new_migrations: impl IntoIterator<Item = Migration>,