Remove explicit schema tracking

Before, we explicitly tracked the current schema and relied on that in
our migrations. This makes things more complicated as we need to keep
track of not just tables and columns but also primary keys, constraints
etc.

This commit remove the schema tracking and instead queries the
database for the current schema. During migrations, we temporarily store
the changes that are made, for example having temporary columns override
real ones and combine these with the current schema in the database.
This is handled in schema.rs.

These changes also broke our previously handling of triggers and
functions and how we detected if an insert/update was made against the
old or new schema during a migration. The previous method, using a
temporary __reshape_is_new column has been replaced with some helper
functions which inspect the search_path setting and uses that to
determine which schema is being used. During migrations, we can also set
the custom "reshape.is_old_schema" setting to force the old schema, for
example during batch updates.

This greatly simplifies the triggers as we can now simply call a helper
function in Postgres, `reshape.is_old_schema()`, to determine which
schema the modification was made for.
This commit is contained in:
fabianlindfors 2021-12-27 12:40:57 +01:00
parent fa269ee692
commit 9f23ed38c1
14 changed files with 418 additions and 378 deletions

View File

@ -12,4 +12,5 @@ anyhow = "1.0.44"
clap = "3.0.0-beta.5"
toml = "0.5"
version = "3.0.0"
colored = "2"
colored = "2"
bimap = "0.6.1"

38
src/helpers.rs Normal file
View File

@ -0,0 +1,38 @@
use anyhow::Context;
use crate::db::Conn;
pub fn setup_helpers(db: &mut dyn Conn, current_migration: &Option<String>) -> anyhow::Result<()> {
let predicate = if let Some(current_migration) = current_migration {
format!(
"current_setting('search_path') = 'migration_{}' OR setting_bool",
current_migration
)
} else {
format!("setting_bool")
};
let query = format!(
"
CREATE OR REPLACE FUNCTION reshape.is_old_schema()
RETURNS BOOLEAN AS $$
DECLARE
setting TEXT := current_setting('reshape.is_old_schema', TRUE);
setting_bool BOOLEAN := setting IS NOT NULL AND setting = 'YES';
BEGIN
RETURN {};
END
$$ language 'plpgsql';
",
predicate
);
db.query(&query)
.context("failed creating helper function reshape.is_old_schema()")?;
Ok(())
}
pub fn teardown_helpers(db: &mut dyn Conn) -> anyhow::Result<()> {
db.query("DROP FUNCTION reshape.is_old_schema;")?;
Ok(())
}

View File

@ -1,10 +1,15 @@
use crate::migrations::{Context, Migration};
use crate::{
migrations::{Context, Migration},
schema::Schema,
};
use colored::*;
use db::{Conn, DbConn};
use postgres::Config;
use schema::Table;
mod db;
mod helpers;
pub mod migrations;
mod schema;
mod state;
@ -49,12 +54,10 @@ impl Reshape {
where
T: IntoIterator<Item = Migration>,
{
self.state = State::load(&mut self.db);
// Make sure no migration is in progress
if let state::Status::InProgress {
migrations: _,
target_schema: _,
} = &self.state.status
{
if let state::Status::InProgress { migrations: _ } = &self.state.status {
println!("Migration already in progress, please complete using 'reshape complete'");
return Ok(());
}
@ -70,7 +73,9 @@ impl Reshape {
let target_migration = remaining_migrations.last().unwrap().name.to_string();
let mut new_schema = self.state.current_schema.clone();
let mut new_schema = Schema::new();
helpers::setup_helpers(&mut self.db, current_migration)?;
for (migration_index, migration) in remaining_migrations.iter().enumerate() {
println!("Migrating '{}':", migration.name);
@ -92,7 +97,7 @@ impl Reshape {
self.create_schema_for_migration(&target_migration, &new_schema)?;
// Update state once migrations have been performed
self.state.in_progress(remaining_migrations, new_schema);
self.state.in_progress(remaining_migrations);
self.state.save(&mut self.db)?;
// If we started from a blank slate, we can finish the migration immediately
@ -122,19 +127,16 @@ impl Reshape {
pub fn complete_migration(&mut self) -> anyhow::Result<()> {
// Make sure a migration is in progress
let remaining_migrations = match &self.state.status {
state::Status::InProgress {
migrations,
target_schema: _,
} => migrations,
state::Status::InProgress { migrations } => migrations,
_ => {
println!("No migration in progress");
return Ok(());
}
};
let target_migration = remaining_migrations.last().unwrap().name.to_string();
helpers::teardown_helpers(&mut self.db)?;
let mut temp_schema = self.state.current_schema.clone();
let mut temp_schema = Schema::new();
// Run all the completion changes as a transaction to avoid incomplete updates
let mut transaction = self.db.transaction()?;
@ -163,22 +165,6 @@ impl Reshape {
println!("");
}
// Remove any temporary is new columns from tables
for table in temp_schema.tables.iter_mut() {
if table.has_is_new {
table.has_is_new = false;
transaction.run(&format!(
"ALTER TABLE {table} DROP COLUMN __reshape_is_new CASCADE",
table = table.name,
))?;
// The view will automatically be dropped by CASCADE so let's recreate it
let schema = schema_name_for_migration(&target_migration);
Self::create_view_for_table(&mut transaction, table, &schema, false)?;
}
}
self.state.complete()?;
self.state.save(&mut transaction)?;
@ -190,7 +176,7 @@ impl Reshape {
fn create_schema_for_migration(
&mut self,
migration_name: &str,
schema: &schema::Schema,
schema: &Schema,
) -> anyhow::Result<()> {
// Create schema for migration
let schema_name = schema_name_for_migration(migration_name);
@ -198,8 +184,8 @@ impl Reshape {
.run(&format!("CREATE SCHEMA IF NOT EXISTS {}", schema_name))?;
// Create views inside schema
for table in &schema.tables {
Self::create_view_for_table(&mut self.db, table, &schema_name, true)?;
for table in schema.get_tables(&mut self.db)? {
Self::create_view_for_table(&mut self.db, &table, &schema_name)?;
}
Ok(())
@ -209,42 +195,23 @@ impl Reshape {
db: &mut impl Conn,
table: &Table,
schema: &str,
use_alias: bool,
) -> anyhow::Result<()> {
let mut select_columns: Vec<String> = table
let select_columns: Vec<String> = table
.columns
.iter()
.map(|column| {
if use_alias {
format!("{} AS {}", column.real_name(), column.name)
} else {
column.name.to_string()
}
})
.map(|column| format!("{} AS {}", column.real_name, column.name))
.collect();
if table.has_is_new {
select_columns.push("__reshape_is_new".to_string());
}
db.run(&format!(
"CREATE OR REPLACE VIEW {schema}.{table_name} AS
"CREATE OR REPLACE VIEW {schema}.{view_name} AS
SELECT {columns}
FROM {table_real_name}",
FROM {table_name}",
schema = schema,
table_name = table.name,
table_real_name = table.real_name(),
table_name = table.real_name,
view_name = table.name,
columns = select_columns.join(","),
))?;
if table.has_is_new {
db.run(&format!(
"ALTER VIEW {schema}.{view} ALTER __reshape_is_new SET DEFAULT TRUE",
schema = schema,
view = table.name,
))?;
}
Ok(())
}
@ -257,11 +224,7 @@ impl Reshape {
))?;
}
if let Status::InProgress {
migrations,
target_schema: _,
} = &self.state.status
{
if let Status::InProgress { migrations } = &self.state.status {
let target_migration = migrations.last().unwrap().name.to_string();
self.db.run(&format!(
"DROP SCHEMA IF EXISTS {} CASCADE",
@ -270,10 +233,10 @@ impl Reshape {
}
// Remove all tables
let schema = &self.state.current_schema;
for table in &schema.tables {
let schema = Schema::new();
for table in schema.get_tables(&mut self.db)? {
self.db
.run(&format!("DROP TABLE IF EXISTS {} CASCADE", table.name))?;
.run(&format!("DROP TABLE IF EXISTS {} CASCADE", table.real_name))?;
}
// Reset state
@ -286,10 +249,7 @@ impl Reshape {
pub fn abort(&mut self) -> anyhow::Result<()> {
let remaining_migrations = match &self.state.status {
Status::InProgress {
migrations,
target_schema: _,
} => migrations,
Status::InProgress { migrations } => migrations,
_ => {
println!("No migration is in progress");
return Ok(());
@ -297,6 +257,8 @@ impl Reshape {
};
let target_migration = remaining_migrations.last().unwrap().name.to_string();
helpers::teardown_helpers(&mut self.db)?;
// Run all the abort changes as a transaction to avoid incomplete changes
let mut transaction = self.db.transaction()?;

View File

@ -1,8 +1,5 @@
use super::{common, Action, Column, Context};
use crate::{
db::Conn,
schema::{self, Schema},
};
use crate::{db::Conn, schema::Schema};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
@ -42,7 +39,7 @@ impl Action for AddColumn {
}
fn run(&self, ctx: &Context, db: &mut dyn Conn, schema: &Schema) -> anyhow::Result<()> {
let table = schema.find_table(&self.table)?;
let table = schema.get_table(db, &self.table)?;
let mut definition_parts = vec![
self.column.name.to_string(),
@ -65,23 +62,17 @@ impl Action for AddColumn {
db.run(&query)?;
if let Some(up) = &self.up {
let query = format!(
"
ALTER TABLE {table}
ADD COLUMN IF NOT EXISTS __reshape_is_new BOOLEAN DEFAULT FALSE NOT NULL;
",
table = self.table,
);
db.run(&query)?;
let table = schema.get_table(db, &self.table)?;
let declarations: Vec<String> = table
.columns
.iter()
.map(|column| {
format!(
"{name} public.{table}.{name}%TYPE := NEW.{name};",
table = table.name,
name = column.name,
"{alias} public.{table}.{real_name}%TYPE := NEW.{real_name};",
table = table.real_name,
alias = column.name,
real_name = column.real_name,
)
})
.collect();
@ -92,7 +83,7 @@ impl Action for AddColumn {
CREATE OR REPLACE FUNCTION {trigger_name}()
RETURNS TRIGGER AS $$
BEGIN
IF NOT NEW.__reshape_is_new THEN
IF reshape.is_old_schema() THEN
DECLARE
{declarations}
BEGIN
@ -115,6 +106,11 @@ impl Action for AddColumn {
db.run(&query)?;
}
// Backfill values in batches
if self.up.is_some() {
common::batch_touch_rows(db, &table.real_name, &self.column.name)?;
}
// Add a temporary NOT NULL constraint if the column shouldn't be nullable.
// This constraint is set as NOT VALID so it doesn't apply to existing rows and
// the existing rows don't need to be scanned under an exclusive lock.
@ -133,11 +129,6 @@ impl Action for AddColumn {
db.run(&query)?;
}
// Backfill values in batches
if self.up.is_some() {
common::batch_touch_rows(db, &table.real_name(), &self.column.name)?;
}
Ok(())
}
@ -196,20 +187,7 @@ impl Action for AddColumn {
Ok(())
}
fn update_schema(&self, _ctx: &Context, schema: &mut Schema) -> anyhow::Result<()> {
let table = schema.find_table_mut(&self.table)?;
if self.up.is_some() {
table.has_is_new = true;
}
table.add_column(schema::Column {
name: self.column.name.to_string(),
real_name: None,
data_type: self.column.data_type.to_string(),
nullable: self.column.nullable,
});
fn update_schema(&self, _ctx: &Context, _schema: &mut Schema) -> anyhow::Result<()> {
Ok(())
}

View File

@ -1,8 +1,5 @@
use super::{Action, Context};
use crate::{
db::Conn,
schema::{Column, Schema},
};
use crate::{db::Conn, schema::Schema};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
@ -19,14 +16,13 @@ impl Action for AddIndex {
}
fn run(&self, _ctx: &Context, db: &mut dyn Conn, schema: &Schema) -> anyhow::Result<()> {
let table = schema.find_table(&self.table)?;
let column_real_names: Vec<&str> = self
let table = schema.get_table(db, &self.table)?;
let column_real_names: Vec<String> = table
.columns
.iter()
.map(|col| table.find_column(col))
.collect::<anyhow::Result<Vec<&Column>>>()?
.iter()
.map(|col| col.real_name())
.filter(|column| self.columns.contains(&column.name))
.map(|column| column.real_name.to_string())
.collect();
db.run(&format!(

View File

@ -25,28 +25,19 @@ impl AlterColumn {
format!("{}_new_{}", ctx.prefix(), self.column)
}
fn insert_trigger_name(&self, ctx: &Context) -> String {
fn up_trigger_name(&self, ctx: &Context) -> String {
format!(
"{}_alter_column_insert_trigger_{}_{}",
"{}_alter_column_up_trigger_{}_{}",
ctx.prefix(),
self.table,
self.column
)
}
fn update_old_trigger_name(&self, ctx: &Context) -> String {
fn down_trigger_name(&self, ctx: &Context) -> String {
format!(
"{}_alter_column_update_old_trigger_{}_{}",
ctx.prefix(),
self.table,
self.column
)
}
fn update_new_trigger_name(&self, ctx: &Context) -> String {
format!(
"{}_alter_column_update_new_trigger_{}_{}",
ctx.prefix(),
"{}_alter_column_down_trigger_{}_{}",
ctx.prefix_inverse(),
self.table,
self.column
)
@ -75,9 +66,6 @@ impl Action for AlterColumn {
}
fn run(&self, ctx: &Context, db: &mut dyn Conn, schema: &Schema) -> anyhow::Result<()> {
let table = schema.find_table(&self.table)?;
let column = table.find_column(&self.column)?;
// If we are only changing the name of a column, we don't have to do anything at this stage
// We'll set the new schema to point to the old column. When the migration is completed,
// we rename the actual column.
@ -91,7 +79,19 @@ impl Action for AlterColumn {
_ => return Err(anyhow!("missing up or down values")),
};
let temporary_column_type = self.changes.data_type.as_ref().unwrap_or(&column.data_type);
let table = schema.get_table(db, &self.table)?;
let column = table
.columns
.iter()
.find(|column| column.name == self.column)
.ok_or_else(|| anyhow!("no such column exists"))?;
let temporary_column_type = self
.changes
.data_type
.as_ref()
.unwrap_or_else(|| &column.data_type);
// Add temporary, nullable column
let query = format!(
@ -105,18 +105,16 @@ impl Action for AlterColumn {
);
db.run(&query)?;
// Add temporary is new column
db.run(&common::add_is_new_column_query(&self.table))?;
let declarations: Vec<String> = table
.columns
.iter()
.filter(|c| c.name != column.name)
.filter(|column| column.name != self.column)
.map(|column| {
format!(
"{name} public.{table}.{name}%TYPE := NEW.{name};",
table = table.name,
name = column.name,
"{alias} public.{table}.{real_name}%TYPE := NEW.{real_name};",
table = table.real_name,
alias = column.name,
real_name = column.real_name,
)
})
.collect();
@ -124,20 +122,13 @@ impl Action for AlterColumn {
// Add triggers to fill in values as they are inserted/updated
let query = format!(
"
CREATE OR REPLACE FUNCTION {insert_trigger}()
CREATE OR REPLACE FUNCTION {up_trigger}()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.__reshape_is_new THEN
IF reshape.is_old_schema() THEN
DECLARE
{declarations}
{existing_column} public.{table}.{temp_column}%TYPE := NEW.{temp_column};
BEGIN
NEW.{existing_column} = {down};
END;
ELSIF NOT NEW.__reshape_is_new THEN
DECLARE
{declarations}
{existing_column} public.{table}.{existing_column}%TYPE := NEW.{existing_column};
{existing_column} public.{table}.{existing_column_real}%TYPE := NEW.{existing_column_real};
BEGIN
NEW.{temp_column} = {up};
END;
@ -146,54 +137,41 @@ impl Action for AlterColumn {
END
$$ language 'plpgsql';
DROP TRIGGER IF EXISTS {insert_trigger} ON {table};
CREATE TRIGGER {insert_trigger} BEFORE INSERT ON {table} FOR EACH ROW EXECUTE PROCEDURE {insert_trigger}();
DROP TRIGGER IF EXISTS {up_trigger} ON {table};
CREATE TRIGGER {up_trigger} BEFORE INSERT OR UPDATE ON {table} FOR EACH ROW EXECUTE PROCEDURE {up_trigger}();
CREATE OR REPLACE FUNCTION {update_old_trigger}()
CREATE OR REPLACE FUNCTION {down_trigger}()
RETURNS TRIGGER AS $$
DECLARE
{declarations}
{existing_column} public.{table}.{existing_column}%TYPE := NEW.{existing_column_real};
BEGIN
NEW.{temp_column} = {up};
IF NOT reshape.is_old_schema() THEN
DECLARE
{declarations}
{existing_column} public.{table}.{temp_column}%TYPE := NEW.{temp_column};
BEGIN
NEW.{existing_column_real} = {down};
END;
END IF;
RETURN NEW;
END
$$ language 'plpgsql';
DROP TRIGGER IF EXISTS {update_old_trigger} ON {table};
CREATE TRIGGER {update_old_trigger} BEFORE UPDATE OF {existing_column} ON {table} FOR EACH ROW EXECUTE PROCEDURE {update_old_trigger}();
CREATE OR REPLACE FUNCTION {update_new_trigger}()
RETURNS TRIGGER AS $$
DECLARE
{declarations}
{existing_column} public.{table}.{temp_column}%TYPE := NEW.{temp_column};
BEGIN
NEW.{existing_column} = {up};
RETURN NEW;
END
$$ language 'plpgsql';
DROP TRIGGER IF EXISTS {update_new_trigger} ON {table};
CREATE TRIGGER {update_new_trigger} BEFORE UPDATE OF {temp_column} ON {table} FOR EACH ROW EXECUTE PROCEDURE {update_new_trigger}();
DROP TRIGGER IF EXISTS {down_trigger} ON {table};
CREATE TRIGGER {down_trigger} BEFORE INSERT OR UPDATE ON {table} FOR EACH ROW EXECUTE PROCEDURE {down_trigger}();
",
existing_column = column.name,
existing_column_real = column.real_name(),
existing_column = &self.column,
existing_column_real = column.real_name,
temp_column = self.temporary_column_name(ctx),
up = up,
down = down,
table = self.table,
insert_trigger = self.insert_trigger_name(ctx),
update_old_trigger = self.update_old_trigger_name(ctx),
update_new_trigger = self.update_new_trigger_name(ctx),
up_trigger = self.up_trigger_name(ctx),
down_trigger = self.down_trigger_name(ctx),
declarations = declarations.join("\n"),
);
db.run(&query)?;
// Backfill values in batches
common::batch_touch_rows(db, &table.real_name(), &column.name)?;
// Backfill values in batches by touching the previous column
common::batch_touch_rows(db, &table.real_name, &column.real_name)?;
// Add a temporary NOT NULL constraint if the column shouldn't be nullable.
// This constraint is set as NOT VALID so it doesn't apply to existing rows and
@ -233,18 +211,21 @@ impl Action for AlterColumn {
return Ok(());
}
let column = schema
.find_table(&self.table)
.and_then(|table| table.find_column(&self.column))?;
let column_name = self.changes.name.as_deref().unwrap_or(column.real_name());
let table = schema.get_table(db, &self.table)?;
let column = table
.columns
.iter()
.find(|column| column.name == self.column)
.ok_or_else(|| anyhow!("no such column exists"))?;
let column_name = self.changes.name.as_deref().unwrap_or(&column.real_name);
// Remove old column
let query = format!(
"
ALTER TABLE {} DROP COLUMN {} CASCADE
",
self.table,
column.real_name()
self.table, column.real_name
);
db.run(&query)?;
@ -262,19 +243,15 @@ impl Action for AlterColumn {
// Remove triggers and procedures
let query = format!(
"
DROP TRIGGER IF EXISTS {insert_trigger} ON {table};
DROP FUNCTION IF EXISTS {insert_trigger};
DROP TRIGGER IF EXISTS {up_trigger} ON {table};
DROP FUNCTION IF EXISTS {up_trigger};
DROP TRIGGER IF EXISTS {update_old_trigger} ON {table};
DROP FUNCTION IF EXISTS {update_old_trigger};
DROP TRIGGER IF EXISTS {update_new_trigger} ON {table};
DROP FUNCTION IF EXISTS {update_new_trigger};
DROP TRIGGER IF EXISTS {down_trigger} ON {table};
DROP FUNCTION IF EXISTS {down_trigger};
",
table = self.table,
insert_trigger = self.insert_trigger_name(ctx),
update_old_trigger = self.update_old_trigger_name(ctx),
update_new_trigger = self.update_new_trigger_name(ctx),
up_trigger = self.up_trigger_name(ctx),
down_trigger = self.down_trigger_name(ctx),
);
db.run(&query)?;
@ -322,28 +299,18 @@ impl Action for AlterColumn {
}
fn update_schema(&self, ctx: &Context, schema: &mut Schema) -> anyhow::Result<()> {
let table = schema.find_table_mut(&self.table)?;
let column = table.find_column_mut(&self.column)?;
// If we are only changing the name of a column, we haven't created a temporary column
// Instead we rename the schema column but point it to the old column
// Instead, we rename the schema column but point it to the old column
if self.can_short_circuit() {
if let Some(new_name) = &self.changes.name {
column.real_name = Some(column.real_name().to_string());
column.name = new_name.to_string();
schema.set_column_alias(&self.table, &self.column, new_name);
println!("Schema is now: {:?}", schema);
}
return Ok(());
}
column.name = self
.changes
.name
.as_ref()
.map(|n| n.to_string())
.unwrap_or(self.column.to_string());
column.real_name = Some(self.temporary_column_name(ctx));
table.has_is_new = true;
schema.set_column_alias(&self.table, &self.temporary_column_name(ctx), &self.column);
Ok(())
}
@ -352,19 +319,15 @@ impl Action for AlterColumn {
// Remove triggers and procedures
let query = format!(
"
DROP TRIGGER IF EXISTS {insert_trigger} ON {table};
DROP FUNCTION IF EXISTS {insert_trigger};
DROP TRIGGER IF EXISTS {up_trigger} ON {table};
DROP FUNCTION IF EXISTS {up_trigger};
DROP TRIGGER IF EXISTS {update_old_trigger} ON {table};
DROP FUNCTION IF EXISTS {update_old_trigger};
DROP TRIGGER IF EXISTS {update_new_trigger} ON {table};
DROP FUNCTION IF EXISTS {update_new_trigger};
DROP TRIGGER IF EXISTS {down_trigger} ON {table};
DROP FUNCTION IF EXISTS {down_trigger};
",
table = self.table,
insert_trigger = self.insert_trigger_name(ctx),
update_old_trigger = self.update_old_trigger_name(ctx),
update_new_trigger = self.update_new_trigger_name(ctx),
up_trigger = self.up_trigger_name(ctx),
down_trigger = self.down_trigger_name(ctx),
);
db.run(&query)?;
@ -379,9 +342,6 @@ impl Action for AlterColumn {
);
db.run(&query)?;
// Drop temporary "is new" column
db.run(&common::drop_is_new_column_query(&self.table))?;
Ok(())
}
}

View File

@ -20,26 +20,6 @@ fn nullable_default() -> bool {
true
}
pub fn add_is_new_column_query(table: &str) -> String {
format!(
"
ALTER TABLE {table}
ADD COLUMN IF NOT EXISTS __reshape_is_new BOOLEAN DEFAULT FALSE NOT NULL;
",
table = table,
)
}
pub fn drop_is_new_column_query(table: &str) -> String {
format!(
"
ALTER TABLE {table}
DROP COLUMN IF EXISTS __reshape_is_new;
",
table = table,
)
}
#[derive(Debug)]
struct PostgresRawValue {
bytes: Vec<u8>,
@ -86,6 +66,8 @@ impl ToSql for PostgresRawValue {
pub fn batch_touch_rows(db: &mut dyn Conn, table: &str, column: &str) -> anyhow::Result<()> {
const BATCH_SIZE: u16 = 1000;
db.query("SET reshape.is_old_schema = 'YES'")?;
let mut cursor: Option<PostgresRawValue> = None;
loop {
@ -127,12 +109,12 @@ pub fn batch_touch_rows(db: &mut dyn Conn, table: &str, column: &str) -> anyhow:
"
WITH rows AS (
SELECT {primary_key_columns}
FROM {table}
FROM public.{table}
{cursor_where}
ORDER BY {primary_key_columns}
LIMIT {batch_size}
), update AS (
UPDATE {table}
UPDATE public.{table}
SET {column} = {column}
FROM rows
WHERE {primary_key_where}
@ -162,6 +144,8 @@ pub fn batch_touch_rows(db: &mut dyn Conn, table: &str, column: &str) -> anyhow:
cursor = last_value
}
db.query("SET reshape.is_old_schema = ''")?;
Ok(())
}

View File

@ -1,8 +1,5 @@
use super::{Action, Column, Context};
use crate::{
db::Conn,
schema::{Schema, Table},
};
use crate::{db::Conn, schema::Schema};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
@ -73,20 +70,7 @@ impl Action for CreateTable {
Ok(())
}
fn update_schema(&self, _ctx: &Context, schema: &mut Schema) -> anyhow::Result<()> {
let mut table = Table::new(self.name.to_string());
table.primary_key = self.primary_key.clone();
for column in &self.columns {
table.add_column(crate::schema::Column {
name: column.name.to_string(),
real_name: None,
data_type: column.data_type.to_string(),
nullable: column.nullable,
});
}
schema.add_table(table);
fn update_schema(&self, _ctx: &Context, _schema: &mut Schema) -> anyhow::Result<()> {
Ok(())
}

View File

@ -78,7 +78,18 @@ impl Context {
}
fn prefix(&self) -> String {
format!("reshape_{}_{}", self.migration_index, self.action_index)
format!(
"__reshape_{:0>4}_{:0>4}",
self.migration_index, self.action_index
)
}
fn prefix_inverse(&self) -> String {
format!(
"__reshape_{:0>4}_{:0>4}",
1000 - self.migration_index,
1000 - self.action_index
)
}
}

View File

@ -32,16 +32,17 @@ impl Action for RemoveColumn {
fn run(&self, ctx: &Context, db: &mut dyn Conn, schema: &Schema) -> anyhow::Result<()> {
// Add down trigger
if let Some(down) = &self.down {
let table = schema.find_table(&self.table)?;
let table = schema.get_table(db, &self.table)?;
let declarations: Vec<String> = table
.columns
.iter()
.map(|column| {
format!(
"{name} public.{table}.{name}%TYPE := NEW.{name};",
table = table.name,
name = column.name,
"{alias} public.{table}.{real_name}%TYPE := NEW.{real_name};",
table = table.real_name,
alias = column.name,
real_name = column.real_name,
)
})
.collect();
@ -97,8 +98,7 @@ impl Action for RemoveColumn {
}
fn update_schema(&self, _ctx: &Context, schema: &mut Schema) -> anyhow::Result<()> {
let table = schema.find_table_mut(&self.table)?;
table.remove_column(&self.column);
schema.set_column_hidden(&self.table, &self.column);
Ok(())
}

View File

@ -31,7 +31,8 @@ impl Action for RemoveTable {
}
fn update_schema(&self, _ctx: &Context, schema: &mut Schema) -> anyhow::Result<()> {
schema.remove_table(&self.table)?;
schema.set_table_hidden(&self.table);
Ok(())
}

View File

@ -34,9 +34,8 @@ impl Action for RenameTable {
}
fn update_schema(&self, _ctx: &Context, schema: &mut Schema) -> anyhow::Result<()> {
let mut table = schema.find_table_mut(&self.table)?;
table.real_name = Some(self.table.to_string());
table.name = self.new_name.to_string();
schema.set_table_alias(&self.table, &self.new_name);
Ok(())
}

View File

@ -1,119 +1,258 @@
use crate::db::Conn;
use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use bimap::BiMap;
use std::collections::{hash_map::Entry, HashMap, HashSet};
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Debug)]
pub struct Schema {
pub tables: Vec<Table>,
table_alias_to_name: BiMap<String, String>,
hidden_tables: HashSet<String>,
table_schemas_by_name: HashMap<String, TableSchema>,
}
impl Schema {
pub fn new() -> Schema {
Schema { tables: Vec::new() }
Schema {
table_alias_to_name: BiMap::new(),
hidden_tables: HashSet::new(),
table_schemas_by_name: HashMap::new(),
}
}
pub fn add_table(&mut self, table: Table) -> &mut Self {
self.tables.push(table);
self
pub fn set_table_alias(&mut self, current_name: &str, alias: &str) {
self.table_alias_to_name
.insert(alias.to_string(), current_name.to_string());
}
pub fn find_table(&self, name: &str) -> anyhow::Result<&Table> {
self.tables
.iter()
.find(|table| table.name == name)
.ok_or_else(|| anyhow!("no table {}", name))
pub fn set_table_hidden(&mut self, table: &str) {
let real_name = self
.get_table_real_name(table)
.unwrap_or_else(|| table.to_string());
self.hidden_tables.insert(real_name);
}
pub fn find_table_mut(&mut self, name: &str) -> anyhow::Result<&mut Table> {
self.tables
.iter_mut()
.find(|table| table.name == name)
.ok_or_else(|| anyhow!("no table {}", name))
pub fn set_column_alias(&mut self, table: &str, current_name: &str, alias: &str) {
let table_real_name = self
.get_table_real_name(table)
.unwrap_or_else(|| table.to_string());
let column = match self.table_schemas_by_name.entry(table_real_name) {
Entry::Occupied(o) => o.into_mut(),
Entry::Vacant(v) => v.insert(TableSchema::new()),
};
column
.column_alias_to_name
.insert(alias.to_string(), current_name.to_string());
}
pub fn remove_table(&mut self, name: &str) -> anyhow::Result<()> {
let index = self
.tables
.iter()
.position(|table| table.name == name)
.ok_or_else(|| anyhow!("no table {}", name))?;
pub fn set_column_hidden(&mut self, table: &str, name: &str) {
let table_real_name = self
.get_table_real_name(table)
.unwrap_or_else(|| table.to_string());
let column = match self.table_schemas_by_name.entry(table_real_name) {
Entry::Occupied(o) => o.into_mut(),
Entry::Vacant(v) => v.insert(TableSchema::new()),
};
self.tables.swap_remove(index);
Ok(())
column.hidden_columns.insert(name.to_string());
}
fn get_table_real_name(&self, name: &str) -> Option<String> {
self.table_alias_to_name
.get_by_left(name)
.map(|real_name| real_name.to_string())
}
fn get_table_alias_from_real_name<'a>(&'a self, real_name: &'a str) -> Option<String> {
self.table_alias_to_name
.get_by_right(real_name)
.map(|name| name.to_string())
}
fn is_table_hidden(&self, table: &str) -> bool {
let table_real_name = self
.get_table_real_name(table)
.unwrap_or_else(|| table.to_string());
self.hidden_tables.contains(&table_real_name)
}
fn get_column_real_name<'a>(&'a self, table: &'a str, name: &'a str) -> Option<&'a str> {
let table_real_name = self
.get_table_real_name(table)
.unwrap_or_else(|| table.to_string());
self.table_schemas_by_name
.get(&table_real_name)
.map(|column| column.get_column_real_name(name))
}
fn get_column_alias_from_real_name(&self, table: &str, real_name: &str) -> Option<String> {
let table_real_name = self
.get_table_real_name(table)
.unwrap_or_else(|| table.to_string());
self.table_schemas_by_name
.get(&table_real_name)
.and_then(|column| column.get_column_alias_from_real_name(real_name))
}
fn is_column_hidden(&self, table: &str, name: &str) -> bool {
let table_real_name = self
.get_table_real_name(table)
.unwrap_or_else(|| table.to_string());
self.table_schemas_by_name
.get(&table_real_name)
.map(|column| {
let alias = column
.get_column_alias_from_real_name(name)
.unwrap_or_else(|| name.to_string());
column.hidden_columns.contains(&alias)
})
.unwrap_or(false)
}
}
impl Default for Schema {
fn default() -> Self {
Self::new()
#[derive(Debug)]
struct TableSchema {
column_alias_to_name: BiMap<String, String>,
hidden_columns: HashSet<String>,
}
impl TableSchema {
fn new() -> TableSchema {
TableSchema {
column_alias_to_name: BiMap::new(),
hidden_columns: HashSet::new(),
}
}
fn get_column_real_name<'a>(&'a self, name: &'a str) -> &'a str {
if let Some(real_name) = self.column_alias_to_name.get_by_left(name) {
real_name
} else {
name
}
}
fn get_column_alias_from_real_name(&self, real_name: &str) -> Option<String> {
self.column_alias_to_name
.get_by_right(real_name)
.map(|name| name.to_string())
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Debug)]
pub struct Table {
pub name: String,
pub real_name: String,
pub columns: Vec<Column>,
pub primary_key: Vec<String>,
#[serde(skip)]
pub real_name: Option<String>,
#[serde(skip)]
pub has_is_new: bool,
}
impl Table {
pub fn new(name: impl Into<String>) -> Table {
Table {
name: name.into(),
real_name: None,
primary_key: vec![],
columns: vec![],
has_is_new: false,
}
}
pub fn real_name(&self) -> &str {
self.real_name.as_ref().unwrap_or(&self.name)
}
pub fn add_column(&mut self, column: Column) -> &mut Self {
self.columns.push(column);
self
}
pub fn remove_column(&mut self, column_name: &str) -> &mut Self {
if let Some(index) = self.columns.iter().position(|c| c.name == column_name) {
self.columns.remove(index);
}
self
}
pub fn find_column(&self, name: &str) -> anyhow::Result<&Column> {
self.columns
.iter()
.find(|column| column.name == name)
.ok_or_else(|| anyhow!("no column {} on table {}", name, self.name))
}
pub fn find_column_mut(&mut self, name: &str) -> anyhow::Result<&mut Column> {
self.columns
.iter_mut()
.find(|column| column.name == name)
.ok_or(anyhow!("no column {} on table {}", name, self.name))
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Debug)]
pub struct Column {
pub name: String,
pub real_name: String,
pub data_type: String,
pub nullable: bool,
#[serde(skip)]
pub real_name: Option<String>,
}
impl Column {
pub fn real_name(&self) -> &str {
self.real_name.as_ref().unwrap_or(&self.name)
impl Schema {
pub fn get_tables(&self, db: &mut dyn Conn) -> anyhow::Result<Vec<Table>> {
db.query(
"
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
",
)?
.iter()
.map(|row| row.get::<'_, _, String>("table_name"))
.filter(|real_name| !self.is_table_hidden(real_name))
.map(|real_name| self.get_table_by_real_name(db, &real_name))
.collect()
}
pub fn get_table(&self, db: &mut dyn Conn, table_name: &str) -> anyhow::Result<Table> {
let real_table_name = self
.get_table_real_name(table_name)
.unwrap_or_else(|| table_name.to_string());
self.get_table_by_real_name(db, &real_table_name)
}
pub fn get_table_by_real_name(
&self,
db: &mut dyn Conn,
real_table_name: &str,
) -> anyhow::Result<Table> {
if self.is_table_hidden(real_table_name) {
return Err(anyhow!("no table named {}", real_table_name));
}
let real_columns: Vec<(String, String, bool)> = db
.query(&format!(
"
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = '{table}' AND table_schema = 'public'
ORDER BY ordinal_position
",
table = real_table_name,
))?
.iter()
.map(|row| {
(
row.get("column_name"),
row.get("data_type"),
row.get::<'_, _, String>("is_nullable") == "YES",
)
})
.collect();
let mut columns: Vec<Column> = Vec::new();
for (column_name, data_type, nullable) in real_columns {
if self.is_column_hidden(real_table_name, &column_name) {
continue;
}
if is_column_temporary(&column_name) {
continue;
}
let (name, real_name) = if let Some(alias) =
self.get_column_alias_from_real_name(real_table_name, &column_name)
{
(alias, column_name)
} else {
let real_name = self
.get_column_real_name(real_table_name, &column_name)
.unwrap_or(&column_name);
(column_name.to_string(), real_name.to_string())
};
columns.push(Column {
name,
real_name,
data_type,
nullable,
});
}
let table_name = self
.get_table_alias_from_real_name(real_table_name)
.unwrap_or_else(|| real_table_name.to_string());
let table = Table {
name: table_name,
real_name: real_table_name.to_string(),
columns: columns,
};
Ok(table)
}
}
fn is_column_temporary(name: &str) -> bool {
name.starts_with("__reshape_")
}

View File

@ -1,4 +1,3 @@
use crate::schema::Schema;
use crate::{db::Conn, migrations::Migration};
use anyhow::anyhow;
@ -9,7 +8,6 @@ use version::version;
pub struct State {
pub version: String,
pub status: Status,
pub current_schema: Schema,
pub current_migration: Option<String>,
pub migrations: Vec<Migration>,
}
@ -21,10 +19,7 @@ pub enum Status {
Idle,
#[serde(rename = "in_progress")]
InProgress {
migrations: Vec<Migration>,
target_schema: Schema,
},
InProgress { migrations: Vec<Migration> },
}
impl State {
@ -61,7 +56,6 @@ impl State {
let default = Self::default();
self.status = default.status;
self.current_migration = default.current_migration;
self.current_schema = default.current_schema;
self.migrations = default.migrations;
Ok(())
@ -106,24 +100,19 @@ impl State {
self.status = current_status;
return Err(anyhow!("status is not in progress"));
}
Status::InProgress {
mut migrations,
target_schema,
} => {
Status::InProgress { mut migrations } => {
let target_migration = migrations.last().unwrap().name.to_string();
self.migrations.append(&mut migrations);
self.current_migration = Some(target_migration);
self.current_schema = target_schema;
}
}
Ok(())
}
pub fn in_progress(&mut self, new_migrations: Vec<Migration>, new_schema: Schema) {
pub fn in_progress(&mut self, new_migrations: Vec<Migration>) {
self.status = Status::InProgress {
migrations: new_migrations,
target_schema: new_schema,
};
}
@ -146,7 +135,6 @@ impl State {
}
let items: Vec<Migration> = new_iter.collect();
println!("Remaining migrations count: {:?}", items);
// Return the remaining migrations
Ok(items)
@ -166,7 +154,6 @@ impl Default for State {
version: version!().to_string(),
status: Status::Idle,
current_migration: None,
current_schema: Schema::new(),
migrations: vec![],
}
}