mirror of
https://github.com/ilyakooo0/reshape.git
synced 2024-11-25 23:13:29 +03:00
Let actions complete without transaction
Some migrations can be completed without needing a transaction for atomicity. We should avoid transactions for DDL as far as possible to avoid interfering with other queries and holding locks too long. It's possible that all actions could be completed without a transaction but I'm not sure of that yet. Some migrations also won't work with transactions. For example dropping an index using `DROP INDEX CONCURRENTLY` doesn't work inside a transation. Next step is to perform the same change for aborts.
This commit is contained in:
parent
8a130490a0
commit
8e08941007
16
src/db.rs
16
src/db.rs
@ -8,6 +8,7 @@ pub trait Conn {
|
||||
query: &str,
|
||||
params: &[&(dyn ToSql + Sync)],
|
||||
) -> anyhow::Result<Vec<Row>>;
|
||||
fn transaction(&mut self) -> anyhow::Result<Transaction>;
|
||||
}
|
||||
|
||||
pub struct DbConn {
|
||||
@ -19,11 +20,6 @@ impl DbConn {
|
||||
let client = config.connect(NoTls)?;
|
||||
Ok(DbConn { client })
|
||||
}
|
||||
|
||||
pub fn transaction(&mut self) -> anyhow::Result<Transaction> {
|
||||
let transaction = self.client.transaction()?;
|
||||
Ok(Transaction { transaction })
|
||||
}
|
||||
}
|
||||
|
||||
impl Conn for DbConn {
|
||||
@ -45,6 +41,11 @@ impl Conn for DbConn {
|
||||
let rows = self.client.query(query, params)?;
|
||||
Ok(rows)
|
||||
}
|
||||
|
||||
fn transaction(&mut self) -> anyhow::Result<Transaction> {
|
||||
let transaction = self.client.transaction()?;
|
||||
Ok(Transaction { transaction })
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Transaction<'a> {
|
||||
@ -77,4 +78,9 @@ impl Conn for Transaction<'_> {
|
||||
let rows = self.transaction.query(query, params)?;
|
||||
Ok(rows)
|
||||
}
|
||||
|
||||
fn transaction(&mut self) -> anyhow::Result<Transaction> {
|
||||
let transaction = self.transaction.transaction()?;
|
||||
Ok(Transaction { transaction })
|
||||
}
|
||||
}
|
||||
|
81
src/lib.rs
81
src/lib.rs
@ -253,44 +253,69 @@ impl Reshape {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Run each action completion as a separate transaction. We need atomicity
|
||||
// to ensure the completion changes are run only once for each action.
|
||||
let mut transaction = self
|
||||
.db
|
||||
.transaction()
|
||||
.context("failed to start transaction")?;
|
||||
|
||||
let description = action.describe();
|
||||
print!(" + {} ", description);
|
||||
|
||||
let ctx = MigrationContext::new(migration_index, action_index);
|
||||
let result = action
|
||||
.complete(&ctx, &mut transaction)
|
||||
.with_context(|| format!("failed to complete migration {}", migration.name))
|
||||
.with_context(|| format!("failed to complete action: {}", description));
|
||||
|
||||
if result.is_ok() {
|
||||
println!("{}", "done".green());
|
||||
} else {
|
||||
println!("{}", "failed".red());
|
||||
return result;
|
||||
}
|
||||
|
||||
// Update state with which migrations and actions have been completed. By running this
|
||||
// in a transaction, we guarantee that an action is only completed once.
|
||||
// We want to use a single transaction for each action to keep the length
|
||||
// of the transaction as short as possible.
|
||||
// Update state to indicate that this action has been completed.
|
||||
// We won't save this new state until after the action has completed.
|
||||
self.state.completing(
|
||||
remaining_migrations.clone(),
|
||||
migration_index + 1,
|
||||
action_index + 1,
|
||||
);
|
||||
self.state
|
||||
.save(&mut transaction)
|
||||
.context("failed to save state after completing action")?;
|
||||
transaction
|
||||
.commit()
|
||||
.context("failed to commit transaction")?;
|
||||
|
||||
// This did_save check is necessary because of the borrow checker.
|
||||
// The Transaction which might be returned from action.complete
|
||||
// contains a mutable reference to self.db. We need the Transaction
|
||||
// to be dropped before we can save the state using self.db instead,
|
||||
// which we achieve here by limiting the lifetime of the Transaction
|
||||
// with a new block.
|
||||
let did_save = {
|
||||
let result = action
|
||||
.complete(&ctx, &mut self.db)
|
||||
.with_context(|| format!("failed to complete migration {}", migration.name))
|
||||
.with_context(|| format!("failed to complete action: {}", description));
|
||||
|
||||
let maybe_transaction = match result {
|
||||
Ok(maybe_transaction) => {
|
||||
println!("{}", "done".green());
|
||||
maybe_transaction
|
||||
}
|
||||
Err(e) => {
|
||||
println!("{}", "failed".red());
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
// Update state with which migrations and actions have been completed.
|
||||
// Each action can create and return a transaction if they need atomicity.
|
||||
// We use this transaction to update the state to ensure the action only completes.
|
||||
// once.
|
||||
// We want to use a single transaction for each action to keep the length of
|
||||
// the transaction as short as possible. Wherever possible, we don't want to
|
||||
// use a transaction at all.
|
||||
if let Some(mut transaction) = maybe_transaction {
|
||||
self.state
|
||||
.save(&mut transaction)
|
||||
.context("failed to save state after completing action")?;
|
||||
transaction
|
||||
.commit()
|
||||
.context("failed to commit transaction")?;
|
||||
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
// If the action didn't return a transaction we save the state normally instead
|
||||
if !did_save {
|
||||
self.state
|
||||
.save(&mut self.db)
|
||||
.context("failed to save state after completing action")?;
|
||||
}
|
||||
}
|
||||
|
||||
println!();
|
||||
|
@ -1,5 +1,8 @@
|
||||
use super::{common, Action, Column, MigrationContext};
|
||||
use crate::{db::Conn, schema::Schema};
|
||||
use crate::{
|
||||
db::{Conn, Transaction},
|
||||
schema::Schema,
|
||||
};
|
||||
use anyhow::Context;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@ -156,7 +159,13 @@ impl Action for AddColumn {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn complete(&self, ctx: &MigrationContext, db: &mut dyn Conn) -> anyhow::Result<()> {
|
||||
fn complete<'a>(
|
||||
&self,
|
||||
ctx: &MigrationContext,
|
||||
db: &'a mut dyn Conn,
|
||||
) -> anyhow::Result<Option<Transaction<'a>>> {
|
||||
let mut transaction = db.transaction().context("failed to create transaction")?;
|
||||
|
||||
// Remove triggers and procedures
|
||||
let query = format!(
|
||||
"
|
||||
@ -166,7 +175,9 @@ impl Action for AddColumn {
|
||||
table = self.table,
|
||||
trigger_name = self.trigger_name(ctx),
|
||||
);
|
||||
db.run(&query).context("failed to drop up trigger")?;
|
||||
transaction
|
||||
.run(&query)
|
||||
.context("failed to drop up trigger")?;
|
||||
|
||||
// Update column to be NOT NULL if necessary
|
||||
if !self.column.nullable {
|
||||
@ -180,7 +191,8 @@ impl Action for AddColumn {
|
||||
table = self.table,
|
||||
constraint_name = self.not_null_constraint_name(ctx),
|
||||
);
|
||||
db.run(&query)
|
||||
transaction
|
||||
.run(&query)
|
||||
.context("failed to validate NOT NULL constraint")?;
|
||||
|
||||
// Update the column to be NOT NULL.
|
||||
@ -195,7 +207,9 @@ impl Action for AddColumn {
|
||||
table = self.table,
|
||||
column = self.temp_column_name(ctx),
|
||||
);
|
||||
db.run(&query).context("failed to set column as NOT NULL")?;
|
||||
transaction
|
||||
.run(&query)
|
||||
.context("failed to set column as NOT NULL")?;
|
||||
|
||||
// Drop the temporary constraint
|
||||
let query = format!(
|
||||
@ -206,23 +220,25 @@ impl Action for AddColumn {
|
||||
table = self.table,
|
||||
constraint_name = self.not_null_constraint_name(ctx),
|
||||
);
|
||||
db.run(&query)
|
||||
transaction
|
||||
.run(&query)
|
||||
.context("failed to drop NOT NULL constraint")?;
|
||||
}
|
||||
|
||||
// Rename the temporary column to its real name
|
||||
db.run(&format!(
|
||||
"
|
||||
transaction
|
||||
.run(&format!(
|
||||
"
|
||||
ALTER TABLE {table}
|
||||
RENAME COLUMN {temp_column_name} TO {column_name}
|
||||
",
|
||||
table = self.table,
|
||||
temp_column_name = self.temp_column_name(ctx),
|
||||
column_name = self.column.name,
|
||||
))
|
||||
.context("failed to rename column to final name")?;
|
||||
table = self.table,
|
||||
temp_column_name = self.temp_column_name(ctx),
|
||||
column_name = self.column.name,
|
||||
))
|
||||
.context("failed to rename column to final name")?;
|
||||
|
||||
Ok(())
|
||||
Ok(Some(transaction))
|
||||
}
|
||||
|
||||
fn update_schema(&self, ctx: &MigrationContext, schema: &mut Schema) {
|
||||
|
@ -1,5 +1,8 @@
|
||||
use super::{Action, MigrationContext};
|
||||
use crate::{db::Conn, schema::Schema};
|
||||
use crate::{
|
||||
db::{Conn, Transaction},
|
||||
schema::Schema,
|
||||
};
|
||||
use anyhow::Context;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@ -43,8 +46,12 @@ impl Action for AddIndex {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn complete(&self, _ctx: &MigrationContext, _db: &mut dyn Conn) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
fn complete<'a>(
|
||||
&self,
|
||||
_ctx: &MigrationContext,
|
||||
_db: &'a mut dyn Conn,
|
||||
) -> anyhow::Result<Option<Transaction<'a>>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn update_schema(&self, _ctx: &MigrationContext, _schema: &mut Schema) {}
|
||||
|
@ -1,5 +1,9 @@
|
||||
use super::{Action, MigrationContext};
|
||||
use crate::{db::Conn, migrations::common, schema::Schema};
|
||||
use crate::{
|
||||
db::{Conn, Transaction},
|
||||
migrations::common,
|
||||
schema::Schema,
|
||||
};
|
||||
use anyhow::{anyhow, Context};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@ -169,7 +173,11 @@ impl Action for AlterColumn {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn complete(&self, ctx: &MigrationContext, db: &mut dyn Conn) -> anyhow::Result<()> {
|
||||
fn complete<'a>(
|
||||
&self,
|
||||
ctx: &MigrationContext,
|
||||
db: &'a mut dyn Conn,
|
||||
) -> anyhow::Result<Option<Transaction<'a>>> {
|
||||
if self.can_short_circuit() {
|
||||
if let Some(new_name) = &self.changes.name {
|
||||
let query = format!(
|
||||
@ -183,7 +191,7 @@ impl Action for AlterColumn {
|
||||
);
|
||||
db.run(&query).context("failed to rename column")?;
|
||||
}
|
||||
return Ok(());
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Update column to be NOT NULL if necessary
|
||||
@ -277,7 +285,7 @@ impl Action for AlterColumn {
|
||||
db.run(&query)
|
||||
.context("failed to drop up and down triggers")?;
|
||||
|
||||
Ok(())
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn update_schema(&self, ctx: &MigrationContext, schema: &mut Schema) {
|
||||
|
@ -1,5 +1,8 @@
|
||||
use super::{Action, Column, MigrationContext};
|
||||
use crate::{db::Conn, schema::Schema};
|
||||
use crate::{
|
||||
db::{Conn, Transaction},
|
||||
schema::Schema,
|
||||
};
|
||||
use anyhow::Context;
|
||||
use derive_builder::Builder;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@ -82,9 +85,13 @@ impl Action for CreateTable {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn complete(&self, _ctx: &MigrationContext, _db: &mut dyn Conn) -> anyhow::Result<()> {
|
||||
fn complete<'a>(
|
||||
&self,
|
||||
_ctx: &MigrationContext,
|
||||
_db: &'a mut dyn Conn,
|
||||
) -> anyhow::Result<Option<Transaction<'a>>> {
|
||||
// Do nothing
|
||||
Ok(())
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn update_schema(&self, _ctx: &MigrationContext, _schema: &mut Schema) {}
|
||||
|
@ -1,4 +1,7 @@
|
||||
use crate::{db::Conn, schema::Schema};
|
||||
use crate::{
|
||||
db::{Conn, Transaction},
|
||||
schema::Schema,
|
||||
};
|
||||
use core::fmt::Debug;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@ -98,7 +101,11 @@ pub trait Action: Debug {
|
||||
fn describe(&self) -> String;
|
||||
fn run(&self, ctx: &MigrationContext, db: &mut dyn Conn, schema: &Schema)
|
||||
-> anyhow::Result<()>;
|
||||
fn complete(&self, ctx: &MigrationContext, db: &mut dyn Conn) -> anyhow::Result<()>;
|
||||
fn complete<'a>(
|
||||
&self,
|
||||
ctx: &MigrationContext,
|
||||
db: &'a mut dyn Conn,
|
||||
) -> anyhow::Result<Option<Transaction<'a>>>;
|
||||
fn update_schema(&self, ctx: &MigrationContext, schema: &mut Schema);
|
||||
fn abort(&self, ctx: &MigrationContext, db: &mut dyn Conn) -> anyhow::Result<()>;
|
||||
}
|
||||
|
@ -1,5 +1,8 @@
|
||||
use super::{Action, MigrationContext};
|
||||
use crate::{db::Conn, schema::Schema};
|
||||
use crate::{
|
||||
db::{Conn, Transaction},
|
||||
schema::Schema,
|
||||
};
|
||||
use anyhow::Context;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@ -84,12 +87,16 @@ impl Action for RemoveColumn {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn complete(&self, ctx: &MigrationContext, db: &mut dyn Conn) -> anyhow::Result<()> {
|
||||
fn complete<'a>(
|
||||
&self,
|
||||
ctx: &MigrationContext,
|
||||
db: &'a mut dyn Conn,
|
||||
) -> anyhow::Result<Option<Transaction<'a>>> {
|
||||
// Remove column, function and trigger
|
||||
let query = format!(
|
||||
"
|
||||
ALTER TABLE {table}
|
||||
DROP COLUMN {column};
|
||||
DROP COLUMN IF EXISTS {column};
|
||||
|
||||
DROP TRIGGER IF EXISTS {trigger_name} ON {table};
|
||||
DROP FUNCTION IF EXISTS {trigger_name};
|
||||
@ -101,7 +108,7 @@ impl Action for RemoveColumn {
|
||||
db.run(&query)
|
||||
.context("failed to drop column and down trigger")?;
|
||||
|
||||
Ok(())
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn update_schema(&self, _ctx: &MigrationContext, schema: &mut Schema) {
|
||||
|
@ -1,5 +1,8 @@
|
||||
use super::{Action, MigrationContext};
|
||||
use crate::{db::Conn, schema::Schema};
|
||||
use crate::{
|
||||
db::{Conn, Transaction},
|
||||
schema::Schema,
|
||||
};
|
||||
use anyhow::Context;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@ -23,17 +26,21 @@ impl Action for RemoveTable {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn complete(&self, _ctx: &MigrationContext, db: &mut dyn Conn) -> anyhow::Result<()> {
|
||||
fn complete<'a>(
|
||||
&self,
|
||||
_ctx: &MigrationContext,
|
||||
db: &'a mut dyn Conn,
|
||||
) -> anyhow::Result<Option<Transaction<'a>>> {
|
||||
// Remove table
|
||||
let query = format!(
|
||||
"
|
||||
DROP TABLE {table};
|
||||
DROP TABLE IF EXISTS {table};
|
||||
",
|
||||
table = self.table,
|
||||
);
|
||||
db.run(&query).context("failed to drop table")?;
|
||||
|
||||
Ok(())
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn update_schema(&self, _ctx: &MigrationContext, schema: &mut Schema) {
|
||||
|
@ -1,5 +1,8 @@
|
||||
use super::{Action, MigrationContext};
|
||||
use crate::{db::Conn, schema::Schema};
|
||||
use crate::{
|
||||
db::{Conn, Transaction},
|
||||
schema::Schema,
|
||||
};
|
||||
use anyhow::Context;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@ -24,11 +27,15 @@ impl Action for RenameTable {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn complete(&self, _ctx: &MigrationContext, db: &mut dyn Conn) -> anyhow::Result<()> {
|
||||
fn complete<'a>(
|
||||
&self,
|
||||
_ctx: &MigrationContext,
|
||||
db: &'a mut dyn Conn,
|
||||
) -> anyhow::Result<Option<Transaction<'a>>> {
|
||||
// Rename table
|
||||
let query = format!(
|
||||
"
|
||||
ALTER TABLE {table}
|
||||
ALTER TABLE IF EXISTS {table}
|
||||
RENAME TO {new_name}
|
||||
",
|
||||
table = self.table,
|
||||
@ -36,7 +43,7 @@ impl Action for RenameTable {
|
||||
);
|
||||
db.run(&query).context("failed to rename table")?;
|
||||
|
||||
Ok(())
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn update_schema(&self, _ctx: &MigrationContext, schema: &mut Schema) {
|
||||
|
Loading…
Reference in New Issue
Block a user