Fix issue where column couldn't be altered multiple times

Previously, having multiple alter_column actions for a single column
could cause issues as the triggers and batch updates referenced the
wrong columns.

This commit also simplifies the batch update procedure. Rather than
running the actual update expression on existing rows, a no-op update is
run, which in turn fires the triggers that populate the new temporary
columns.
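
Conceptually, the no-op update amounts to something like the following
sketch (table and column names are illustrative, not taken from the
diff): setting a column to its own current value changes no data, but it
still fires the BEFORE UPDATE triggers on the table, and those triggers
fill in the temporary columns.

    -- Illustrative no-op update: row data is unchanged, but BEFORE UPDATE
    -- triggers still run for every row touched by the statement.
    UPDATE users
    SET counter = counter
    WHERE id <= 1000;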
fabianlindfors 2021-11-15 00:11:19 +01:00
parent 94c0aefe50
commit 3cb74a535e
4 changed files with 107 additions and 17 deletions

View File

@@ -134,8 +134,8 @@ impl Action for AddColumn {
         }
 
         // Backfill values in batches
-        if let Some(up) = &self.up {
-            common::batch_update(db, table, &self.column.name, up)?;
+        if self.up.is_some() {
+            common::batch_touch_rows(db, table, &self.column.name)?;
         }
 
         Ok(())

View File

@@ -154,7 +154,7 @@ impl Action for AlterColumn {
             RETURNS TRIGGER AS $$
                 DECLARE
                     {declarations}
-                    {existing_column} public.{table}.{existing_column}%TYPE := NEW.{existing_column};
+                    {existing_column} public.{table}.{existing_column}%TYPE := NEW.{existing_column_real};
                 BEGIN
                     NEW.{temp_column} = {up};
                     RETURN NEW;
@@ -180,6 +180,7 @@ impl Action for AlterColumn {
             CREATE TRIGGER {update_new_trigger} BEFORE UPDATE OF {temp_column} ON {table} FOR EACH ROW EXECUTE PROCEDURE {update_new_trigger}();
             ",
             existing_column = column.name,
+            existing_column_real = column.real_name(),
             temp_column = self.temporary_column_name(ctx),
             up = up,
             down = down,
@@ -191,6 +192,9 @@ impl Action for AlterColumn {
         );
         db.run(&query)?;
 
+        // Backfill values in batches
+        common::batch_touch_rows(db, table, &column.name)?;
+
         // Add a temporary NOT NULL constraint if the column shouldn't be nullable.
         // This constraint is set as NOT VALID so it doesn't apply to existing rows and
         // the existing rows don't need to be scanned under an exclusive lock.
@@ -209,9 +213,6 @@ impl Action for AlterColumn {
             db.run(&query)?;
         }
 
-        // Backfill values in batches
-        common::batch_update(db, table, &self.temporary_column_name(ctx), up)?;
-
         Ok(())
     }
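
As an aside on the NOT VALID constraint mentioned in the comment above:
a NOT NULL column constraint can't itself be added as NOT VALID in
Postgres, so it is typically emulated with a CHECK constraint, roughly
like this (a sketch with illustrative names, not the exact query from the
diff):

    -- Enforced for new inserts and updates only; existing rows are not
    -- scanned, so no long-held exclusive lock is needed.
    ALTER TABLE users
        ADD CONSTRAINT counter_temp_not_null CHECK (counter IS NOT NULL) NOT VALID;

    -- Once the backfill has touched every row, the constraint can be
    -- validated, which scans the table without blocking writes.
    ALTER TABLE users VALIDATE CONSTRAINT counter_temp_not_null;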

View File

@@ -83,14 +83,9 @@ impl ToSql for PostgresRawValue {
     postgres::types::to_sql_checked!();
 }
 
 const BATCH_SIZE: u16 = 1000;
 
-pub fn batch_update(
-    db: &mut dyn Conn,
-    table: &Table,
-    column: &str,
-    up: &str,
-) -> anyhow::Result<()> {
+pub fn batch_touch_rows(db: &mut dyn Conn, table: &Table, column: &str) -> anyhow::Result<()> {
     let mut cursor: Option<PostgresRawValue> = None;
 
     loop {
@@ -139,7 +134,7 @@ pub fn batch_update(
                 LIMIT {batch_size}
             ), update AS (
                 UPDATE {table}
-                SET {temp_column} = {up}
+                SET {column} = {column}
                 FROM rows
                 WHERE {primary_key_where}
                 RETURNING {returning_columns}
@@ -152,8 +147,7 @@ pub fn batch_update(
             primary_key_columns = primary_key_columns,
             cursor_where = cursor_where,
             batch_size = BATCH_SIZE,
-            temp_column = column,
-            up = up,
+            column = column,
             primary_key_where = primary_key_where,
             returning_columns = returning_columns,
         );
@@ -165,9 +159,9 @@ pub fn batch_update(
         if last_value.is_none() {
             break;
         }
-        println!("{}", query);
+
         cursor = last_value
     }
 
     Ok(())
 }
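
Each iteration of the loop above builds a query that pages through the
table by primary key (a keyset cursor, rather than OFFSET) and applies
the no-op update to one batch. Roughly, a generated batch looks like the
following sketch (illustrative names, a single-column primary key, and $1
standing in for the cursor from the previous batch, omitted on the first
one):

    WITH rows AS (
        -- Select the next batch of primary keys past the cursor
        SELECT id
        FROM users
        WHERE id > $1
        ORDER BY id
        LIMIT 1000
    ), update AS (
        -- No-op SET fires the BEFORE UPDATE triggers on each row
        UPDATE users
        SET counter = counter
        FROM rows
        WHERE users.id = rows.id
        RETURNING users.id
    )
    -- The largest key in the batch becomes the cursor for the next batch
    SELECT id FROM update ORDER BY id DESC LIMIT 1;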

View File

@@ -291,3 +291,98 @@ fn alter_column_rename() {
 
     reshape.complete_migration().unwrap();
 }
+
+#[test]
+fn alter_column_multiple() {
+    let (mut reshape, mut old_db, mut new_db) = common::setup();
+
+    let create_users_table = Migration::new("create_users_table", None).with_action(CreateTable {
+        name: "users".to_string(),
+        primary_key: vec!["id".to_string()],
+        foreign_keys: vec![],
+        columns: vec![
+            Column {
+                name: "id".to_string(),
+                data_type: "SERIAL".to_string(),
+                nullable: true,
+                default: None,
+            },
+            Column {
+                name: "counter".to_string(),
+                data_type: "INTEGER".to_string(),
+                nullable: false,
+                default: None,
+            },
+        ],
+    });
+    let increment_counter_twice = Migration::new("increment_counter_twice", None)
+        .with_action(AlterColumn {
+            table: "users".to_string(),
+            column: "counter".to_string(),
+            up: Some("counter + 1".to_string()),
+            down: Some("counter - 1".to_string()),
+            changes: ColumnChanges {
+                data_type: None,
+                nullable: None,
+                name: None,
+            },
+        })
+        .with_action(AlterColumn {
+            table: "users".to_string(),
+            column: "counter".to_string(),
+            up: Some("counter + 1".to_string()),
+            down: Some("counter - 1".to_string()),
+            changes: ColumnChanges {
+                data_type: None,
+                nullable: None,
+                name: None,
+            },
+        });
+    let first_migrations = vec![create_users_table.clone()];
+    let second_migrations = vec![create_users_table.clone(), increment_counter_twice.clone()];
+
+    // Run first migration, should automatically finish
+    reshape.migrate(first_migrations.clone()).unwrap();
+    assert!(matches!(reshape.state.status, Status::Idle));
+    assert_eq!(
+        Some(&create_users_table.name),
+        reshape.state.current_migration.as_ref()
+    );
+
+    // Update search paths
+    old_db
+        .simple_query(&reshape::schema_query_for_migration(
+            &first_migrations.last().unwrap().name,
+        ))
+        .unwrap();
+
+    // Insert some test data
+    old_db
+        .simple_query(
+            "
+            INSERT INTO users (id, counter) VALUES
+                (1, 0),
+                (2, 100);
+            ",
+        )
+        .unwrap();
+
+    // Run second migration
+    reshape.migrate(second_migrations.clone()).unwrap();
+    new_db
+        .simple_query(&reshape::schema_query_for_migration(
+            &second_migrations.last().unwrap().name,
+        ))
+        .unwrap();
+
+    // Check that the existing data has been updated
+    let expected = vec![2, 102];
+    let results: Vec<i32> = new_db
+        .query("SELECT counter FROM users ORDER BY id", &[])
+        .unwrap()
+        .iter()
+        .map(|row| row.get::<_, i32>("counter"))
+        .collect();
+    assert_eq!(expected, results);
+}