diff --git a/src/db.rs b/src/db.rs index 4e298e2..c8bcdfc 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,5 +1,68 @@ +use anyhow::anyhow; use postgres::{types::ToSql, NoTls, Row}; +// DbLocker wraps a regular DbConn, only allowing access using the +// `lock` method. This method will acquire the advisory lock before +// allowing access to the database, and then release it afterwards. +// +// We use advisory locks to avoid multiple Reshape instances working +// on the same database as the same time. DbLocker is the only way to +// get a DbConn which ensures that all database access is protected by +// a lock. +// +// Postgres docs on advisory locks: +// https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS +pub struct DbLocker { + client: DbConn, +} + +impl DbLocker { + // Advisory lock keys in Postgres are 64-bit integers. + // The key we use was chosen randomly. + const LOCK_KEY: i64 = 4036779288569897133; + + pub fn connect(config: &postgres::Config) -> anyhow::Result { + let pg = config.connect(NoTls)?; + Ok(Self { + client: DbConn::new(pg), + }) + } + + pub fn lock( + &mut self, + f: impl FnOnce(&mut DbConn) -> anyhow::Result<()>, + ) -> anyhow::Result<()> { + self.acquire_lock()?; + let result = f(&mut self.client); + self.release_lock()?; + + result + } + + fn acquire_lock(&mut self) -> anyhow::Result<()> { + let success = self + .client + .query(&format!("SELECT pg_try_advisory_lock({})", Self::LOCK_KEY))? + .first() + .ok_or_else(|| anyhow!("unexpectedly failed when acquiring advisory lock")) + .map(|row| row.get::<'_, _, bool>(0))?; + + if success { + Ok(()) + } else { + Err(anyhow!("another instance of Reshape is already running")) + } + } + + fn release_lock(&mut self) -> anyhow::Result<()> { + self.client + .query(&format!("SELECT pg_advisory_unlock({})", Self::LOCK_KEY))? + .first() + .ok_or_else(|| anyhow!("unexpectedly failed when releasing advisory lock"))?; + Ok(()) + } +} + pub trait Conn { fn run(&mut self, query: &str) -> anyhow::Result<()>; fn query(&mut self, query: &str) -> anyhow::Result>; @@ -16,9 +79,8 @@ pub struct DbConn { } impl DbConn { - pub fn connect(config: &postgres::Config) -> anyhow::Result { - let client = config.connect(NoTls)?; - Ok(DbConn { client }) + fn new(client: postgres::Client) -> Self { + DbConn { client } } } diff --git a/src/lib.rs b/src/lib.rs index 8679aef..e48266f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,7 @@ use crate::{ use anyhow::{anyhow, Context}; use colored::*; -use db::{Conn, DbConn}; +use db::{Conn, DbConn, DbLocker}; use postgres::Config; use schema::Table; @@ -18,8 +18,7 @@ mod state; pub use crate::state::State; pub struct Reshape { - pub state: State, - db: DbConn, + db: DbLocker, } impl Reshape { @@ -45,480 +44,72 @@ impl Reshape { } fn new_with_config(config: &Config) -> anyhow::Result { - let mut db = DbConn::connect(config)?; - let state = State::load(&mut db)?; - - Ok(Reshape { db, state }) + let db = DbLocker::connect(config)?; + Ok(Reshape { db }) } - pub fn migrate(&mut self, migrations: T) -> anyhow::Result<()> - where - T: IntoIterator, - { - self.state = State::load(&mut self.db)?; - - // Make sure no migration is in progress - if let State::InProgress { .. } = &self.state { - println!("Migration already in progress, please complete using 'reshape complete'"); - return Ok(()); - } - - if let State::Completing { .. } = &self.state { - println!( - "Migration already in progress and has started completion, please finish using 'reshape complete'" - ); - return Ok(()); - } - - // Determine which migrations need to be applied by comparing the provided migrations - // with the already applied ones stored in the state. This will throw an error if the - // two sets of migrations don't agree, for example if a new migration has been added - // in between two existing ones. - let current_migration = state::current_migration(&mut self.db)?; - let remaining_migrations = state::remaining_migrations(&mut self.db, migrations)?; - if remaining_migrations.is_empty() { - println!("No migrations left to apply"); - return Ok(()); - } - - // If we have already started applying some migrations we need to ensure that - // they are the same ones we want to apply now - if let State::Applying { - migrations: existing_migrations, - } = &self.state - { - if existing_migrations != &remaining_migrations { - return Err(anyhow!( - "a previous migration seems to have failed without cleaning up. Please run `reshape abort` and then run migrate again." - )); - } - } - - // Move to the "Applying" state which is necessary as we can't run the migrations - // and state update as a single transaction. If a migration unexpectedly fails without - // automatically aborting, this state saves us from dangling migrations. It forces the user - // to either run migrate again (which works as all migrations are idempotent) or abort. - self.state.applying(remaining_migrations.clone()); - self.state.save(&mut self.db)?; - - println!("Applying {} migrations\n", remaining_migrations.len()); - - helpers::set_up_helpers(&mut self.db, ¤t_migration) - .context("failed to set up helpers")?; - - let mut new_schema = Schema::new(); - let mut last_migration_index = usize::MAX; - let mut last_action_index = usize::MAX; - let mut result: anyhow::Result<()> = Ok(()); - - for (migration_index, migration) in remaining_migrations.iter().enumerate() { - println!("Migrating '{}':", migration.name); - last_migration_index = migration_index; - - for (action_index, action) in migration.actions.iter().enumerate() { - last_action_index = action_index; - - let description = action.describe(); - print!(" + {} ", description); - - let ctx = MigrationContext::new(migration_index, action_index); - result = action - .run(&ctx, &mut self.db, &new_schema) - .with_context(|| format!("failed to {}", description)); - - if result.is_ok() { - action.update_schema(&ctx, &mut new_schema); - println!("{}", "done".green()); - } else { - println!("{}", "failed".red()); - break; - } - } - - println!(); - } - - // If a migration failed, we abort all the migrations that were applied - if let Err(err) = result { - println!("A migration failed, aborting migrations that have already been applied"); - - // Set to the Aborting state. This is to ensure that the failed - // migration is fully aborted and nothing is left dangling. - // If the abort is interrupted for any reason, the user can try again - // by running `reshape abort`. - self.state.aborting( - remaining_migrations.clone(), - last_migration_index + 1, - last_action_index + 1, - ); - - // Abort will only - self.abort()?; - - return Err(err); - } - - // Create schema and views for migration - let target_migration = remaining_migrations.last().unwrap().name.to_string(); - self.create_schema_for_migration(&target_migration, &new_schema) - .with_context(|| { - format!("failed to create schema for migration {}", target_migration) - })?; - - // Update state once migrations have been performed - self.state.in_progress(remaining_migrations); - self.state - .save(&mut self.db) - .context("failed to save in-progress state")?; - - // If we started from a blank slate, we can finish the migration immediately - if current_migration.is_none() { - println!("Automatically completing migrations\n"); - self.complete_migration() - .context("failed to automatically complete migrations")?; - - println!("Migrations complete:"); - println!( - " - Run '{}' from your application to use the latest schema", - schema_query_for_migration(&target_migration) - ); - } else { - println!("Migrations have been applied and the new schema is ready for use:"); - println!( - " - Run '{}' from your application to use the latest schema", - schema_query_for_migration(&target_migration) - ); - println!( - " - Run 'reshape complete' once your application has been updated and the previous schema is no longer in use" - ); - } - - Ok(()) - } - - pub fn complete_migration(&mut self) -> anyhow::Result<()> { - // Make sure a migration is in progress - let (remaining_migrations, starting_migration_index, starting_action_index) = match self.state.clone() { - State::InProgress { migrations } => { - // Move into the Completing state. Once in this state, - // the migration can't be aborted and must be completed. - self.state.completing(migrations.clone(), 0, 0); - self.state.save(&mut self.db).context("failed to save state")?; - - (migrations, 0, 0) - }, - State::Completing { - migrations, - current_migration_index, - current_action_index - } => (migrations, current_migration_index, current_action_index), - State::Aborting { .. } => { - return Err(anyhow!("migration been aborted and can't be completed. Please finish using `reshape abort`.")) - } - State::Applying { .. } => { - return Err(anyhow!("a previous migration unexpectedly failed. Please run `reshape migrate` to try applying the migration again.")) - } - State::Idle => { - println!("No migration in progress"); - return Ok(()); - } - }; - - // Remove previous migration's schema - if let Some(current_migration) = &state::current_migration(&mut self.db)? { - self.db - .run(&format!( - "DROP SCHEMA IF EXISTS {} CASCADE", - schema_name_for_migration(current_migration) - )) - .context("failed to remove previous migration's schema")?; - } - - for (migration_index, migration) in remaining_migrations.iter().enumerate() { - // Skip all the migrations which have already been completed - if migration_index < starting_migration_index { - continue; - } - - println!("Completing '{}':", migration.name); - - for (action_index, action) in migration.actions.iter().enumerate() { - // Skip all actions which have already been completed - if migration_index == starting_migration_index - && action_index < starting_action_index - { - continue; - } - - let description = action.describe(); - print!(" + {} ", description); - - let ctx = MigrationContext::new(migration_index, action_index); - - // Update state to indicate that this action has been completed. - // We won't save this new state until after the action has completed. - self.state.completing( - remaining_migrations.clone(), - migration_index + 1, - action_index + 1, - ); - - // This did_save check is necessary because of the borrow checker. - // The Transaction which might be returned from action.complete - // contains a mutable reference to self.db. We need the Transaction - // to be dropped before we can save the state using self.db instead, - // which we achieve here by limiting the lifetime of the Transaction - // with a new block. - let did_save = { - let result = action - .complete(&ctx, &mut self.db) - .with_context(|| format!("failed to complete migration {}", migration.name)) - .with_context(|| format!("failed to complete action: {}", description)); - - let maybe_transaction = match result { - Ok(maybe_transaction) => { - println!("{}", "done".green()); - maybe_transaction - } - Err(e) => { - println!("{}", "failed".red()); - return Err(e); - } - }; - - // Update state with which migrations and actions have been completed. - // Each action can create and return a transaction if they need atomicity. - // We use this transaction to update the state to ensure the action only completes. - // once. - // We want to use a single transaction for each action to keep the length of - // the transaction as short as possible. Wherever possible, we don't want to - // use a transaction at all. - if let Some(mut transaction) = maybe_transaction { - self.state - .save(&mut transaction) - .context("failed to save state after completing action")?; - transaction - .commit() - .context("failed to commit transaction")?; - - true - } else { - false - } - }; - - // If the action didn't return a transaction we save the state normally instead - if !did_save { - self.state - .save(&mut self.db) - .context("failed to save state after completing action")?; - } - } - - println!(); - } - - // Remove helpers which are no longer in use - helpers::tear_down_helpers(&mut self.db).context("failed to tear down helpers")?; - - self.state - .complete(&mut self.db) - .context("failed to update state as completed")?; - - Ok(()) - } - - fn create_schema_for_migration( + pub fn migrate( &mut self, - migration_name: &str, - schema: &Schema, + migrations: impl IntoIterator, ) -> anyhow::Result<()> { - // Create schema for migration - let schema_name = schema_name_for_migration(migration_name); - self.db - .run(&format!("CREATE SCHEMA IF NOT EXISTS {}", schema_name)) - .with_context(|| { - format!( - "failed to create schema {} for migration {}", - schema_name, migration_name - ) - })?; - - // Create views inside schema - for table in schema.get_tables(&mut self.db)? { - Self::create_view_for_table(&mut self.db, &table, &schema_name)?; - } - - Ok(()) + self.db.lock(|db| { + let mut state = State::load(db)?; + migrate(db, &mut state, migrations) + }) } - fn create_view_for_table( - db: &mut impl Conn, - table: &Table, - schema: &str, - ) -> anyhow::Result<()> { - let select_columns: Vec = table - .columns - .iter() - .map(|column| { - format!( - r#" - "{real_name}" AS "{alias}" - "#, - real_name = column.real_name, - alias = column.name, - ) - }) - .collect(); - - db.run(&format!( - r#" - CREATE OR REPLACE VIEW {schema}."{view_name}" AS - SELECT {columns} - FROM "{table_name}" - "#, - schema = schema, - table_name = table.real_name, - view_name = table.name, - columns = select_columns.join(","), - )) - .with_context(|| format!("failed to create view for table {}", table.name))?; - - Ok(()) - } - - pub fn remove(&mut self) -> anyhow::Result<()> { - // Remove migration schemas and views - if let Some(current_migration) = &state::current_migration(&mut self.db)? { - self.db.run(&format!( - "DROP SCHEMA IF EXISTS {} CASCADE", - schema_name_for_migration(current_migration) - ))?; - } - - if let State::InProgress { migrations } = &self.state { - let target_migration = migrations.last().unwrap().name.to_string(); - self.db.run(&format!( - "DROP SCHEMA IF EXISTS {} CASCADE", - schema_name_for_migration(&target_migration) - ))?; - } - - // Remove all tables - let schema = Schema::new(); - for table in schema.get_tables(&mut self.db)? { - self.db.run(&format!( - r#" - DROP TABLE IF EXISTS "{}" CASCADE - "#, - table.real_name - ))?; - } - - // Reset state - self.state.clear(&mut self.db)?; - - println!("Reshape and all data has been removed"); - - Ok(()) + pub fn complete(&mut self) -> anyhow::Result<()> { + self.db.lock(|db| { + let mut state = State::load(db)?; + complete(db, &mut state) + }) } pub fn abort(&mut self) -> anyhow::Result<()> { - let (remaining_migrations, last_migration_index, last_action_index) = match self - .state - .clone() - { - State::InProgress { migrations } | State::Applying { migrations } => { - // Set to the Aborting state. Once this is done, the migration has to - // be fully aborted and can't be completed. - self.state.aborting(migrations.clone(), 0, 0); - self.state.save(&mut self.db)?; - - (migrations, usize::MAX, usize::MAX) - } - State::Aborting { - migrations, - last_migration_index, - last_action_index, - } => (migrations, last_migration_index, last_action_index), - State::Completing { .. } => { - return Err(anyhow!("Migration completion has already been started. Please run `reshape complete` again to finish it.")); - } - State::Idle => { - println!("No migration is in progress"); - return Ok(()); - } - }; - - // Remove new migration's schema - let target_migration = remaining_migrations.last().unwrap().name.to_string(); - let schema_name = schema_name_for_migration(&target_migration); - self.db - .run(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema_name,)) - .with_context(|| format!("failed to drop schema {}", schema_name))?; - - // Abort all pending migrations - self.abort_migrations( - &remaining_migrations, - last_migration_index, - last_action_index, - )?; - - helpers::tear_down_helpers(&mut self.db).context("failed to tear down helpers")?; - - self.state = State::Idle; - self.state - .save(&mut self.db) - .context("failed to save state")?; - - Ok(()) + self.db.lock(|db| { + let mut state = State::load(db)?; + abort(db, &mut state) + }) } - fn abort_migrations( - &mut self, - migrations: &[Migration], - upper_migration_index: usize, - upper_action_index: usize, - ) -> anyhow::Result<()> { - // Abort all migrations in reverse order - for (migration_index, migration) in migrations.iter().enumerate().rev() { - // Skip migrations which shouldn't be aborted - // The reason can be that they have already been aborted or that - // the migration was never applied in the first place. - if migration_index >= upper_migration_index { - continue; + pub fn remove(&mut self) -> anyhow::Result<()> { + self.db.lock(|db| { + let mut state = State::load(db)?; + + // Remove migration schemas and views + if let Some(current_migration) = &state::current_migration(db)? { + db.run(&format!( + "DROP SCHEMA IF EXISTS {} CASCADE", + schema_name_for_migration(current_migration) + ))?; } - print!("Aborting '{}' ", migration.name); - - for (action_index, action) in migration.actions.iter().enumerate().rev() { - // Skip actions which shouldn't be aborted - // The reason can be that they have already been aborted or that - // the action was never applied in the first place. - if migration_index == upper_migration_index - 1 - && action_index >= upper_action_index - { - continue; - } - - let ctx = MigrationContext::new(migration_index, action_index); - action - .abort(&ctx, &mut self.db) - .with_context(|| format!("failed to abort migration {}", migration.name)) - .with_context(|| format!("failed to abort action: {}", action.describe()))?; - - // Update state with which migrations and actions have been aborted. - // We don't need to run this in a transaction as aborts are idempotent. - self.state - .aborting(migrations.to_vec(), migration_index, action_index); - self.state - .save(&mut self.db) - .context("failed to save state")?; + if let State::InProgress { migrations } = &state { + let target_migration = migrations.last().unwrap().name.to_string(); + db.run(&format!( + "DROP SCHEMA IF EXISTS {} CASCADE", + schema_name_for_migration(&target_migration) + ))?; } - println!("{}", "done".green()); - } - Ok(()) + // Remove all tables + let schema = Schema::new(); + for table in schema.get_tables(db)? { + db.run(&format!( + r#" + DROP TABLE IF EXISTS "{}" CASCADE + "#, + table.real_name + ))?; + } + + // Reset state + state.clear(db)?; + + println!("Reshape and all data has been removed"); + + Ok(()) + }) } } @@ -536,3 +127,397 @@ pub fn schema_query_for_migration(migration_name: &str) -> String { fn schema_name_for_migration(migration_name: &str) -> String { format!("migration_{}", migration_name) } + +fn migrate( + db: &mut DbConn, + state: &mut State, + migrations: impl IntoIterator, +) -> anyhow::Result<()> { + // Make sure no migration is in progress + if let State::InProgress { .. } = &state { + println!("Migration already in progress, please complete using 'reshape complete'"); + return Ok(()); + } + + if let State::Completing { .. } = &state { + println!( + "Migration already in progress and has started completion, please finish using 'reshape complete'" + ); + return Ok(()); + } + + // Determine which migrations need to be applied by comparing the provided migrations + // with the already applied ones stored in the state. This will throw an error if the + // two sets of migrations don't agree, for example if a new migration has been added + // in between two existing ones. + let current_migration = state::current_migration(db)?; + let remaining_migrations = state::remaining_migrations(db, migrations)?; + if remaining_migrations.is_empty() { + println!("No migrations left to apply"); + return Ok(()); + } + + // If we have already started applying some migrations we need to ensure that + // they are the same ones we want to apply now + if let State::Applying { + migrations: existing_migrations, + } = &state + { + if existing_migrations != &remaining_migrations { + return Err(anyhow!( + "a previous migration seems to have failed without cleaning up. Please run `reshape abort` and then run migrate again." + )); + } + } + + // Move to the "Applying" state which is necessary as we can't run the migrations + // and state update as a single transaction. If a migration unexpectedly fails without + // automatically aborting, this state saves us from dangling migrations. It forces the user + // to either run migrate again (which works as all migrations are idempotent) or abort. + state.applying(remaining_migrations.clone()); + state.save(db)?; + + println!("Applying {} migrations\n", remaining_migrations.len()); + + helpers::set_up_helpers(db, ¤t_migration).context("failed to set up helpers")?; + + let mut new_schema = Schema::new(); + let mut last_migration_index = usize::MAX; + let mut last_action_index = usize::MAX; + let mut result: anyhow::Result<()> = Ok(()); + + for (migration_index, migration) in remaining_migrations.iter().enumerate() { + println!("Migrating '{}':", migration.name); + last_migration_index = migration_index; + + for (action_index, action) in migration.actions.iter().enumerate() { + last_action_index = action_index; + + let description = action.describe(); + print!(" + {} ", description); + + let ctx = MigrationContext::new(migration_index, action_index); + result = action + .run(&ctx, db, &new_schema) + .with_context(|| format!("failed to {}", description)); + + if result.is_ok() { + action.update_schema(&ctx, &mut new_schema); + println!("{}", "done".green()); + } else { + println!("{}", "failed".red()); + break; + } + } + + println!(); + } + + // If a migration failed, we abort all the migrations that were applied + if let Err(err) = result { + println!("A migration failed, aborting migrations that have already been applied"); + + // Set to the Aborting state. This is to ensure that the failed + // migration is fully aborted and nothing is left dangling. + // If the abort is interrupted for any reason, the user can try again + // by running `reshape abort`. + state.aborting( + remaining_migrations.clone(), + last_migration_index + 1, + last_action_index + 1, + ); + + // Abort will only + abort(db, state)?; + + return Err(err); + } + + // Create schema and views for migration + let target_migration = remaining_migrations.last().unwrap().name.to_string(); + create_schema_for_migration(db, &target_migration, &new_schema) + .with_context(|| format!("failed to create schema for migration {}", target_migration))?; + + // Update state once migrations have been performed + state.in_progress(remaining_migrations); + state.save(db).context("failed to save in-progress state")?; + + // If we started from a blank slate, we can finish the migration immediately + if current_migration.is_none() { + println!("Automatically completing migrations\n"); + complete(db, state).context("failed to automatically complete migrations")?; + + println!("Migrations complete:"); + println!( + " - Run '{}' from your application to use the latest schema", + schema_query_for_migration(&target_migration) + ); + } else { + println!("Migrations have been applied and the new schema is ready for use:"); + println!( + " - Run '{}' from your application to use the latest schema", + schema_query_for_migration(&target_migration) + ); + println!( + " - Run 'reshape complete' once your application has been updated and the previous schema is no longer in use" + ); + } + + Ok(()) +} + +fn complete(db: &mut DbConn, state: &mut State) -> anyhow::Result<()> { + // Make sure a migration is in progress + let (remaining_migrations, starting_migration_index, starting_action_index) = match state.clone() { + State::InProgress { migrations } => { + // Move into the Completing state. Once in this state, + // the migration can't be aborted and must be completed. + state.completing(migrations.clone(), 0, 0); + state.save(db).context("failed to save state")?; + + (migrations, 0, 0) + }, + State::Completing { + migrations, + current_migration_index, + current_action_index + } => (migrations, current_migration_index, current_action_index), + State::Aborting { .. } => { + return Err(anyhow!("migration been aborted and can't be completed. Please finish using `reshape abort`.")) + } + State::Applying { .. } => { + return Err(anyhow!("a previous migration unexpectedly failed. Please run `reshape migrate` to try applying the migration again.")) + } + State::Idle => { + println!("No migration in progress"); + return Ok(()); + } + }; + + // Remove previous migration's schema + if let Some(current_migration) = &state::current_migration(db)? { + db.run(&format!( + "DROP SCHEMA IF EXISTS {} CASCADE", + schema_name_for_migration(current_migration) + )) + .context("failed to remove previous migration's schema")?; + } + + for (migration_index, migration) in remaining_migrations.iter().enumerate() { + // Skip all the migrations which have already been completed + if migration_index < starting_migration_index { + continue; + } + + println!("Completing '{}':", migration.name); + + for (action_index, action) in migration.actions.iter().enumerate() { + // Skip all actions which have already been completed + if migration_index == starting_migration_index && action_index < starting_action_index { + continue; + } + + let description = action.describe(); + print!(" + {} ", description); + + let ctx = MigrationContext::new(migration_index, action_index); + + // Update state to indicate that this action has been completed. + // We won't save this new state until after the action has completed. + state.completing( + remaining_migrations.clone(), + migration_index + 1, + action_index + 1, + ); + + // This did_save check is necessary because of the borrow checker. + // The Transaction which might be returned from action.complete + // contains a mutable reference to self.db. We need the Transaction + // to be dropped before we can save the state using self.db instead, + // which we achieve here by limiting the lifetime of the Transaction + // with a new block. + let did_save = { + let result = action + .complete(&ctx, db) + .with_context(|| format!("failed to complete migration {}", migration.name)) + .with_context(|| format!("failed to complete action: {}", description)); + + let maybe_transaction = match result { + Ok(maybe_transaction) => { + println!("{}", "done".green()); + maybe_transaction + } + Err(e) => { + println!("{}", "failed".red()); + return Err(e); + } + }; + + // Update state with which migrations and actions have been completed. + // Each action can create and return a transaction if they need atomicity. + // We use this transaction to update the state to ensure the action only completes. + // once. + // We want to use a single transaction for each action to keep the length of + // the transaction as short as possible. Wherever possible, we don't want to + // use a transaction at all. + if let Some(mut transaction) = maybe_transaction { + state + .save(&mut transaction) + .context("failed to save state after completing action")?; + transaction + .commit() + .context("failed to commit transaction")?; + + true + } else { + false + } + }; + + // If the action didn't return a transaction we save the state normally instead + if !did_save { + state + .save(db) + .context("failed to save state after completing action")?; + } + } + + println!(); + } + + // Remove helpers which are no longer in use + helpers::tear_down_helpers(db).context("failed to tear down helpers")?; + + state + .complete(db) + .context("failed to update state as completed")?; + + Ok(()) +} + +fn abort(db: &mut DbConn, state: &mut State) -> anyhow::Result<()> { + let (remaining_migrations, last_migration_index, last_action_index) = match state.clone() { + State::InProgress { migrations } | State::Applying { migrations } => { + // Set to the Aborting state. Once this is done, the migration has to + // be fully aborted and can't be completed. + state.aborting(migrations.clone(), 0, 0); + state.save(db)?; + + (migrations, usize::MAX, usize::MAX) + } + State::Aborting { + migrations, + last_migration_index, + last_action_index, + } => (migrations, last_migration_index, last_action_index), + State::Completing { .. } => { + return Err(anyhow!("Migration completion has already been started. Please run `reshape complete` again to finish it.")); + } + State::Idle => { + println!("No migration is in progress"); + return Ok(()); + } + }; + + // Remove new migration's schema + let target_migration = remaining_migrations.last().unwrap().name.to_string(); + let schema_name = schema_name_for_migration(&target_migration); + db.run(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema_name,)) + .with_context(|| format!("failed to drop schema {}", schema_name))?; + + // Abort all pending migrations + // Abort all migrations in reverse order + for (migration_index, migration) in remaining_migrations.iter().enumerate().rev() { + // Skip migrations which shouldn't be aborted + // The reason can be that they have already been aborted or that + // the migration was never applied in the first place. + if migration_index >= last_migration_index { + continue; + } + + print!("Aborting '{}' ", migration.name); + + for (action_index, action) in migration.actions.iter().enumerate().rev() { + // Skip actions which shouldn't be aborted + // The reason can be that they have already been aborted or that + // the action was never applied in the first place. + if migration_index == last_migration_index - 1 && action_index >= last_action_index { + continue; + } + + let ctx = MigrationContext::new(migration_index, action_index); + action + .abort(&ctx, db) + .with_context(|| format!("failed to abort migration {}", migration.name)) + .with_context(|| format!("failed to abort action: {}", action.describe()))?; + + // Update state with which migrations and actions have been aborted. + // We don't need to run this in a transaction as aborts are idempotent. + state.aborting(remaining_migrations.to_vec(), migration_index, action_index); + state.save(db).context("failed to save state")?; + } + + println!("{}", "done".green()); + } + + helpers::tear_down_helpers(db).context("failed to tear down helpers")?; + + *state = State::Idle; + state.save(db).context("failed to save state")?; + + Ok(()) +} + +fn create_schema_for_migration( + db: &mut DbConn, + migration_name: &str, + schema: &Schema, +) -> anyhow::Result<()> { + // Create schema for migration + let schema_name = schema_name_for_migration(migration_name); + db.run(&format!("CREATE SCHEMA IF NOT EXISTS {}", schema_name)) + .with_context(|| { + format!( + "failed to create schema {} for migration {}", + schema_name, migration_name + ) + })?; + + // Create views inside schema + for table in schema.get_tables(db)? { + create_view_for_table(db, &table, &schema_name)?; + } + + Ok(()) +} + +fn create_view_for_table(db: &mut impl Conn, table: &Table, schema: &str) -> anyhow::Result<()> { + let select_columns: Vec = table + .columns + .iter() + .map(|column| { + format!( + r#" + "{real_name}" AS "{alias}" + "#, + real_name = column.real_name, + alias = column.name, + ) + }) + .collect(); + + db.run(&format!( + r#" + CREATE OR REPLACE VIEW {schema}."{view_name}" AS + SELECT {columns} + FROM "{table_name}" + "#, + schema = schema, + table_name = table.real_name, + view_name = table.name, + columns = select_columns.join(","), + )) + .with_context(|| format!("failed to create view for table {}", table.name))?; + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index c966edf..2013f03 100644 --- a/src/main.rs +++ b/src/main.rs @@ -72,14 +72,14 @@ fn run(opts: Opts) -> anyhow::Result<()> { // Automatically complete migration if --complete flag is set if opts.complete { - reshape.complete_migration()?; + reshape.complete()?; } Ok(()) } Command::Complete(opts) => { let mut reshape = reshape_from_connection_options(&opts)?; - reshape.complete_migration() + reshape.complete() } Command::Remove(opts) => { let mut reshape = reshape_from_connection_options(&opts)?; diff --git a/tests/add_column.rs b/tests/add_column.rs index c280d23..cee0b74 100644 --- a/tests/add_column.rs +++ b/tests/add_column.rs @@ -112,7 +112,7 @@ fn add_column() { (first_name.as_ref(), last_name.as_ref()) ); - reshape.complete_migration().unwrap(); + reshape.complete().unwrap(); common::assert_cleaned_up(&mut new_db); } @@ -205,7 +205,7 @@ fn add_column_nullable() { .simple_query("INSERT INTO users (id, name) VALUES (4, 'Test Testsson'), (5, NULL)") .unwrap(); - reshape.complete_migration().unwrap(); + reshape.complete().unwrap(); common::assert_cleaned_up(&mut new_db); } @@ -288,6 +288,6 @@ fn add_column_with_default() { .unwrap(); assert_eq!("DEFAULT", name); - reshape.complete_migration().unwrap(); + reshape.complete().unwrap(); common::assert_cleaned_up(&mut new_db); } diff --git a/tests/add_index.rs b/tests/add_index.rs index e9eaf0e..220515e 100644 --- a/tests/add_index.rs +++ b/tests/add_index.rs @@ -64,6 +64,6 @@ fn add_index() { assert_eq!(vec![(true, true)], result); - reshape.complete_migration().unwrap(); + reshape.complete().unwrap(); common::assert_cleaned_up(&mut new_db); } diff --git a/tests/alter_column.rs b/tests/alter_column.rs index 9251863..2881d85 100644 --- a/tests/alter_column.rs +++ b/tests/alter_column.rs @@ -99,7 +99,7 @@ fn alter_column_data() { .unwrap(); assert_eq!("test testsson", result.get::<_, &str>("name")); - reshape.complete_migration().unwrap(); + reshape.complete().unwrap(); common::assert_cleaned_up(&mut new_db); } @@ -198,7 +198,7 @@ fn alter_column_set_not_null() { .unwrap(); assert_eq!("Jane Doe", result.get::<_, &str>("name")); - reshape.complete_migration().unwrap(); + reshape.complete().unwrap(); common::assert_cleaned_up(&mut new_db); } @@ -280,7 +280,7 @@ fn alter_column_rename() { .map(|row| row.get::<_, String>("full_name")) .eq(expected)); - reshape.complete_migration().unwrap(); + reshape.complete().unwrap(); common::assert_cleaned_up(&mut new_db); } @@ -402,7 +402,7 @@ fn alter_column_multiple() { .unwrap(); assert_eq!(48, result); - reshape.complete_migration().unwrap(); + reshape.complete().unwrap(); common::assert_cleaned_up(&mut new_db); } @@ -502,7 +502,7 @@ fn alter_column_default() { .unwrap(); assert_eq!("NEW DEFAULT", result.get::<_, &str>("name")); - reshape.complete_migration().unwrap(); + reshape.complete().unwrap(); common::assert_cleaned_up(&mut new_db); } @@ -568,7 +568,7 @@ fn alter_column_with_index() { // Complete the second migration which should replace the existing column // with the temporary one - reshape.complete_migration().unwrap(); + reshape.complete().unwrap(); // Make sure index still exists let result: i64 = db diff --git a/tests/remove_column.rs b/tests/remove_column.rs index 9ff736f..55bfd05 100644 --- a/tests/remove_column.rs +++ b/tests/remove_column.rs @@ -76,6 +76,6 @@ fn remove_column() { result.as_ref().map(|row| row.get("name")) ); - reshape.complete_migration().unwrap(); + reshape.complete().unwrap(); common::assert_cleaned_up(&mut new_db); } diff --git a/tests/remove_index.rs b/tests/remove_index.rs index 3f6a9df..fa35aca 100644 --- a/tests/remove_index.rs +++ b/tests/remove_index.rs @@ -85,7 +85,7 @@ fn remove_index() { assert_eq!(vec![(true, true)], result); - reshape.complete_migration().unwrap(); + reshape.complete().unwrap(); // Ensure index has been removed after the migration is complete let count: i64 = db diff --git a/tests/remove_table.rs b/tests/remove_table.rs index 3e8c6a7..7410d80 100644 --- a/tests/remove_table.rs +++ b/tests/remove_table.rs @@ -49,6 +49,6 @@ fn remove_table() { // Ensure the table is not accessible through the new schema assert!(new_db.query("SELECT id FROM users", &[]).is_err()); - reshape.complete_migration().unwrap(); + reshape.complete().unwrap(); common::assert_cleaned_up(&mut new_db); } diff --git a/tests/rename_table.rs b/tests/rename_table.rs index 57497fb..d5be00b 100644 --- a/tests/rename_table.rs +++ b/tests/rename_table.rs @@ -75,6 +75,6 @@ fn rename_table() { assert!(old_db.simple_query("SELECT id FROM customers").is_err()); assert!(new_db.simple_query("SELECT id FROM users").is_err()); - reshape.complete_migration().unwrap(); + reshape.complete().unwrap(); common::assert_cleaned_up(&mut new_db); }