Use one transaction for each action during completion

This builds on the previous commit to make the transactions even more
granular when completing a migration. Keeping the transaction span short
is important to avoid interfering with other queries.
This commit is contained in:
fabianlindfors 2022-01-11 23:25:55 +01:00
parent c0ec6ca031
commit e27b5d2a23
2 changed files with 50 additions and 35 deletions

View File

@ -182,16 +182,20 @@ impl Reshape {
pub fn complete_migration(&mut self) -> anyhow::Result<()> {
// Make sure a migration is in progress
let (remaining_migrations, starting_migration_index) = match self.state.status.clone() {
let (remaining_migrations, starting_migration_index, starting_action_index) = match self.state.status.clone() {
state::Status::InProgress { migrations } => {
// Move into the Completing state. Once in this state,
// the migration can't be aborted.
self.state.completing(migrations.clone(), 0);
// the migration can't be aborted and must be completed.
self.state.completing(migrations.clone(), 0, 0);
self.state.save(&mut self.db).context("failed to save state")?;
(migrations, 0)
(migrations, 0, 0)
},
state::Status::Completing { migrations, current_migration_index } => (migrations, current_migration_index),
state::Status::Completing {
migrations,
current_migration_index,
current_action_index
} => (migrations, current_migration_index, current_action_index),
state::Status::Applying { migrations: _ } => {
return Err(anyhow!("a previous migration unexpectedly failed. Please run `reshape migrate` to try applying the migration again."))
}
@ -212,21 +216,28 @@ impl Reshape {
}
for (migration_index, migration) in remaining_migrations.iter().enumerate() {
// Skip all the migrations that have already been completed
// Skip all the migrations which have already been completed
if migration_index < starting_migration_index {
continue;
}
// Run each completion as a separate transaction. We need atomicity
// to ensure the migration completion changes are run only once.
let mut transaction = self
.db
.transaction()
.context("failed to start transaction")?;
println!("Completing '{}':", migration.name);
for (action_index, action) in migration.actions.iter().enumerate() {
// Skip all actions which have already been completed
if migration_index == starting_migration_index
&& action_index < starting_action_index
{
continue;
}
// Run each action completion as a separate transaction. We need atomicity
// to ensure the completion changes are run only once for each action.
let mut transaction = self
.db
.transaction()
.context("failed to start transaction")?;
let description = action.describe();
print!(" + {} ", description);
@ -242,20 +253,23 @@ impl Reshape {
println!("{}", "failed".red());
return result;
}
}
// Update state with which migrations have been completed. By running this
// in a transaction, we guarantee that a migration is only completed once.
// We want to use a single transaction for each migration to keep the length
// of the transaction as short as possible.
self.state
.completing(remaining_migrations.clone(), migration_index + 1);
self.state
.save(&mut transaction)
.context("failed to save state after completing migration")?;
transaction
.commit()
.context("failed to commit transaction")?;
// Update state with which migrations and actions have been completed. By running this
// in a transaction, we guarantee that an action is only completed once.
// We want to use a single transaction for each action to keep the length
// of the transaction as short as possible.
self.state.completing(
remaining_migrations.clone(),
migration_index + 1,
action_index + 1,
);
self.state
.save(&mut transaction)
.context("failed to save state after completing action")?;
transaction
.commit()
.context("failed to commit transaction")?;
}
println!();
}
@ -358,10 +372,7 @@ impl Reshape {
let remaining_migrations = match &self.state.status {
Status::InProgress { migrations } => migrations,
Status::Applying { migrations } => migrations,
Status::Completing {
migrations: _,
current_migration_index: _,
} => {
Status::Completing { .. } => {
return Err(anyhow!("Migration completion has already been started. Please run `reshape complete` again to finish it."));
}
Status::Idle => {

View File

@ -28,6 +28,7 @@ pub enum Status {
Completing {
migrations: Vec<Migration>,
current_migration_index: usize,
current_action_index: usize,
},
}
@ -75,10 +76,7 @@ impl State {
let current_status = std::mem::replace(&mut self.status, Status::Idle);
match current_status {
Status::Completing {
mut migrations,
current_migration_index: _,
} => {
Status::Completing { mut migrations, .. } => {
let target_migration = migrations.last().unwrap().name.to_string();
self.migrations.append(&mut migrations);
self.current_migration = Some(target_migration);
@ -107,10 +105,16 @@ impl State {
};
}
pub fn completing(&mut self, migrations: Vec<Migration>, current_migration_index: usize) {
pub fn completing(
&mut self,
migrations: Vec<Migration>,
current_migration_index: usize,
current_action_index: usize,
) {
self.status = Status::Completing {
migrations,
current_migration_index,
current_action_index,
}
}