Use advisory lock to avoid conflicts between instances

This protects all database access using a Postgres advisory lock. This
way we can avoid problems when two instances of Reshape try to change
the same database at the same time.
This commit is contained in:
fabianlindfors 2022-01-14 11:25:32 +01:00
parent d28110611c
commit 88da0d2364
10 changed files with 528 additions and 481 deletions

View File

@ -1,5 +1,68 @@
use anyhow::anyhow;
use postgres::{types::ToSql, NoTls, Row}; use postgres::{types::ToSql, NoTls, Row};
// DbLocker wraps a regular DbConn, only allowing access using the
// `lock` method. This method will acquire the advisory lock before
// allowing access to the database, and then release it afterwards.
//
// We use advisory locks to avoid multiple Reshape instances working
// on the same database as the same time. DbLocker is the only way to
// get a DbConn which ensures that all database access is protected by
// a lock.
//
// Postgres docs on advisory locks:
// https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
pub struct DbLocker {
client: DbConn,
}
impl DbLocker {
// Advisory lock keys in Postgres are 64-bit integers.
// The key we use was chosen randomly.
const LOCK_KEY: i64 = 4036779288569897133;
pub fn connect(config: &postgres::Config) -> anyhow::Result<Self> {
let pg = config.connect(NoTls)?;
Ok(Self {
client: DbConn::new(pg),
})
}
pub fn lock(
&mut self,
f: impl FnOnce(&mut DbConn) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
self.acquire_lock()?;
let result = f(&mut self.client);
self.release_lock()?;
result
}
fn acquire_lock(&mut self) -> anyhow::Result<()> {
let success = self
.client
.query(&format!("SELECT pg_try_advisory_lock({})", Self::LOCK_KEY))?
.first()
.ok_or_else(|| anyhow!("unexpectedly failed when acquiring advisory lock"))
.map(|row| row.get::<'_, _, bool>(0))?;
if success {
Ok(())
} else {
Err(anyhow!("another instance of Reshape is already running"))
}
}
fn release_lock(&mut self) -> anyhow::Result<()> {
self.client
.query(&format!("SELECT pg_advisory_unlock({})", Self::LOCK_KEY))?
.first()
.ok_or_else(|| anyhow!("unexpectedly failed when releasing advisory lock"))?;
Ok(())
}
}
pub trait Conn { pub trait Conn {
fn run(&mut self, query: &str) -> anyhow::Result<()>; fn run(&mut self, query: &str) -> anyhow::Result<()>;
fn query(&mut self, query: &str) -> anyhow::Result<Vec<Row>>; fn query(&mut self, query: &str) -> anyhow::Result<Vec<Row>>;
@ -16,9 +79,8 @@ pub struct DbConn {
} }
impl DbConn { impl DbConn {
pub fn connect(config: &postgres::Config) -> anyhow::Result<DbConn> { fn new(client: postgres::Client) -> Self {
let client = config.connect(NoTls)?; DbConn { client }
Ok(DbConn { client })
} }
} }

View File

@ -5,7 +5,7 @@ use crate::{
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use colored::*; use colored::*;
use db::{Conn, DbConn}; use db::{Conn, DbConn, DbLocker};
use postgres::Config; use postgres::Config;
use schema::Table; use schema::Table;
@ -18,8 +18,7 @@ mod state;
pub use crate::state::State; pub use crate::state::State;
pub struct Reshape { pub struct Reshape {
pub state: State, db: DbLocker,
db: DbConn,
} }
impl Reshape { impl Reshape {
@ -45,480 +44,72 @@ impl Reshape {
} }
fn new_with_config(config: &Config) -> anyhow::Result<Reshape> { fn new_with_config(config: &Config) -> anyhow::Result<Reshape> {
let mut db = DbConn::connect(config)?; let db = DbLocker::connect(config)?;
let state = State::load(&mut db)?; Ok(Reshape { db })
Ok(Reshape { db, state })
} }
pub fn migrate<T>(&mut self, migrations: T) -> anyhow::Result<()> pub fn migrate(
where
T: IntoIterator<Item = Migration>,
{
self.state = State::load(&mut self.db)?;
// Make sure no migration is in progress
if let State::InProgress { .. } = &self.state {
println!("Migration already in progress, please complete using 'reshape complete'");
return Ok(());
}
if let State::Completing { .. } = &self.state {
println!(
"Migration already in progress and has started completion, please finish using 'reshape complete'"
);
return Ok(());
}
// Determine which migrations need to be applied by comparing the provided migrations
// with the already applied ones stored in the state. This will throw an error if the
// two sets of migrations don't agree, for example if a new migration has been added
// in between two existing ones.
let current_migration = state::current_migration(&mut self.db)?;
let remaining_migrations = state::remaining_migrations(&mut self.db, migrations)?;
if remaining_migrations.is_empty() {
println!("No migrations left to apply");
return Ok(());
}
// If we have already started applying some migrations we need to ensure that
// they are the same ones we want to apply now
if let State::Applying {
migrations: existing_migrations,
} = &self.state
{
if existing_migrations != &remaining_migrations {
return Err(anyhow!(
"a previous migration seems to have failed without cleaning up. Please run `reshape abort` and then run migrate again."
));
}
}
// Move to the "Applying" state which is necessary as we can't run the migrations
// and state update as a single transaction. If a migration unexpectedly fails without
// automatically aborting, this state saves us from dangling migrations. It forces the user
// to either run migrate again (which works as all migrations are idempotent) or abort.
self.state.applying(remaining_migrations.clone());
self.state.save(&mut self.db)?;
println!("Applying {} migrations\n", remaining_migrations.len());
helpers::set_up_helpers(&mut self.db, &current_migration)
.context("failed to set up helpers")?;
let mut new_schema = Schema::new();
let mut last_migration_index = usize::MAX;
let mut last_action_index = usize::MAX;
let mut result: anyhow::Result<()> = Ok(());
for (migration_index, migration) in remaining_migrations.iter().enumerate() {
println!("Migrating '{}':", migration.name);
last_migration_index = migration_index;
for (action_index, action) in migration.actions.iter().enumerate() {
last_action_index = action_index;
let description = action.describe();
print!(" + {} ", description);
let ctx = MigrationContext::new(migration_index, action_index);
result = action
.run(&ctx, &mut self.db, &new_schema)
.with_context(|| format!("failed to {}", description));
if result.is_ok() {
action.update_schema(&ctx, &mut new_schema);
println!("{}", "done".green());
} else {
println!("{}", "failed".red());
break;
}
}
println!();
}
// If a migration failed, we abort all the migrations that were applied
if let Err(err) = result {
println!("A migration failed, aborting migrations that have already been applied");
// Set to the Aborting state. This is to ensure that the failed
// migration is fully aborted and nothing is left dangling.
// If the abort is interrupted for any reason, the user can try again
// by running `reshape abort`.
self.state.aborting(
remaining_migrations.clone(),
last_migration_index + 1,
last_action_index + 1,
);
// Abort will only
self.abort()?;
return Err(err);
}
// Create schema and views for migration
let target_migration = remaining_migrations.last().unwrap().name.to_string();
self.create_schema_for_migration(&target_migration, &new_schema)
.with_context(|| {
format!("failed to create schema for migration {}", target_migration)
})?;
// Update state once migrations have been performed
self.state.in_progress(remaining_migrations);
self.state
.save(&mut self.db)
.context("failed to save in-progress state")?;
// If we started from a blank slate, we can finish the migration immediately
if current_migration.is_none() {
println!("Automatically completing migrations\n");
self.complete_migration()
.context("failed to automatically complete migrations")?;
println!("Migrations complete:");
println!(
" - Run '{}' from your application to use the latest schema",
schema_query_for_migration(&target_migration)
);
} else {
println!("Migrations have been applied and the new schema is ready for use:");
println!(
" - Run '{}' from your application to use the latest schema",
schema_query_for_migration(&target_migration)
);
println!(
" - Run 'reshape complete' once your application has been updated and the previous schema is no longer in use"
);
}
Ok(())
}
pub fn complete_migration(&mut self) -> anyhow::Result<()> {
// Make sure a migration is in progress
let (remaining_migrations, starting_migration_index, starting_action_index) = match self.state.clone() {
State::InProgress { migrations } => {
// Move into the Completing state. Once in this state,
// the migration can't be aborted and must be completed.
self.state.completing(migrations.clone(), 0, 0);
self.state.save(&mut self.db).context("failed to save state")?;
(migrations, 0, 0)
},
State::Completing {
migrations,
current_migration_index,
current_action_index
} => (migrations, current_migration_index, current_action_index),
State::Aborting { .. } => {
return Err(anyhow!("migration been aborted and can't be completed. Please finish using `reshape abort`."))
}
State::Applying { .. } => {
return Err(anyhow!("a previous migration unexpectedly failed. Please run `reshape migrate` to try applying the migration again."))
}
State::Idle => {
println!("No migration in progress");
return Ok(());
}
};
// Remove previous migration's schema
if let Some(current_migration) = &state::current_migration(&mut self.db)? {
self.db
.run(&format!(
"DROP SCHEMA IF EXISTS {} CASCADE",
schema_name_for_migration(current_migration)
))
.context("failed to remove previous migration's schema")?;
}
for (migration_index, migration) in remaining_migrations.iter().enumerate() {
// Skip all the migrations which have already been completed
if migration_index < starting_migration_index {
continue;
}
println!("Completing '{}':", migration.name);
for (action_index, action) in migration.actions.iter().enumerate() {
// Skip all actions which have already been completed
if migration_index == starting_migration_index
&& action_index < starting_action_index
{
continue;
}
let description = action.describe();
print!(" + {} ", description);
let ctx = MigrationContext::new(migration_index, action_index);
// Update state to indicate that this action has been completed.
// We won't save this new state until after the action has completed.
self.state.completing(
remaining_migrations.clone(),
migration_index + 1,
action_index + 1,
);
// This did_save check is necessary because of the borrow checker.
// The Transaction which might be returned from action.complete
// contains a mutable reference to self.db. We need the Transaction
// to be dropped before we can save the state using self.db instead,
// which we achieve here by limiting the lifetime of the Transaction
// with a new block.
let did_save = {
let result = action
.complete(&ctx, &mut self.db)
.with_context(|| format!("failed to complete migration {}", migration.name))
.with_context(|| format!("failed to complete action: {}", description));
let maybe_transaction = match result {
Ok(maybe_transaction) => {
println!("{}", "done".green());
maybe_transaction
}
Err(e) => {
println!("{}", "failed".red());
return Err(e);
}
};
// Update state with which migrations and actions have been completed.
// Each action can create and return a transaction if they need atomicity.
// We use this transaction to update the state to ensure the action only completes.
// once.
// We want to use a single transaction for each action to keep the length of
// the transaction as short as possible. Wherever possible, we don't want to
// use a transaction at all.
if let Some(mut transaction) = maybe_transaction {
self.state
.save(&mut transaction)
.context("failed to save state after completing action")?;
transaction
.commit()
.context("failed to commit transaction")?;
true
} else {
false
}
};
// If the action didn't return a transaction we save the state normally instead
if !did_save {
self.state
.save(&mut self.db)
.context("failed to save state after completing action")?;
}
}
println!();
}
// Remove helpers which are no longer in use
helpers::tear_down_helpers(&mut self.db).context("failed to tear down helpers")?;
self.state
.complete(&mut self.db)
.context("failed to update state as completed")?;
Ok(())
}
fn create_schema_for_migration(
&mut self, &mut self,
migration_name: &str, migrations: impl IntoIterator<Item = Migration>,
schema: &Schema,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
// Create schema for migration self.db.lock(|db| {
let schema_name = schema_name_for_migration(migration_name); let mut state = State::load(db)?;
self.db migrate(db, &mut state, migrations)
.run(&format!("CREATE SCHEMA IF NOT EXISTS {}", schema_name)) })
.with_context(|| {
format!(
"failed to create schema {} for migration {}",
schema_name, migration_name
)
})?;
// Create views inside schema
for table in schema.get_tables(&mut self.db)? {
Self::create_view_for_table(&mut self.db, &table, &schema_name)?;
}
Ok(())
} }
fn create_view_for_table( pub fn complete(&mut self) -> anyhow::Result<()> {
db: &mut impl Conn, self.db.lock(|db| {
table: &Table, let mut state = State::load(db)?;
schema: &str, complete(db, &mut state)
) -> anyhow::Result<()> { })
let select_columns: Vec<String> = table
.columns
.iter()
.map(|column| {
format!(
r#"
"{real_name}" AS "{alias}"
"#,
real_name = column.real_name,
alias = column.name,
)
})
.collect();
db.run(&format!(
r#"
CREATE OR REPLACE VIEW {schema}."{view_name}" AS
SELECT {columns}
FROM "{table_name}"
"#,
schema = schema,
table_name = table.real_name,
view_name = table.name,
columns = select_columns.join(","),
))
.with_context(|| format!("failed to create view for table {}", table.name))?;
Ok(())
}
pub fn remove(&mut self) -> anyhow::Result<()> {
// Remove migration schemas and views
if let Some(current_migration) = &state::current_migration(&mut self.db)? {
self.db.run(&format!(
"DROP SCHEMA IF EXISTS {} CASCADE",
schema_name_for_migration(current_migration)
))?;
}
if let State::InProgress { migrations } = &self.state {
let target_migration = migrations.last().unwrap().name.to_string();
self.db.run(&format!(
"DROP SCHEMA IF EXISTS {} CASCADE",
schema_name_for_migration(&target_migration)
))?;
}
// Remove all tables
let schema = Schema::new();
for table in schema.get_tables(&mut self.db)? {
self.db.run(&format!(
r#"
DROP TABLE IF EXISTS "{}" CASCADE
"#,
table.real_name
))?;
}
// Reset state
self.state.clear(&mut self.db)?;
println!("Reshape and all data has been removed");
Ok(())
} }
pub fn abort(&mut self) -> anyhow::Result<()> { pub fn abort(&mut self) -> anyhow::Result<()> {
let (remaining_migrations, last_migration_index, last_action_index) = match self self.db.lock(|db| {
.state let mut state = State::load(db)?;
.clone() abort(db, &mut state)
{ })
State::InProgress { migrations } | State::Applying { migrations } => {
// Set to the Aborting state. Once this is done, the migration has to
// be fully aborted and can't be completed.
self.state.aborting(migrations.clone(), 0, 0);
self.state.save(&mut self.db)?;
(migrations, usize::MAX, usize::MAX)
}
State::Aborting {
migrations,
last_migration_index,
last_action_index,
} => (migrations, last_migration_index, last_action_index),
State::Completing { .. } => {
return Err(anyhow!("Migration completion has already been started. Please run `reshape complete` again to finish it."));
}
State::Idle => {
println!("No migration is in progress");
return Ok(());
}
};
// Remove new migration's schema
let target_migration = remaining_migrations.last().unwrap().name.to_string();
let schema_name = schema_name_for_migration(&target_migration);
self.db
.run(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema_name,))
.with_context(|| format!("failed to drop schema {}", schema_name))?;
// Abort all pending migrations
self.abort_migrations(
&remaining_migrations,
last_migration_index,
last_action_index,
)?;
helpers::tear_down_helpers(&mut self.db).context("failed to tear down helpers")?;
self.state = State::Idle;
self.state
.save(&mut self.db)
.context("failed to save state")?;
Ok(())
} }
fn abort_migrations( pub fn remove(&mut self) -> anyhow::Result<()> {
&mut self, self.db.lock(|db| {
migrations: &[Migration], let mut state = State::load(db)?;
upper_migration_index: usize,
upper_action_index: usize, // Remove migration schemas and views
) -> anyhow::Result<()> { if let Some(current_migration) = &state::current_migration(db)? {
// Abort all migrations in reverse order db.run(&format!(
for (migration_index, migration) in migrations.iter().enumerate().rev() { "DROP SCHEMA IF EXISTS {} CASCADE",
// Skip migrations which shouldn't be aborted schema_name_for_migration(current_migration)
// The reason can be that they have already been aborted or that ))?;
// the migration was never applied in the first place.
if migration_index >= upper_migration_index {
continue;
} }
print!("Aborting '{}' ", migration.name); if let State::InProgress { migrations } = &state {
let target_migration = migrations.last().unwrap().name.to_string();
for (action_index, action) in migration.actions.iter().enumerate().rev() { db.run(&format!(
// Skip actions which shouldn't be aborted "DROP SCHEMA IF EXISTS {} CASCADE",
// The reason can be that they have already been aborted or that schema_name_for_migration(&target_migration)
// the action was never applied in the first place. ))?;
if migration_index == upper_migration_index - 1
&& action_index >= upper_action_index
{
continue;
}
let ctx = MigrationContext::new(migration_index, action_index);
action
.abort(&ctx, &mut self.db)
.with_context(|| format!("failed to abort migration {}", migration.name))
.with_context(|| format!("failed to abort action: {}", action.describe()))?;
// Update state with which migrations and actions have been aborted.
// We don't need to run this in a transaction as aborts are idempotent.
self.state
.aborting(migrations.to_vec(), migration_index, action_index);
self.state
.save(&mut self.db)
.context("failed to save state")?;
} }
println!("{}", "done".green()); // Remove all tables
} let schema = Schema::new();
Ok(()) for table in schema.get_tables(db)? {
db.run(&format!(
r#"
DROP TABLE IF EXISTS "{}" CASCADE
"#,
table.real_name
))?;
}
// Reset state
state.clear(db)?;
println!("Reshape and all data has been removed");
Ok(())
})
} }
} }
@ -536,3 +127,397 @@ pub fn schema_query_for_migration(migration_name: &str) -> String {
fn schema_name_for_migration(migration_name: &str) -> String { fn schema_name_for_migration(migration_name: &str) -> String {
format!("migration_{}", migration_name) format!("migration_{}", migration_name)
} }
fn migrate(
db: &mut DbConn,
state: &mut State,
migrations: impl IntoIterator<Item = Migration>,
) -> anyhow::Result<()> {
// Make sure no migration is in progress
if let State::InProgress { .. } = &state {
println!("Migration already in progress, please complete using 'reshape complete'");
return Ok(());
}
if let State::Completing { .. } = &state {
println!(
"Migration already in progress and has started completion, please finish using 'reshape complete'"
);
return Ok(());
}
// Determine which migrations need to be applied by comparing the provided migrations
// with the already applied ones stored in the state. This will throw an error if the
// two sets of migrations don't agree, for example if a new migration has been added
// in between two existing ones.
let current_migration = state::current_migration(db)?;
let remaining_migrations = state::remaining_migrations(db, migrations)?;
if remaining_migrations.is_empty() {
println!("No migrations left to apply");
return Ok(());
}
// If we have already started applying some migrations we need to ensure that
// they are the same ones we want to apply now
if let State::Applying {
migrations: existing_migrations,
} = &state
{
if existing_migrations != &remaining_migrations {
return Err(anyhow!(
"a previous migration seems to have failed without cleaning up. Please run `reshape abort` and then run migrate again."
));
}
}
// Move to the "Applying" state which is necessary as we can't run the migrations
// and state update as a single transaction. If a migration unexpectedly fails without
// automatically aborting, this state saves us from dangling migrations. It forces the user
// to either run migrate again (which works as all migrations are idempotent) or abort.
state.applying(remaining_migrations.clone());
state.save(db)?;
println!("Applying {} migrations\n", remaining_migrations.len());
helpers::set_up_helpers(db, &current_migration).context("failed to set up helpers")?;
let mut new_schema = Schema::new();
let mut last_migration_index = usize::MAX;
let mut last_action_index = usize::MAX;
let mut result: anyhow::Result<()> = Ok(());
for (migration_index, migration) in remaining_migrations.iter().enumerate() {
println!("Migrating '{}':", migration.name);
last_migration_index = migration_index;
for (action_index, action) in migration.actions.iter().enumerate() {
last_action_index = action_index;
let description = action.describe();
print!(" + {} ", description);
let ctx = MigrationContext::new(migration_index, action_index);
result = action
.run(&ctx, db, &new_schema)
.with_context(|| format!("failed to {}", description));
if result.is_ok() {
action.update_schema(&ctx, &mut new_schema);
println!("{}", "done".green());
} else {
println!("{}", "failed".red());
break;
}
}
println!();
}
// If a migration failed, we abort all the migrations that were applied
if let Err(err) = result {
println!("A migration failed, aborting migrations that have already been applied");
// Set to the Aborting state. This is to ensure that the failed
// migration is fully aborted and nothing is left dangling.
// If the abort is interrupted for any reason, the user can try again
// by running `reshape abort`.
state.aborting(
remaining_migrations.clone(),
last_migration_index + 1,
last_action_index + 1,
);
// Abort will only
abort(db, state)?;
return Err(err);
}
// Create schema and views for migration
let target_migration = remaining_migrations.last().unwrap().name.to_string();
create_schema_for_migration(db, &target_migration, &new_schema)
.with_context(|| format!("failed to create schema for migration {}", target_migration))?;
// Update state once migrations have been performed
state.in_progress(remaining_migrations);
state.save(db).context("failed to save in-progress state")?;
// If we started from a blank slate, we can finish the migration immediately
if current_migration.is_none() {
println!("Automatically completing migrations\n");
complete(db, state).context("failed to automatically complete migrations")?;
println!("Migrations complete:");
println!(
" - Run '{}' from your application to use the latest schema",
schema_query_for_migration(&target_migration)
);
} else {
println!("Migrations have been applied and the new schema is ready for use:");
println!(
" - Run '{}' from your application to use the latest schema",
schema_query_for_migration(&target_migration)
);
println!(
" - Run 'reshape complete' once your application has been updated and the previous schema is no longer in use"
);
}
Ok(())
}
fn complete(db: &mut DbConn, state: &mut State) -> anyhow::Result<()> {
// Make sure a migration is in progress
let (remaining_migrations, starting_migration_index, starting_action_index) = match state.clone() {
State::InProgress { migrations } => {
// Move into the Completing state. Once in this state,
// the migration can't be aborted and must be completed.
state.completing(migrations.clone(), 0, 0);
state.save(db).context("failed to save state")?;
(migrations, 0, 0)
},
State::Completing {
migrations,
current_migration_index,
current_action_index
} => (migrations, current_migration_index, current_action_index),
State::Aborting { .. } => {
return Err(anyhow!("migration been aborted and can't be completed. Please finish using `reshape abort`."))
}
State::Applying { .. } => {
return Err(anyhow!("a previous migration unexpectedly failed. Please run `reshape migrate` to try applying the migration again."))
}
State::Idle => {
println!("No migration in progress");
return Ok(());
}
};
// Remove previous migration's schema
if let Some(current_migration) = &state::current_migration(db)? {
db.run(&format!(
"DROP SCHEMA IF EXISTS {} CASCADE",
schema_name_for_migration(current_migration)
))
.context("failed to remove previous migration's schema")?;
}
for (migration_index, migration) in remaining_migrations.iter().enumerate() {
// Skip all the migrations which have already been completed
if migration_index < starting_migration_index {
continue;
}
println!("Completing '{}':", migration.name);
for (action_index, action) in migration.actions.iter().enumerate() {
// Skip all actions which have already been completed
if migration_index == starting_migration_index && action_index < starting_action_index {
continue;
}
let description = action.describe();
print!(" + {} ", description);
let ctx = MigrationContext::new(migration_index, action_index);
// Update state to indicate that this action has been completed.
// We won't save this new state until after the action has completed.
state.completing(
remaining_migrations.clone(),
migration_index + 1,
action_index + 1,
);
// This did_save check is necessary because of the borrow checker.
// The Transaction which might be returned from action.complete
// contains a mutable reference to self.db. We need the Transaction
// to be dropped before we can save the state using self.db instead,
// which we achieve here by limiting the lifetime of the Transaction
// with a new block.
let did_save = {
let result = action
.complete(&ctx, db)
.with_context(|| format!("failed to complete migration {}", migration.name))
.with_context(|| format!("failed to complete action: {}", description));
let maybe_transaction = match result {
Ok(maybe_transaction) => {
println!("{}", "done".green());
maybe_transaction
}
Err(e) => {
println!("{}", "failed".red());
return Err(e);
}
};
// Update state with which migrations and actions have been completed.
// Each action can create and return a transaction if they need atomicity.
// We use this transaction to update the state to ensure the action only completes.
// once.
// We want to use a single transaction for each action to keep the length of
// the transaction as short as possible. Wherever possible, we don't want to
// use a transaction at all.
if let Some(mut transaction) = maybe_transaction {
state
.save(&mut transaction)
.context("failed to save state after completing action")?;
transaction
.commit()
.context("failed to commit transaction")?;
true
} else {
false
}
};
// If the action didn't return a transaction we save the state normally instead
if !did_save {
state
.save(db)
.context("failed to save state after completing action")?;
}
}
println!();
}
// Remove helpers which are no longer in use
helpers::tear_down_helpers(db).context("failed to tear down helpers")?;
state
.complete(db)
.context("failed to update state as completed")?;
Ok(())
}
fn abort(db: &mut DbConn, state: &mut State) -> anyhow::Result<()> {
let (remaining_migrations, last_migration_index, last_action_index) = match state.clone() {
State::InProgress { migrations } | State::Applying { migrations } => {
// Set to the Aborting state. Once this is done, the migration has to
// be fully aborted and can't be completed.
state.aborting(migrations.clone(), 0, 0);
state.save(db)?;
(migrations, usize::MAX, usize::MAX)
}
State::Aborting {
migrations,
last_migration_index,
last_action_index,
} => (migrations, last_migration_index, last_action_index),
State::Completing { .. } => {
return Err(anyhow!("Migration completion has already been started. Please run `reshape complete` again to finish it."));
}
State::Idle => {
println!("No migration is in progress");
return Ok(());
}
};
// Remove new migration's schema
let target_migration = remaining_migrations.last().unwrap().name.to_string();
let schema_name = schema_name_for_migration(&target_migration);
db.run(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema_name,))
.with_context(|| format!("failed to drop schema {}", schema_name))?;
// Abort all pending migrations
// Abort all migrations in reverse order
for (migration_index, migration) in remaining_migrations.iter().enumerate().rev() {
// Skip migrations which shouldn't be aborted
// The reason can be that they have already been aborted or that
// the migration was never applied in the first place.
if migration_index >= last_migration_index {
continue;
}
print!("Aborting '{}' ", migration.name);
for (action_index, action) in migration.actions.iter().enumerate().rev() {
// Skip actions which shouldn't be aborted
// The reason can be that they have already been aborted or that
// the action was never applied in the first place.
if migration_index == last_migration_index - 1 && action_index >= last_action_index {
continue;
}
let ctx = MigrationContext::new(migration_index, action_index);
action
.abort(&ctx, db)
.with_context(|| format!("failed to abort migration {}", migration.name))
.with_context(|| format!("failed to abort action: {}", action.describe()))?;
// Update state with which migrations and actions have been aborted.
// We don't need to run this in a transaction as aborts are idempotent.
state.aborting(remaining_migrations.to_vec(), migration_index, action_index);
state.save(db).context("failed to save state")?;
}
println!("{}", "done".green());
}
helpers::tear_down_helpers(db).context("failed to tear down helpers")?;
*state = State::Idle;
state.save(db).context("failed to save state")?;
Ok(())
}
fn create_schema_for_migration(
db: &mut DbConn,
migration_name: &str,
schema: &Schema,
) -> anyhow::Result<()> {
// Create schema for migration
let schema_name = schema_name_for_migration(migration_name);
db.run(&format!("CREATE SCHEMA IF NOT EXISTS {}", schema_name))
.with_context(|| {
format!(
"failed to create schema {} for migration {}",
schema_name, migration_name
)
})?;
// Create views inside schema
for table in schema.get_tables(db)? {
create_view_for_table(db, &table, &schema_name)?;
}
Ok(())
}
fn create_view_for_table(db: &mut impl Conn, table: &Table, schema: &str) -> anyhow::Result<()> {
let select_columns: Vec<String> = table
.columns
.iter()
.map(|column| {
format!(
r#"
"{real_name}" AS "{alias}"
"#,
real_name = column.real_name,
alias = column.name,
)
})
.collect();
db.run(&format!(
r#"
CREATE OR REPLACE VIEW {schema}."{view_name}" AS
SELECT {columns}
FROM "{table_name}"
"#,
schema = schema,
table_name = table.real_name,
view_name = table.name,
columns = select_columns.join(","),
))
.with_context(|| format!("failed to create view for table {}", table.name))?;
Ok(())
}

View File

@ -72,14 +72,14 @@ fn run(opts: Opts) -> anyhow::Result<()> {
// Automatically complete migration if --complete flag is set // Automatically complete migration if --complete flag is set
if opts.complete { if opts.complete {
reshape.complete_migration()?; reshape.complete()?;
} }
Ok(()) Ok(())
} }
Command::Complete(opts) => { Command::Complete(opts) => {
let mut reshape = reshape_from_connection_options(&opts)?; let mut reshape = reshape_from_connection_options(&opts)?;
reshape.complete_migration() reshape.complete()
} }
Command::Remove(opts) => { Command::Remove(opts) => {
let mut reshape = reshape_from_connection_options(&opts)?; let mut reshape = reshape_from_connection_options(&opts)?;

View File

@ -112,7 +112,7 @@ fn add_column() {
(first_name.as_ref(), last_name.as_ref()) (first_name.as_ref(), last_name.as_ref())
); );
reshape.complete_migration().unwrap(); reshape.complete().unwrap();
common::assert_cleaned_up(&mut new_db); common::assert_cleaned_up(&mut new_db);
} }
@ -205,7 +205,7 @@ fn add_column_nullable() {
.simple_query("INSERT INTO users (id, name) VALUES (4, 'Test Testsson'), (5, NULL)") .simple_query("INSERT INTO users (id, name) VALUES (4, 'Test Testsson'), (5, NULL)")
.unwrap(); .unwrap();
reshape.complete_migration().unwrap(); reshape.complete().unwrap();
common::assert_cleaned_up(&mut new_db); common::assert_cleaned_up(&mut new_db);
} }
@ -288,6 +288,6 @@ fn add_column_with_default() {
.unwrap(); .unwrap();
assert_eq!("DEFAULT", name); assert_eq!("DEFAULT", name);
reshape.complete_migration().unwrap(); reshape.complete().unwrap();
common::assert_cleaned_up(&mut new_db); common::assert_cleaned_up(&mut new_db);
} }

View File

@ -64,6 +64,6 @@ fn add_index() {
assert_eq!(vec![(true, true)], result); assert_eq!(vec![(true, true)], result);
reshape.complete_migration().unwrap(); reshape.complete().unwrap();
common::assert_cleaned_up(&mut new_db); common::assert_cleaned_up(&mut new_db);
} }

View File

@ -99,7 +99,7 @@ fn alter_column_data() {
.unwrap(); .unwrap();
assert_eq!("test testsson", result.get::<_, &str>("name")); assert_eq!("test testsson", result.get::<_, &str>("name"));
reshape.complete_migration().unwrap(); reshape.complete().unwrap();
common::assert_cleaned_up(&mut new_db); common::assert_cleaned_up(&mut new_db);
} }
@ -198,7 +198,7 @@ fn alter_column_set_not_null() {
.unwrap(); .unwrap();
assert_eq!("Jane Doe", result.get::<_, &str>("name")); assert_eq!("Jane Doe", result.get::<_, &str>("name"));
reshape.complete_migration().unwrap(); reshape.complete().unwrap();
common::assert_cleaned_up(&mut new_db); common::assert_cleaned_up(&mut new_db);
} }
@ -280,7 +280,7 @@ fn alter_column_rename() {
.map(|row| row.get::<_, String>("full_name")) .map(|row| row.get::<_, String>("full_name"))
.eq(expected)); .eq(expected));
reshape.complete_migration().unwrap(); reshape.complete().unwrap();
common::assert_cleaned_up(&mut new_db); common::assert_cleaned_up(&mut new_db);
} }
@ -402,7 +402,7 @@ fn alter_column_multiple() {
.unwrap(); .unwrap();
assert_eq!(48, result); assert_eq!(48, result);
reshape.complete_migration().unwrap(); reshape.complete().unwrap();
common::assert_cleaned_up(&mut new_db); common::assert_cleaned_up(&mut new_db);
} }
@ -502,7 +502,7 @@ fn alter_column_default() {
.unwrap(); .unwrap();
assert_eq!("NEW DEFAULT", result.get::<_, &str>("name")); assert_eq!("NEW DEFAULT", result.get::<_, &str>("name"));
reshape.complete_migration().unwrap(); reshape.complete().unwrap();
common::assert_cleaned_up(&mut new_db); common::assert_cleaned_up(&mut new_db);
} }
@ -568,7 +568,7 @@ fn alter_column_with_index() {
// Complete the second migration which should replace the existing column // Complete the second migration which should replace the existing column
// with the temporary one // with the temporary one
reshape.complete_migration().unwrap(); reshape.complete().unwrap();
// Make sure index still exists // Make sure index still exists
let result: i64 = db let result: i64 = db

View File

@ -76,6 +76,6 @@ fn remove_column() {
result.as_ref().map(|row| row.get("name")) result.as_ref().map(|row| row.get("name"))
); );
reshape.complete_migration().unwrap(); reshape.complete().unwrap();
common::assert_cleaned_up(&mut new_db); common::assert_cleaned_up(&mut new_db);
} }

View File

@ -85,7 +85,7 @@ fn remove_index() {
assert_eq!(vec![(true, true)], result); assert_eq!(vec![(true, true)], result);
reshape.complete_migration().unwrap(); reshape.complete().unwrap();
// Ensure index has been removed after the migration is complete // Ensure index has been removed after the migration is complete
let count: i64 = db let count: i64 = db

View File

@ -49,6 +49,6 @@ fn remove_table() {
// Ensure the table is not accessible through the new schema // Ensure the table is not accessible through the new schema
assert!(new_db.query("SELECT id FROM users", &[]).is_err()); assert!(new_db.query("SELECT id FROM users", &[]).is_err());
reshape.complete_migration().unwrap(); reshape.complete().unwrap();
common::assert_cleaned_up(&mut new_db); common::assert_cleaned_up(&mut new_db);
} }

View File

@ -75,6 +75,6 @@ fn rename_table() {
assert!(old_db.simple_query("SELECT id FROM customers").is_err()); assert!(old_db.simple_query("SELECT id FROM customers").is_err());
assert!(new_db.simple_query("SELECT id FROM users").is_err()); assert!(new_db.simple_query("SELECT id FROM users").is_err());
reshape.complete_migration().unwrap(); reshape.complete().unwrap();
common::assert_cleaned_up(&mut new_db); common::assert_cleaned_up(&mut new_db);
} }