diff --git a/src/db.rs b/src/db.rs index 3e9f3dd..2f3b179 100644 --- a/src/db.rs +++ b/src/db.rs @@ -8,6 +8,7 @@ pub trait Conn { query: &str, params: &[&(dyn ToSql + Sync)], ) -> anyhow::Result>; + fn transaction(&mut self) -> anyhow::Result; } pub struct DbConn { @@ -19,11 +20,6 @@ impl DbConn { let client = config.connect(NoTls)?; Ok(DbConn { client }) } - - pub fn transaction(&mut self) -> anyhow::Result { - let transaction = self.client.transaction()?; - Ok(Transaction { transaction }) - } } impl Conn for DbConn { @@ -45,6 +41,11 @@ impl Conn for DbConn { let rows = self.client.query(query, params)?; Ok(rows) } + + fn transaction(&mut self) -> anyhow::Result { + let transaction = self.client.transaction()?; + Ok(Transaction { transaction }) + } } pub struct Transaction<'a> { @@ -77,4 +78,9 @@ impl Conn for Transaction<'_> { let rows = self.transaction.query(query, params)?; Ok(rows) } + + fn transaction(&mut self) -> anyhow::Result { + let transaction = self.transaction.transaction()?; + Ok(Transaction { transaction }) + } } diff --git a/src/lib.rs b/src/lib.rs index 2d5fdf2..2ac27f5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -253,44 +253,69 @@ impl Reshape { continue; } - // Run each action completion as a separate transaction. We need atomicity - // to ensure the completion changes are run only once for each action. - let mut transaction = self - .db - .transaction() - .context("failed to start transaction")?; - let description = action.describe(); print!(" + {} ", description); let ctx = MigrationContext::new(migration_index, action_index); - let result = action - .complete(&ctx, &mut transaction) - .with_context(|| format!("failed to complete migration {}", migration.name)) - .with_context(|| format!("failed to complete action: {}", description)); - if result.is_ok() { - println!("{}", "done".green()); - } else { - println!("{}", "failed".red()); - return result; - } - - // Update state with which migrations and actions have been completed. By running this - // in a transaction, we guarantee that an action is only completed once. - // We want to use a single transaction for each action to keep the length - // of the transaction as short as possible. + // Update state to indicate that this action has been completed. + // We won't save this new state until after the action has completed. self.state.completing( remaining_migrations.clone(), migration_index + 1, action_index + 1, ); - self.state - .save(&mut transaction) - .context("failed to save state after completing action")?; - transaction - .commit() - .context("failed to commit transaction")?; + + // This did_save check is necessary because of the borrow checker. + // The Transaction which might be returned from action.complete + // contains a mutable reference to self.db. We need the Transaction + // to be dropped before we can save the state using self.db instead, + // which we achieve here by limiting the lifetime of the Transaction + // with a new block. + let did_save = { + let result = action + .complete(&ctx, &mut self.db) + .with_context(|| format!("failed to complete migration {}", migration.name)) + .with_context(|| format!("failed to complete action: {}", description)); + + let maybe_transaction = match result { + Ok(maybe_transaction) => { + println!("{}", "done".green()); + maybe_transaction + } + Err(e) => { + println!("{}", "failed".red()); + return Err(e); + } + }; + + // Update state with which migrations and actions have been completed. + // Each action can create and return a transaction if they need atomicity. + // We use this transaction to update the state to ensure the action only completes. + // once. + // We want to use a single transaction for each action to keep the length of + // the transaction as short as possible. Wherever possible, we don't want to + // use a transaction at all. + if let Some(mut transaction) = maybe_transaction { + self.state + .save(&mut transaction) + .context("failed to save state after completing action")?; + transaction + .commit() + .context("failed to commit transaction")?; + + true + } else { + false + } + }; + + // If the action didn't return a transaction we save the state normally instead + if !did_save { + self.state + .save(&mut self.db) + .context("failed to save state after completing action")?; + } } println!(); diff --git a/src/migrations/add_column.rs b/src/migrations/add_column.rs index a5a15a2..6b7daaf 100644 --- a/src/migrations/add_column.rs +++ b/src/migrations/add_column.rs @@ -1,5 +1,8 @@ use super::{common, Action, Column, MigrationContext}; -use crate::{db::Conn, schema::Schema}; +use crate::{ + db::{Conn, Transaction}, + schema::Schema, +}; use anyhow::Context; use serde::{Deserialize, Serialize}; @@ -156,7 +159,13 @@ impl Action for AddColumn { Ok(()) } - fn complete(&self, ctx: &MigrationContext, db: &mut dyn Conn) -> anyhow::Result<()> { + fn complete<'a>( + &self, + ctx: &MigrationContext, + db: &'a mut dyn Conn, + ) -> anyhow::Result>> { + let mut transaction = db.transaction().context("failed to create transaction")?; + // Remove triggers and procedures let query = format!( " @@ -166,7 +175,9 @@ impl Action for AddColumn { table = self.table, trigger_name = self.trigger_name(ctx), ); - db.run(&query).context("failed to drop up trigger")?; + transaction + .run(&query) + .context("failed to drop up trigger")?; // Update column to be NOT NULL if necessary if !self.column.nullable { @@ -180,7 +191,8 @@ impl Action for AddColumn { table = self.table, constraint_name = self.not_null_constraint_name(ctx), ); - db.run(&query) + transaction + .run(&query) .context("failed to validate NOT NULL constraint")?; // Update the column to be NOT NULL. @@ -195,7 +207,9 @@ impl Action for AddColumn { table = self.table, column = self.temp_column_name(ctx), ); - db.run(&query).context("failed to set column as NOT NULL")?; + transaction + .run(&query) + .context("failed to set column as NOT NULL")?; // Drop the temporary constraint let query = format!( @@ -206,23 +220,25 @@ impl Action for AddColumn { table = self.table, constraint_name = self.not_null_constraint_name(ctx), ); - db.run(&query) + transaction + .run(&query) .context("failed to drop NOT NULL constraint")?; } // Rename the temporary column to its real name - db.run(&format!( - " + transaction + .run(&format!( + " ALTER TABLE {table} RENAME COLUMN {temp_column_name} TO {column_name} ", - table = self.table, - temp_column_name = self.temp_column_name(ctx), - column_name = self.column.name, - )) - .context("failed to rename column to final name")?; + table = self.table, + temp_column_name = self.temp_column_name(ctx), + column_name = self.column.name, + )) + .context("failed to rename column to final name")?; - Ok(()) + Ok(Some(transaction)) } fn update_schema(&self, ctx: &MigrationContext, schema: &mut Schema) { diff --git a/src/migrations/add_index.rs b/src/migrations/add_index.rs index 2ada56c..922d8da 100644 --- a/src/migrations/add_index.rs +++ b/src/migrations/add_index.rs @@ -1,5 +1,8 @@ use super::{Action, MigrationContext}; -use crate::{db::Conn, schema::Schema}; +use crate::{ + db::{Conn, Transaction}, + schema::Schema, +}; use anyhow::Context; use serde::{Deserialize, Serialize}; @@ -43,8 +46,12 @@ impl Action for AddIndex { Ok(()) } - fn complete(&self, _ctx: &MigrationContext, _db: &mut dyn Conn) -> anyhow::Result<()> { - Ok(()) + fn complete<'a>( + &self, + _ctx: &MigrationContext, + _db: &'a mut dyn Conn, + ) -> anyhow::Result>> { + Ok(None) } fn update_schema(&self, _ctx: &MigrationContext, _schema: &mut Schema) {} diff --git a/src/migrations/alter_column.rs b/src/migrations/alter_column.rs index 5cac2a0..d5c5ac0 100644 --- a/src/migrations/alter_column.rs +++ b/src/migrations/alter_column.rs @@ -1,5 +1,9 @@ use super::{Action, MigrationContext}; -use crate::{db::Conn, migrations::common, schema::Schema}; +use crate::{ + db::{Conn, Transaction}, + migrations::common, + schema::Schema, +}; use anyhow::{anyhow, Context}; use serde::{Deserialize, Serialize}; @@ -169,7 +173,11 @@ impl Action for AlterColumn { Ok(()) } - fn complete(&self, ctx: &MigrationContext, db: &mut dyn Conn) -> anyhow::Result<()> { + fn complete<'a>( + &self, + ctx: &MigrationContext, + db: &'a mut dyn Conn, + ) -> anyhow::Result>> { if self.can_short_circuit() { if let Some(new_name) = &self.changes.name { let query = format!( @@ -183,7 +191,7 @@ impl Action for AlterColumn { ); db.run(&query).context("failed to rename column")?; } - return Ok(()); + return Ok(None); } // Update column to be NOT NULL if necessary @@ -277,7 +285,7 @@ impl Action for AlterColumn { db.run(&query) .context("failed to drop up and down triggers")?; - Ok(()) + Ok(None) } fn update_schema(&self, ctx: &MigrationContext, schema: &mut Schema) { diff --git a/src/migrations/create_table.rs b/src/migrations/create_table.rs index ca1d304..9594be5 100644 --- a/src/migrations/create_table.rs +++ b/src/migrations/create_table.rs @@ -1,5 +1,8 @@ use super::{Action, Column, MigrationContext}; -use crate::{db::Conn, schema::Schema}; +use crate::{ + db::{Conn, Transaction}, + schema::Schema, +}; use anyhow::Context; use derive_builder::Builder; use serde::{Deserialize, Serialize}; @@ -82,9 +85,13 @@ impl Action for CreateTable { Ok(()) } - fn complete(&self, _ctx: &MigrationContext, _db: &mut dyn Conn) -> anyhow::Result<()> { + fn complete<'a>( + &self, + _ctx: &MigrationContext, + _db: &'a mut dyn Conn, + ) -> anyhow::Result>> { // Do nothing - Ok(()) + Ok(None) } fn update_schema(&self, _ctx: &MigrationContext, _schema: &mut Schema) {} diff --git a/src/migrations/mod.rs b/src/migrations/mod.rs index 4d9f3e9..f596f8b 100644 --- a/src/migrations/mod.rs +++ b/src/migrations/mod.rs @@ -1,4 +1,7 @@ -use crate::{db::Conn, schema::Schema}; +use crate::{ + db::{Conn, Transaction}, + schema::Schema, +}; use core::fmt::Debug; use serde::{Deserialize, Serialize}; @@ -98,7 +101,11 @@ pub trait Action: Debug { fn describe(&self) -> String; fn run(&self, ctx: &MigrationContext, db: &mut dyn Conn, schema: &Schema) -> anyhow::Result<()>; - fn complete(&self, ctx: &MigrationContext, db: &mut dyn Conn) -> anyhow::Result<()>; + fn complete<'a>( + &self, + ctx: &MigrationContext, + db: &'a mut dyn Conn, + ) -> anyhow::Result>>; fn update_schema(&self, ctx: &MigrationContext, schema: &mut Schema); fn abort(&self, ctx: &MigrationContext, db: &mut dyn Conn) -> anyhow::Result<()>; } diff --git a/src/migrations/remove_column.rs b/src/migrations/remove_column.rs index 86549f5..0e21597 100644 --- a/src/migrations/remove_column.rs +++ b/src/migrations/remove_column.rs @@ -1,5 +1,8 @@ use super::{Action, MigrationContext}; -use crate::{db::Conn, schema::Schema}; +use crate::{ + db::{Conn, Transaction}, + schema::Schema, +}; use anyhow::Context; use serde::{Deserialize, Serialize}; @@ -84,12 +87,16 @@ impl Action for RemoveColumn { Ok(()) } - fn complete(&self, ctx: &MigrationContext, db: &mut dyn Conn) -> anyhow::Result<()> { + fn complete<'a>( + &self, + ctx: &MigrationContext, + db: &'a mut dyn Conn, + ) -> anyhow::Result>> { // Remove column, function and trigger let query = format!( " ALTER TABLE {table} - DROP COLUMN {column}; + DROP COLUMN IF EXISTS {column}; DROP TRIGGER IF EXISTS {trigger_name} ON {table}; DROP FUNCTION IF EXISTS {trigger_name}; @@ -101,7 +108,7 @@ impl Action for RemoveColumn { db.run(&query) .context("failed to drop column and down trigger")?; - Ok(()) + Ok(None) } fn update_schema(&self, _ctx: &MigrationContext, schema: &mut Schema) { diff --git a/src/migrations/remove_table.rs b/src/migrations/remove_table.rs index 70d9a0f..e77304f 100644 --- a/src/migrations/remove_table.rs +++ b/src/migrations/remove_table.rs @@ -1,5 +1,8 @@ use super::{Action, MigrationContext}; -use crate::{db::Conn, schema::Schema}; +use crate::{ + db::{Conn, Transaction}, + schema::Schema, +}; use anyhow::Context; use serde::{Deserialize, Serialize}; @@ -23,17 +26,21 @@ impl Action for RemoveTable { Ok(()) } - fn complete(&self, _ctx: &MigrationContext, db: &mut dyn Conn) -> anyhow::Result<()> { + fn complete<'a>( + &self, + _ctx: &MigrationContext, + db: &'a mut dyn Conn, + ) -> anyhow::Result>> { // Remove table let query = format!( " - DROP TABLE {table}; + DROP TABLE IF EXISTS {table}; ", table = self.table, ); db.run(&query).context("failed to drop table")?; - Ok(()) + Ok(None) } fn update_schema(&self, _ctx: &MigrationContext, schema: &mut Schema) { diff --git a/src/migrations/rename_table.rs b/src/migrations/rename_table.rs index 5596f09..7f8659a 100644 --- a/src/migrations/rename_table.rs +++ b/src/migrations/rename_table.rs @@ -1,5 +1,8 @@ use super::{Action, MigrationContext}; -use crate::{db::Conn, schema::Schema}; +use crate::{ + db::{Conn, Transaction}, + schema::Schema, +}; use anyhow::Context; use serde::{Deserialize, Serialize}; @@ -24,11 +27,15 @@ impl Action for RenameTable { Ok(()) } - fn complete(&self, _ctx: &MigrationContext, db: &mut dyn Conn) -> anyhow::Result<()> { + fn complete<'a>( + &self, + _ctx: &MigrationContext, + db: &'a mut dyn Conn, + ) -> anyhow::Result>> { // Rename table let query = format!( " - ALTER TABLE {table} + ALTER TABLE IF EXISTS {table} RENAME TO {new_name} ", table = self.table, @@ -36,7 +43,7 @@ impl Action for RenameTable { ); db.run(&query).context("failed to rename table")?; - Ok(()) + Ok(None) } fn update_schema(&self, _ctx: &MigrationContext, schema: &mut Schema) {