Initial commit

fabianlindfors 2021-10-19 16:32:37 +02:00
commit 7d8302a0a4
22 changed files with 2642 additions and 0 deletions

25
.github/workflows/lint.yaml vendored Normal file
@@ -0,0 +1,25 @@
name: Lint
on:
push:
branches: [ main ]
env:
CARGO_TERM_COLOR: always
jobs:
lint:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Select Rust toolchain with Clippy
uses: actions-rs/toolchain@v1
with:
toolchain: stable
components: clippy
override: true
- name: Use cache for Rust dependencies
uses: Swatinem/rust-cache@v1
- name: Lint using Clippy
run: cargo clippy

29
.github/workflows/release.yaml vendored Normal file
@@ -0,0 +1,29 @@
name: Release
on:
release:
types: [ created ]
env:
CARGO_TERM_COLOR: always
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Select Rust toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: stable
- name: Use cache for Rust dependencies
uses: Swatinem/rust-cache@v1
- name: Build
run: cargo build --release && mv target/release/reshape ./reshape-linux_amd64
- name: Save binary to release
uses: skx/github-action-publish-binaries@master
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
args: reshape-linux_amd64

42
.github/workflows/test.yaml vendored Normal file
@@ -0,0 +1,42 @@
name: Tests
on:
push:
branches: [ main ]
env:
CARGO_TERM_COLOR: always
jobs:
integration-tests:
runs-on: ubuntu-latest
services:
postgres:
image: postgres
ports:
- 5432:5432
env:
POSTGRES_DB: migra_test
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Select Rust toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: stable
- name: Use cache for Rust dependencies
uses: Swatinem/rust-cache@v1
- name: Run integration tests
run: cargo test -- --test-threads=1
env:
POSTGRES_CONNECTION_STRING: "postgres://postgres:postgres@127.0.0.1/migra_test"

2
.gitignore vendored Normal file
@@ -0,0 +1,2 @@
/target
Cargo.lock

15
Cargo.toml Normal file
@@ -0,0 +1,15 @@
[package]
name = "reshape"
version = "0.1.0"
edition = "2021"
[dependencies]
postgres = { version = "0.19.2", features = ["with-serde_json-1"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
typetag = "0.1.7"
anyhow = "1.0.44"
clap = "3.0.0-beta.5"
toml = "0.5"
version = "3.0.0"
colored = "2"

245
README.md Normal file
@@ -0,0 +1,245 @@
# Reshape
[![Test status badge](https://github.com/fabianlindfors/Reshape/actions/workflows/test.yaml/badge.svg)](https://github.com/fabianlindfors/Reshape/actions/workflows/test.yaml)
Reshape is an easy-to-use, zero-downtime schema migration tool for Postgres. It automatically handles complex migrations that would normally require downtime or manual multi-step changes. During a migration, Reshape ensures both the old and new schema are available at the same time, allowing you to gradually roll out your application.
*Note: Reshape is **experimental** and should not be used in production. It can (and probably will) destroy your data and break your application.*
- [Getting started](#getting-started)
  - [Installation](#installation)
  - [Creating your first migration](#creating-your-first-migration)
  - [Preparing your application](#preparing-your-application)
  - [Running your migration](#running-your-migration)
- [Writing migrations](#writing-migrations)
  - [Basics](#basics)
  - [Create table](#create-table)
  - [Add column](#add-column)
  - [Alter column](#alter-column)
- [How it works](#how-it-works)
## Getting started
### Installation
On macOS:
```bash
brew install reshape
```
On Debian:
```bash
apt-get install reshape
```
### Creating your first migration
Each migration should be stored as a separate file under `migrations/`. The files can be in either JSON or TOML format. The name of the file will become the name of your migration and they will be sorted by file name. We recommend prefixing every migration with an incrementing number.
Let's create a simple migration to set up a new table `users` with two fields, `id` and `name`. We'll create a file called `migrations/1_create_users_table.toml`:
```toml
[[actions]]
type = "create_table"
table = "users"
[[actions.columns]]
name = "id"
type = "SERIAL"
[[actions.columns]]
name = "name"
type = "TEXT"
```
This is the equivalent of running `CREATE TABLE users (id SERIAL, name TEXT)`.
### Preparing your application
Reshape relies on your application using a specific schema. When establishing the connection to Postgres in your application, you need to run a query to select the most recent schema. This query can be generated using: `reshape generate-schema-query`.
To pass it along to your application, you could use an environment variable in your build script: `RESHAPE_SCHEMA_QUERY=$(reshape generate-schema-query)`. Then in your application:
```python
# Example for Python
reshape_schema_query = os.getenv("RESHAPE_SCHEMA_QUERY")
db.execute(reshape_schema_query)
```
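For reference, the generated query is a plain `SET search_path` statement pointing at the schema for the latest migration. For the `1_create_users_table` migration above, it would look something like this (the schema name format is an implementation detail, so always generate the query rather than hardcoding it):
```sql
-- Approximate output of the generated schema query
SET search_path TO migration_1_create_users_table
```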
### Running your migration
To create your new `users` table, run:
```bash
reshape migrate
```
As this is the first migration, Reshape will automatically complete it. For subsequent migrations, you will need to first run `reshape migrate`, roll out your application and then complete the migration using `reshape complete`.
## Writing migrations
### Basics
Every migration consists of one or more actions. The actions will be run sequentially. Here's an example of a migration with two actions to create two tables, `customers` and `products`:
```toml
[[actions]]
type = "create_table"
table = "customers"
[[actions.columns]]
name = "id"
type = "SERIAL"
[[actions]]
type = "create_table"
table = "products"
[[actions.columns]]
name = "sku"
type = "TEXT"
```
Every action has a `type`. The supported types are detailed below.
### Create table
The `create_table` action will create a new table with the specified columns and indices.
*Example: creating a `customers` table with a few columns and a primary key*
```toml
[[actions]]
type = "create_table"
table = "customers"
primary_key = "id"
[[actions.columns]]
name = "id"
type = "SERIAL"
[[actions.columns]]
name = "name"
type = "SERIAL"
# Columns default to nullable
nullable = false
# default can be any valid SQL value, in this case a string literal
default = "'PLACEHOLDER'"
```
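Leaving the primary key aside, the columns above translate to roughly the following SQL:
```sql
-- Rough equivalent of the example above
CREATE TABLE customers (
    id SERIAL,
    name TEXT DEFAULT 'PLACEHOLDER' NOT NULL
)
```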
### Add column
The `add_column` action will add a new column to an existing table. You can optionally provide an `up` setting. This should be an SQL expression which will be run for all existing rows to backfill the new column.
*Example: add a new column `reference` to table `products`*
```toml
[[actions]]
type = "add_column"
table = "products"
[actions.column]
name = "reference"
type = "INTEGER"
nullable = false
default = "10"
```
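Under the hood, and simplifying a bit, a non-nullable `add_column` first adds the column together with a temporary `NOT VALID` check constraint. This avoids scanning existing rows under an exclusive lock; the constraint is validated and replaced by a real `NOT NULL` when the migration is completed. A sketch for the example above:
```sql
ALTER TABLE products
    ADD COLUMN IF NOT EXISTS reference INTEGER DEFAULT 10;

-- Temporary constraint, swapped for a real NOT NULL
-- during `reshape complete`
ALTER TABLE products
    ADD CONSTRAINT add_column_not_null_products_reference
    CHECK (reference IS NOT NULL) NOT VALID;
```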
*Example: replace an existing `name` column with two new columns, `first_name` and `last_name`*
```toml
[[actions]]
type = "add_column"
table = "users"
# Extract the first name from the existing name column
up = "(STRING_TO_ARRAY(name, ' '))[1]"
[actions.column]
name = "first_name"
type = "TEXT"
[[actions]]
type = "add_column"
table = "users"
# Extract the last name from the existing name column
up = "(STRING_TO_ARRAY(name, ' '))[2]"
[actions.column]
name = "last_name"
type = "TEXT"
[[actions]]
type = "remove_column"
table = "users"
column = "name"
# Reconstruct name column by concatenating first and last name
down = "first_name || ' ' || last_name"
```
### Alter column
The `alter_column` action enables many different changes to an existing column, for example renaming, changing type and changing existing values.
When performing more complex changes than a rename, `up` and `down` must be provided. These should be set to SQL expressions which determine how to transform between the new and old version of the column. Inside those expressions, you can reference the current column value by the column name.
*Example: rename `last_name` column on `users` table to `family_name`*
```toml
[[actions]]
type = "alter_column"
table = "users"
column = "last_name"
[actions.changes]
name = "family_name"
```
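Because no data needs to be transformed, a pure rename short-circuits to a plain rename of the underlying column, roughly:
```sql
ALTER TABLE users RENAME COLUMN last_name TO family_name
```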
*Example: change the type of `reference` column from `INTEGER` to `TEXT`*
```toml
[[actions]]
type = "alter_column"
table = "users"
column = "reference"
up = "CAST(reference AS TEXT)" # Converts from integer value to text
down = "CAST(reference AS INTEGER)" # Converts from text value to integer
[actions.changes]
type = "TEXT" # Previous type was 'INTEGER'
```
*Example: increment all values of an `index` column by one*
```toml
[[actions]]
type = "alter_column"
table = "users"
column = "index"
up = "index + 1" # Increment for new schema
down = "index - 1" # Decrement to revert for old schema
[actions.changes]
# No name or type change; only the values are transformed
```
## How it works
Reshape works by creating views that encapsulate the underlying tables, and your application interacts with these views instead of the tables directly. During a migration, Reshape will automatically create a new set of views and set up triggers to translate inserts and updates between the old and new schema. This means that every migration is a two-phase process:
1. **Start migration** (`reshape migrate`): Create new views to ensure both the new and old schema are usable at the same time.
- After phase one is complete, you can start the roll out of your application. Once the roll out is complete, the second phase can be run.
2. **Complete migration** (`reshape complete`): Removes the old schema and any intermediate data.
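As a concrete sketch, assume a hypothetical migration named `2_change_reference_type` that changes the type of `products.reference`. The per-migration schema Reshape creates would look something like:
```sql
CREATE SCHEMA IF NOT EXISTS migration_2_change_reference_type;

-- Each table is exposed as a view that maps the real (possibly
-- temporary) column names to the names this schema version uses
CREATE OR REPLACE VIEW migration_2_change_reference_type.products AS
    SELECT id AS id, __new__reference AS reference, __reshape_is_new
    FROM products;

-- Rows written through the new schema are flagged so the triggers
-- know in which direction to translate values
ALTER VIEW migration_2_change_reference_type.products
    ALTER __reshape_is_new SET DEFAULT TRUE;
```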

80
src/db.rs Normal file
@@ -0,0 +1,80 @@
use postgres::{types::ToSql, NoTls, Row};
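// Minimal abstraction over a database connection, so migration actions
// can run against either a plain connection or a transaction.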
pub trait Conn {
fn run(&mut self, query: &str) -> anyhow::Result<()>;
fn query(&mut self, query: &str) -> anyhow::Result<Vec<Row>>;
fn query_with_params(
&mut self,
query: &str,
params: &[&(dyn ToSql + Sync)],
) -> anyhow::Result<Vec<Row>>;
}
pub struct DbConn {
client: postgres::Client,
}
impl DbConn {
pub fn connect(connection_string: &str) -> anyhow::Result<DbConn> {
let client = postgres::Client::connect(connection_string, NoTls)?;
Ok(DbConn { client })
}
pub fn transaction(&mut self) -> anyhow::Result<Transaction> {
let transaction = self.client.transaction()?;
Ok(Transaction { transaction })
}
}
impl Conn for DbConn {
fn run(&mut self, query: &str) -> anyhow::Result<()> {
self.client.batch_execute(query)?;
Ok(())
}
fn query(&mut self, query: &str) -> anyhow::Result<Vec<Row>> {
let rows = self.client.query(query, &[])?;
Ok(rows)
}
fn query_with_params(
&mut self,
query: &str,
params: &[&(dyn ToSql + Sync)],
) -> anyhow::Result<Vec<Row>> {
let rows = self.client.query(query, params)?;
Ok(rows)
}
}
pub struct Transaction<'a> {
transaction: postgres::Transaction<'a>,
}
impl Transaction<'_> {
pub fn commit(self) -> anyhow::Result<()> {
self.transaction.commit()?;
Ok(())
}
}
impl Conn for Transaction<'_> {
fn run(&mut self, query: &str) -> anyhow::Result<()> {
self.transaction.batch_execute(query)?;
Ok(())
}
fn query(&mut self, query: &str) -> anyhow::Result<Vec<Row>> {
let rows = self.transaction.query(query, &[])?;
Ok(rows)
}
fn query_with_params(
&mut self,
query: &str,
params: &[&(dyn ToSql + Sync)],
) -> anyhow::Result<Vec<Row>> {
let rows = self.transaction.query(query, params)?;
Ok(rows)
}
}

297
src/lib.rs Normal file
@@ -0,0 +1,297 @@
use crate::migrations::Migration;
use colored::*;
use db::{Conn, DbConn};
use schema::Table;
mod db;
pub mod migrations;
mod schema;
mod state;
pub use crate::state::{State, Status};
pub struct Reshape {
pub state: State,
db: DbConn,
}
impl Reshape {
pub fn new(connection_string: &str) -> anyhow::Result<Reshape> {
let mut db = DbConn::connect(connection_string)?;
let state = State::load(&mut db);
Ok(Reshape { db, state })
}
pub fn migrate<T>(&mut self, migrations: T) -> anyhow::Result<()>
where
T: IntoIterator<Item = Migration>,
{
self.state.set_migrations(migrations)?;
self.state.save(&mut self.db)?;
// Make sure no migration is in progress
if let state::Status::InProgress { .. } = &self.state.status {
println!("Migration already in progress, please complete using 'reshape complete'");
return Ok(());
}
let current_migration = self.state.current_migration.clone();
let remaining_migrations = Self::get_remaining_migrations(&self.state);
if remaining_migrations.is_empty() {
println!("No migrations left to apply");
return Ok(());
}
println!(" Applying {} migrations\n", remaining_migrations.len());
let target_migration = remaining_migrations.last().unwrap().name.to_string();
let mut new_schema = self.state.current_schema.clone();
for migration in remaining_migrations {
println!("Migrating '{}':", migration.name);
for step in &migration.actions {
print!(" + {} ", step.describe());
step.run(&mut self.db, &new_schema)?;
step.update_schema(&mut new_schema)?;
println!("{}", "done".green());
}
println!("");
}
// Create schema and views for migration
self.create_schema_for_migration(&target_migration, &new_schema)?;
// Update state once migrations have been performed
self.state.status = state::Status::InProgress {
target_migration: target_migration.to_string(),
target_schema: new_schema,
};
self.state.save(&mut self.db)?;
// If we started from a blank slate, we can finish the migration immediately
if current_migration.is_none() {
println!("Automatically completing migrations\n");
self.complete_migration()?;
println!("Migrations complete:");
println!(
" - Run '{}' from your application to use the latest schema",
generate_schema_query(&target_migration)
);
} else {
println!("Migrations have been applied and the new schema is ready for use:");
println!(
" - Run '{}' from your application to use the latest schema",
generate_schema_query(&target_migration)
);
println!(
" - Run 'reshape complete' once your application has been updated and the previous schema is no longer in use"
);
}
Ok(())
}
pub fn complete_migration(&mut self) -> anyhow::Result<()> {
// Make sure a migration is in progress
let (target_migration, target_schema) = match &self.state.status {
state::Status::InProgress {
target_migration,
target_schema,
} => (target_migration, target_schema),
_ => {
println!("No migration in progress");
return Ok(());
}
};
let remaining_migrations = Self::get_remaining_migrations(&self.state);
let mut temp_schema = self.state.current_schema.clone();
// Run all the completion changes as a transaction to avoid incomplete updates
let mut transaction = self.db.transaction()?;
// Remove previous migration's schema
if let Some(current_migration) = &self.state.current_migration {
let query = format!(
"DROP SCHEMA IF EXISTS {} CASCADE",
schema_name_for_migration(current_migration)
);
transaction.run(&query)?;
}
for migration in remaining_migrations {
println!("Completing '{}':", migration.name);
for step in &migration.actions {
print!(" + {} ", step.describe());
step.complete(&mut transaction, &temp_schema)?;
step.update_schema(&mut temp_schema)?;
println!("{}", "done".green());
}
println!("");
}
// Remove any temporary __reshape_is_new columns from tables
for table in temp_schema.tables.values_mut() {
if table.has_is_new {
table.has_is_new = false;
let query = format!(
"ALTER TABLE {table} DROP COLUMN __reshape_is_new CASCADE",
table = table.name,
);
transaction.run(&query)?;
// The view will automatically be dropped by CASCADE so let's recreate it
let schema = schema_name_for_migration(target_migration);
Self::create_view_for_table(&mut transaction, table, &schema, false)?;
}
}
self.state.current_migration = Some(target_migration.to_string());
self.state.current_schema = target_schema.clone();
self.state.status = state::Status::Idle;
self.state.save(&mut transaction)?;
transaction.commit()?;
Ok(())
}
fn create_schema_for_migration(
&mut self,
migration_name: &str,
schema: &schema::Schema,
) -> anyhow::Result<()> {
// Create schema for migration
let schema_name = schema_name_for_migration(migration_name);
let create_schema_query = format!("CREATE SCHEMA IF NOT EXISTS {}", schema_name);
self.db.run(&create_schema_query)?;
// Create views inside schema
for table in schema.tables.values() {
Self::create_view_for_table(&mut self.db, table, &schema_name, true)?;
}
Ok(())
}
fn create_view_for_table(
db: &mut impl Conn,
table: &Table,
schema: &str,
use_alias: bool,
) -> anyhow::Result<()> {
let mut select_columns: Vec<String> = table
.columns
.iter()
.map(|column| {
if use_alias {
format!("{} AS {}", column.real_name(), column.name)
} else {
column.name.to_string()
}
})
.collect();
if table.has_is_new {
select_columns.push("__reshape_is_new".to_string());
}
let query = format!(
"CREATE OR REPLACE VIEW {schema}.{table} AS
SELECT {columns}
FROM {table}",
schema = schema,
table = table.name,
columns = select_columns.join(","),
);
db.run(&query)?;
if table.has_is_new {
let query = format!(
"ALTER VIEW {schema}.{view} ALTER __reshape_is_new SET DEFAULT TRUE",
schema = schema,
view = table.name,
);
db.run(&query)?;
}
Ok(())
}
fn get_remaining_migrations(state: &State) -> Vec<&Migration> {
match &state.current_migration {
Some(current_migration) => state
.migrations
.iter()
.skip_while(|migration| &migration.name != current_migration)
.skip(1)
.collect(),
None => state.migrations.iter().collect(),
}
}
pub fn remove(&mut self) -> anyhow::Result<()> {
// Remove migration schemas and views
if let Some(current_migration) = &self.state.current_migration {
let query = format!(
"DROP SCHEMA IF EXISTS {} CASCADE",
schema_name_for_migration(current_migration)
);
self.db.run(&query)?;
}
if let Status::InProgress {
target_migration,
target_schema: _,
} = &self.state.status
{
let query = format!(
"DROP SCHEMA IF EXISTS {} CASCADE",
schema_name_for_migration(target_migration)
);
self.db.run(&query)?;
}
// Remove all tables
let schema = &self.state.current_schema;
for table in schema.tables.values() {
let query = format!("DROP TABLE IF EXISTS {} CASCADE", table.name);
self.db.run(&query)?;
}
// Reset state
self.state.clear(&mut self.db)?;
println!("Reshape and all data has been removed");
Ok(())
}
pub fn latest_schema(&self) -> Option<String> {
self.state
.migrations
.last()
.map(|migration| schema_name_for_migration(&migration.name))
}
}
pub fn generate_schema_query(migration_name: &str) -> String {
let schema_name = schema_name_for_migration(migration_name);
format!("SET search_path TO {}", schema_name)
}
fn schema_name_for_migration(migration_name: &str) -> String {
format!("migration_{}", migration_name)
}

125
src/main.rs Normal file
@@ -0,0 +1,125 @@
use std::{
fs::{self, DirEntry, File},
io::Read,
path::{Path, PathBuf},
};
use clap::Parser;
use reshape::{
migrations::{Action, Migration},
Reshape,
};
use serde::{Deserialize, Serialize};
#[derive(Parser)]
struct Opts {
#[clap(subcommand)]
cmd: Command,
#[clap(default_value = "postgres://postgres:postgres@localhost:5432/postgres")]
url: String,
}
#[derive(Parser)]
enum Command {
Migrate,
Complete,
Remove,
LatestSchema,
}
fn main() {
let opts: Opts = Opts::parse();
let result = run(opts);
if let Err(e) = result {
eprintln!("Error: {}", e);
std::process::exit(1);
}
}
fn run(opts: Opts) -> anyhow::Result<()> {
let mut reshape = Reshape::new(&opts.url)?;
match opts.cmd {
Command::Migrate => migrate(&mut reshape),
Command::Complete => reshape.complete_migration(),
Command::Remove => reshape.remove(),
Command::LatestSchema => {
println!(
"{}",
reshape.latest_schema().unwrap_or_else(|| "".to_string())
);
Ok(())
}
}
}
fn migrate(reshape: &mut Reshape) -> anyhow::Result<()> {
let migrations = find_migrations()?;
reshape.migrate(migrations)
}
fn find_migrations() -> anyhow::Result<Vec<Migration>> {
let path = Path::new("migrations");
// Return early if path doesn't exist
if !path.exists() {
return Ok(Vec::new());
}
let mut paths: Vec<PathBuf> = fs::read_dir(path)?
.collect::<std::io::Result<Vec<DirEntry>>>()?
.iter()
.map(|entry| entry.path())
.collect();
// Sort all files by their file names (without extension)
paths.sort_unstable_by_key(|path| path.as_path().file_stem().unwrap().to_os_string());
paths
.iter()
.map(|path| {
let mut file = File::open(path)?;
// Read file data
let mut data = String::new();
file.read_to_string(&mut data)?;
Ok((path, data))
})
.map(|result| {
result.and_then(|(path, data)| {
let extension = path.extension().and_then(|ext| ext.to_str()).unwrap();
let file_migration = decode_migration_file(&data, extension)?;
let file_name = path.file_stem().and_then(|name| name.to_str()).unwrap();
Ok(Migration {
name: file_migration.name.unwrap_or_else(|| file_name.to_string()),
description: file_migration.description,
actions: file_migration.actions,
})
})
})
.collect()
}
fn decode_migration_file(data: &str, extension: &str) -> anyhow::Result<FileMigration> {
let migration: FileMigration = match extension {
"json" => serde_json::from_str(data)?,
"toml" => toml::from_str(data)?,
extension => {
return Err(anyhow::anyhow!(
"unrecognized file extension '{}'",
extension
))
}
};
Ok(migration)
}
#[derive(Serialize, Deserialize)]
struct FileMigration {
name: Option<String>,
description: Option<String>,
actions: Vec<Box<dyn Action>>,
}

213
src/migrations/add_column.rs Normal file
@@ -0,0 +1,213 @@
use super::{Action, Column};
use crate::{
db::Conn,
schema::{self, Schema},
};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
pub struct AddColumn {
pub table: String,
pub up: Option<String>,
pub column: Column,
}
impl AddColumn {
fn trigger_name(&self) -> String {
format!("add_column_{}_{}", self.table, self.column.name)
}
fn not_null_constraint_name(&self) -> String {
format!("add_column_not_null_{}_{}", self.table, self.column.name)
}
}
#[typetag::serde(name = "add_column")]
impl Action for AddColumn {
fn describe(&self) -> String {
format!(
"Adding column \"{}\" to \"{}\"",
self.column.name, self.table
)
}
fn run(&self, db: &mut dyn Conn, schema: &Schema) -> anyhow::Result<()> {
let table = schema.find_table(&self.table)?;
let mut definition_parts = vec![
self.column.name.to_string(),
self.column.data_type.to_string(),
];
if let Some(default) = &self.column.default {
definition_parts.push("DEFAULT".to_string());
definition_parts.push(default.to_string());
}
// Add temporary column
let query = format!(
"
ALTER TABLE {table}
ADD COLUMN IF NOT EXISTS {definition};
",
table = self.table,
definition = definition_parts.join(" "),
);
db.run(&query)?;
if let Some(up) = &self.up {
let query = format!(
"
ALTER TABLE {table}
ADD COLUMN IF NOT EXISTS __reshape_is_new BOOLEAN DEFAULT FALSE NOT NULL;
",
table = self.table,
);
db.run(&query)?;
let declarations: Vec<String> = table
.columns
.iter()
.map(|column| {
format!(
"{name} public.{table}.{name}%TYPE := NEW.{name};",
table = table.name,
name = column.name,
)
})
.collect();
// Add triggers to fill in values as they are inserted/updated
let query = format!(
"
CREATE OR REPLACE FUNCTION {trigger_name}()
RETURNS TRIGGER AS $$
BEGIN
IF NOT NEW.__reshape_is_new THEN
DECLARE
{declarations}
BEGIN
NEW.{column_name} = {up};
END;
END IF;
RETURN NEW;
END
$$ language 'plpgsql';
DROP TRIGGER IF EXISTS {trigger_name} ON {table};
CREATE TRIGGER {trigger_name} BEFORE UPDATE OR INSERT ON {table} FOR EACH ROW EXECUTE PROCEDURE {trigger_name}();
",
column_name = self.column.name,
trigger_name = self.trigger_name(),
up = up,
table = self.table,
declarations = declarations.join("\n"),
);
db.run(&query)?;
}
// Add a temporary NOT NULL constraint if the column shouldn't be nullable.
// This constraint is set as NOT VALID so it doesn't apply to existing rows and
// the existing rows don't need to be scanned under an exclusive lock.
// Thanks to this, we can set the full column as NOT NULL later with minimal locking.
if !self.column.nullable {
let query = format!(
"
ALTER TABLE {table}
ADD CONSTRAINT {constraint_name}
CHECK ({column} IS NOT NULL) NOT VALID
",
table = self.table,
constraint_name = self.not_null_constraint_name(),
column = self.column.name,
);
db.run(&query)?;
}
// Fill in values
if let Some(up) = &self.up {
let query = format!(
"
UPDATE {table} SET {column_name} = {up}
",
table = self.table,
column_name = self.column.name,
up = up,
);
db.run(&query)?;
}
Ok(())
}
fn complete(&self, db: &mut dyn Conn, _schema: &Schema) -> anyhow::Result<()> {
// Remove triggers, procedures and temporary column
let query = format!(
"
DROP TRIGGER IF EXISTS {trigger_name} ON {table};
DROP FUNCTION IF EXISTS {trigger_name};
",
table = self.table,
trigger_name = self.trigger_name(),
);
db.run(&query)?;
// Update column to be NOT NULL if necessary
if !self.column.nullable {
// Validate the temporary constraint (should always be valid).
// This performs a sequential scan but does not take an exclusive lock.
let query = format!(
"
ALTER TABLE {table}
VALIDATE CONSTRAINT {constraint_name}
",
table = self.table,
constraint_name = self.not_null_constraint_name(),
);
db.run(&query)?;
// Update the column to be NOT NULL.
// This requires an exclusive lock but since PG 12 it can check
// the existing constraint for correctness which makes the lock short-lived.
// Source: https://dba.stackexchange.com/a/268128
let query = format!(
"
ALTER TABLE {table}
ALTER COLUMN {column} SET NOT NULL
",
table = self.table,
column = self.column.name,
);
db.run(&query)?;
// Drop the temporary constraint
let query = format!(
"
ALTER TABLE {table}
DROP CONSTRAINT {constraint_name}
",
table = self.table,
constraint_name = self.not_null_constraint_name(),
);
db.run(&query)?;
}
Ok(())
}
fn update_schema(&self, schema: &mut Schema) -> anyhow::Result<()> {
let table = schema.find_table_mut(&self.table)?;
if self.up.is_some() {
table.has_is_new = true;
}
table.add_column(schema::Column {
name: self.column.name.to_string(),
real_name: None,
data_type: self.column.data_type.to_string(),
nullable: self.column.nullable,
});
Ok(())
}
}

323
src/migrations/alter_column.rs Normal file
@@ -0,0 +1,323 @@
use super::Action;
use crate::{db::Conn, schema::Schema};
use anyhow::anyhow;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
pub struct AlterColumn {
pub table: String,
pub column: String,
pub up: Option<String>,
pub down: Option<String>,
pub changes: ColumnChanges,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct ColumnChanges {
pub name: Option<String>,
#[serde(rename = "type")]
pub data_type: Option<String>,
pub nullable: Option<bool>,
}
impl AlterColumn {
fn insert_trigger_name(&self) -> String {
format!("alter_column_insert_trigger_{}_{}", self.table, self.column)
}
fn update_old_trigger_name(&self) -> String {
format!(
"alter_column_update_old_trigger_{}_{}",
self.table, self.column
)
}
fn update_new_trigger_name(&self) -> String {
format!(
"alter_column_update_new_trigger_{}_{}",
self.table, self.column
)
}
fn not_null_constraint_name(&self) -> String {
format!(
"alter_column_temporary_not_null_{}_{}",
self.table, self.column
)
}
fn can_short_circuit(&self) -> bool {
self.changes.name.is_some()
&& self.changes.data_type.is_none()
&& self.changes.nullable.is_none()
}
}
#[typetag::serde(name = "alter_column")]
impl Action for AlterColumn {
fn describe(&self) -> String {
format!("Altering column \"{}\" on \"{}\"", self.column, self.table)
}
fn run(&self, db: &mut dyn Conn, schema: &Schema) -> anyhow::Result<()> {
let column = schema
.find_table(&self.table)
.and_then(|table| table.find_column(&self.column))?;
// If we are only changing the name of a column, we can do that without creating a temporary column
if self.can_short_circuit() {
if let Some(new_name) = &self.changes.name {
let query = format!(
"
ALTER TABLE {table}
RENAME COLUMN {existing_name} TO {new_name}
",
table = self.table,
existing_name = column.real_name(),
new_name = new_name,
);
db.run(&query)?;
}
return Ok(());
}
// If we couldn't short circuit, then we need up and down functions
let (up, down) = match (&self.up, &self.down) {
(Some(up), Some(down)) => (up, down),
_ => return Err(anyhow!("missing up or down values")),
};
let temporary_column = format!("__new__{}", column.real_name());
let temporary_column_type = self.changes.data_type.as_ref().unwrap_or(&column.data_type);
// Add temporary, nullable column
let query = format!(
"
ALTER TABLE {table}
ADD COLUMN IF NOT EXISTS {temp_column} {temp_column_type};
ALTER TABLE {table}
ADD COLUMN IF NOT EXISTS __reshape_is_new BOOLEAN DEFAULT FALSE NOT NULL;
",
table = self.table,
temp_column = temporary_column,
temp_column_type = temporary_column_type,
);
db.run(&query)?;
// Add triggers to fill in values as they are inserted/updated
let query = format!(
"
CREATE OR REPLACE FUNCTION {insert_trigger}()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.__reshape_is_new THEN
DECLARE
{existing_column} public.{table}.{temp_column}%TYPE := NEW.{temp_column};
BEGIN
{existing_column} = NEW.{temp_column};
NEW.{existing_column} = {down};
END;
ELSIF NOT NEW.__reshape_is_new THEN
DECLARE
{existing_column} public.{table}.{existing_column}%TYPE := NEW.{existing_column};
BEGIN
{existing_column} = NEW.{existing_column};
NEW.{temp_column} = {up};
END;
END IF;
RETURN NEW;
END
$$ language 'plpgsql';
DROP TRIGGER IF EXISTS {insert_trigger} ON {table};
CREATE TRIGGER {insert_trigger} BEFORE INSERT ON {table} FOR EACH ROW EXECUTE PROCEDURE {insert_trigger}();
CREATE OR REPLACE FUNCTION {update_old_trigger}()
RETURNS TRIGGER AS $$
BEGIN
DECLARE
{existing_column} public.{table}.{existing_column}%TYPE := NEW.{existing_column};
BEGIN
NEW.{temp_column} = {up};
END;
RETURN NEW;
END
$$ language 'plpgsql';
DROP TRIGGER IF EXISTS {update_old_trigger} ON {table};
CREATE TRIGGER {update_old_trigger} BEFORE UPDATE OF {existing_column} ON {table} FOR EACH ROW EXECUTE PROCEDURE {update_old_trigger}();
CREATE OR REPLACE FUNCTION {update_new_trigger}()
RETURNS TRIGGER AS $$
BEGIN
DECLARE
{existing_column} public.{table}.{temp_column}%TYPE := NEW.{temp_column};
BEGIN
NEW.{existing_column} = {down};
END;
RETURN NEW;
END
$$ language 'plpgsql';
DROP TRIGGER IF EXISTS {update_new_trigger} ON {table};
CREATE TRIGGER {update_new_trigger} BEFORE UPDATE OF {temp_column} ON {table} FOR EACH ROW EXECUTE PROCEDURE {update_new_trigger}();
",
existing_column = column.real_name(),
temp_column = temporary_column,
up = up,
down = down,
table = self.table,
insert_trigger = self.insert_trigger_name(),
update_old_trigger = self.update_old_trigger_name(),
update_new_trigger = self.update_new_trigger_name(),
);
db.run(&query)?;
// Add a temporary NOT NULL constraint if the column shouldn't be nullable.
// This constraint is set as NOT VALID so it doesn't apply to existing rows and
// the existing rows don't need to be scanned under an exclusive lock.
// Thanks to this, we can set the full column as NOT NULL later with minimal locking.
if !column.nullable {
let query = format!(
"
ALTER TABLE {table}
ADD CONSTRAINT {constraint_name}
CHECK ({column} IS NOT NULL) NOT VALID
",
table = self.table,
constraint_name = self.not_null_constraint_name(),
column = temporary_column,
);
db.run(&query)?;
}
// Fill in values
let query = format!(
"
UPDATE {table} SET {temp_column} = {up}
",
table = self.table,
temp_column = temporary_column,
up = up,
);
db.run(&query)?;
Ok(())
}
fn complete(&self, db: &mut dyn Conn, schema: &Schema) -> anyhow::Result<()> {
if self.can_short_circuit() {
return Ok(());
}
let column = schema
.find_table(&self.table)
.and_then(|table| table.find_column(&self.column))?;
let column_name = self.changes.name.as_deref().unwrap_or(column.real_name());
// Remove old column
let query = format!(
"
ALTER TABLE {} DROP COLUMN {} CASCADE
",
self.table,
column.real_name()
);
db.run(&query)?;
// Rename temporary column
let query = format!(
"
ALTER TABLE {table} RENAME COLUMN __new__{real_name} TO {name}
",
table = self.table,
real_name = column.real_name(),
name = column_name,
);
db.run(&query)?;
// Remove triggers and procedures
let query = format!(
"
DROP TRIGGER IF EXISTS {insert_trigger} ON {table};
DROP FUNCTION IF EXISTS {insert_trigger};
DROP TRIGGER IF EXISTS {update_old_trigger} ON {table};
DROP FUNCTION IF EXISTS {update_old_trigger};
DROP TRIGGER IF EXISTS {update_new_trigger} ON {table};
DROP FUNCTION IF EXISTS {update_new_trigger};
",
table = self.table,
insert_trigger = self.insert_trigger_name(),
update_old_trigger = self.update_old_trigger_name(),
update_new_trigger = self.update_new_trigger_name(),
);
db.run(&query)?;
// Update column to be NOT NULL if necessary
if !column.nullable {
// Validate the temporary constraint (should always be valid).
// This performs a sequential scan but does not take an exclusive lock.
let query = format!(
"
ALTER TABLE {table}
VALIDATE CONSTRAINT {constraint_name}
",
table = self.table,
constraint_name = self.not_null_constraint_name(),
);
db.run(&query)?;
// Update the column to be NOT NULL.
// This requires an exclusive lock but since PG 12 it can check
// the existing constraint for correctness which makes the lock short-lived.
// Source: https://dba.stackexchange.com/a/268128
let query = format!(
"
ALTER TABLE {table}
ALTER COLUMN {column} SET NOT NULL
",
table = self.table,
column = column_name,
);
db.run(&query)?;
// Drop the temporary constraint
let query = format!(
"
ALTER TABLE {table}
DROP CONSTRAINT {constraint_name}
",
table = self.table,
constraint_name = self.not_null_constraint_name(),
);
db.run(&query)?;
}
Ok(())
}
fn update_schema(&self, schema: &mut Schema) -> anyhow::Result<()> {
let table = schema.find_table_mut(&self.table)?;
let column = table.find_column_mut(&self.column)?;
// If we are only changing the name of a column, we haven't created a temporary column
if self.can_short_circuit() {
if let Some(new_name) = &self.changes.name {
column.real_name = None;
column.name = new_name.to_string();
}
return Ok(());
}
column.name = self
.changes
.name
.as_ref()
.map(|n| n.to_string())
.unwrap_or(self.column.to_string());
column.real_name = Some(format!("__new__{}", self.column));
table.has_is_new = true;
Ok(())
}
}

18
src/migrations/common.rs Normal file
@@ -0,0 +1,18 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
pub struct Column {
pub name: String,
#[serde(rename = "type")]
pub data_type: String,
#[serde(default = "nullable_default")]
pub nullable: bool,
pub default: Option<String>,
}
fn nullable_default() -> bool {
true
}

70
src/migrations/create_table.rs Normal file
@@ -0,0 +1,70 @@
use super::{Action, Column};
use crate::{
db::Conn,
schema::{Schema, Table},
};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
pub struct CreateTable {
pub name: String,
pub columns: Vec<Column>,
}
#[typetag::serde(name = "create_table")]
impl Action for CreateTable {
fn describe(&self) -> String {
format!("Creating table \"{}\"", self.name)
}
fn run(&self, db: &mut dyn Conn, _schema: &Schema) -> anyhow::Result<()> {
let column_definitions: Vec<String> = self
.columns
.iter()
.map(|column| {
let mut parts = vec![column.name.to_string(), column.data_type.to_string()];
if let Some(default) = &column.default {
parts.push("DEFAULT".to_string());
parts.push(default.to_string());
}
if !column.nullable {
parts.push("NOT NULL".to_string());
}
parts.join(" ")
})
.collect();
let query = format!(
"CREATE TABLE {} (
{}
)",
self.name,
column_definitions.join(",\n"),
);
db.run(&query)?;
Ok(())
}
fn complete(&self, _db: &mut dyn Conn, _schema: &Schema) -> anyhow::Result<()> {
// Do nothing
Ok(())
}
fn update_schema(&self, schema: &mut Schema) -> anyhow::Result<()> {
let mut table = Table::new(self.name.to_string());
for column in &self.columns {
table.add_column(crate::schema::Column {
name: column.name.to_string(),
real_name: None,
data_type: column.data_type.to_string(),
nullable: column.nullable,
});
}
schema.add_table(table);
Ok(())
}
}

64
src/migrations/mod.rs Normal file
@@ -0,0 +1,64 @@
use crate::{db::Conn, schema::Schema};
use core::fmt::Debug;
use serde::{Deserialize, Serialize};
// Re-export migration types
mod common;
pub use common::Column;
mod create_table;
pub use create_table::CreateTable;
mod alter_column;
pub use alter_column::{AlterColumn, ColumnChanges};
mod add_column;
pub use add_column::AddColumn;
mod remove_column;
pub use remove_column::RemoveColumn;
#[derive(Serialize, Deserialize, Debug)]
pub struct Migration {
pub name: String,
pub description: Option<String>,
pub actions: Vec<Box<dyn Action>>,
}
impl Migration {
pub fn new(name: impl Into<String>, description: Option<String>) -> Migration {
Migration {
name: name.into(),
description,
actions: vec![],
}
}
pub fn with_action(mut self, action: impl Action + 'static) -> Self {
self.actions.push(Box::new(action));
self
}
}
impl PartialEq for Migration {
fn eq(&self, other: &Self) -> bool {
self.name == other.name
}
}
impl Eq for Migration {}
impl Clone for Migration {
fn clone(&self) -> Self {
let serialized = serde_json::to_string(self).unwrap();
serde_json::from_str(&serialized).unwrap()
}
}
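// An action is applied in two phases: `run` prepares the change while both
// the old and new schema remain usable, and `complete` finalizes it once the
// application has been rolled out. `update_schema` records the action's
// effect on the in-memory schema representation.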
#[typetag::serde(tag = "type")]
pub trait Action: Debug {
fn describe(&self) -> String;
fn run(&self, db: &mut dyn Conn, schema: &Schema) -> anyhow::Result<()>;
fn complete(&self, db: &mut dyn Conn, schema: &Schema) -> anyhow::Result<()>;
fn update_schema(&self, schema: &mut Schema) -> anyhow::Result<()>;
}

100
src/migrations/remove_column.rs Normal file
@@ -0,0 +1,100 @@
use super::Action;
use crate::{db::Conn, schema::Schema};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
pub struct RemoveColumn {
pub table: String,
pub column: String,
pub down: Option<String>,
}
impl RemoveColumn {
fn trigger_name(&self) -> String {
format!("remove_column_{}_{}", self.table, self.column)
}
}
#[typetag::serde(name = "remove_column")]
impl Action for RemoveColumn {
fn describe(&self) -> String {
format!(
"Removing column \"{}\" from \"{}\"",
self.column, self.table
)
}
fn run(&self, db: &mut dyn Conn, schema: &Schema) -> anyhow::Result<()> {
// Add down trigger
if let Some(down) = &self.down {
let table = schema.find_table(&self.table)?;
let declarations: Vec<String> = table
.columns
.iter()
.map(|column| {
format!(
"{name} public.{table}.{name}%TYPE := NEW.{name};",
table = table.name,
name = column.name,
)
})
.collect();
let query = format!(
"
CREATE OR REPLACE FUNCTION {trigger_name}()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.{column_name} IS NULL THEN
DECLARE
{declarations}
BEGIN
NEW.{column_name} = {down};
END;
END IF;
RETURN NEW;
END
$$ language 'plpgsql';
DROP TRIGGER IF EXISTS {trigger_name} ON {table};
CREATE TRIGGER {trigger_name} BEFORE UPDATE OR INSERT ON {table} FOR EACH ROW EXECUTE PROCEDURE {trigger_name}();
",
column_name = self.column,
trigger_name = self.trigger_name(),
down = down,
table = self.table,
declarations = declarations.join("\n"),
);
db.run(&query)?;
}
Ok(())
}
fn complete(&self, db: &mut dyn Conn, _schema: &Schema) -> anyhow::Result<()> {
// Remove column, function and trigger
let query = format!(
"
ALTER TABLE {table}
DROP COLUMN {column};
DROP TRIGGER IF EXISTS {trigger_name} ON {table};
DROP FUNCTION IF EXISTS {trigger_name};
",
table = self.table,
column = self.column,
trigger_name = self.trigger_name(),
);
db.run(&query)?;
Ok(())
}
fn update_schema(&self, schema: &mut Schema) -> anyhow::Result<()> {
let table = schema.find_table_mut(&self.table)?;
table.remove_column(&self.column);
Ok(())
}
}

112
src/schema.rs Normal file
@@ -0,0 +1,112 @@
use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Schema {
pub tables: HashMap<String, Table>,
}
impl Schema {
pub fn new() -> Schema {
Schema {
tables: HashMap::new(),
}
}
pub fn add_table(&mut self, table: Table) -> &mut Self {
self.tables.insert(table.name.to_string(), table);
self
}
pub fn find_table(&self, name: &str) -> anyhow::Result<&Table> {
self.tables
.get(name)
.ok_or_else(|| anyhow!("no table {}", name))
}
pub fn find_table_mut(&mut self, name: &str) -> anyhow::Result<&mut Table> {
self.tables
.get_mut(name)
.ok_or_else(|| anyhow!("no table {}", name))
}
}
impl Default for Schema {
fn default() -> Self {
Self::new()
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Table {
pub name: String,
pub columns: Vec<Column>,
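// Tracks whether the table currently carries the temporary
// __reshape_is_new helper column used while a migration is in progress.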
#[serde(skip)]
pub has_is_new: bool,
}
impl Table {
pub fn new(name: impl Into<String>) -> Table {
Table {
name: name.into(),
columns: vec![],
has_is_new: false,
}
}
pub fn add_column(&mut self, column: Column) -> &mut Self {
self.columns.push(column);
self
}
pub fn remove_column(&mut self, column_name: &str) -> &mut Self {
if let Some(index) = self.columns.iter().position(|c| c.name == column_name) {
self.columns.remove(index);
}
self
}
pub fn find_column(&self, name: &str) -> anyhow::Result<&Column> {
self.columns
.iter()
.find(|column| column.name == name)
.ok_or_else(|| anyhow!("no column {} on table {}", name, self.name))
}
pub fn find_column_mut(&mut self, name: &str) -> anyhow::Result<&mut Column> {
self.columns
.iter_mut()
.find(|column| column.name == name)
.ok_or_else(|| anyhow!("no column {} on table {}", name, self.name))
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Column {
pub name: String,
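// Set while the column is backed by a temporary physical column
// (e.g. __new__<name>) during an in-progress migration.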
#[serde(skip)]
pub real_name: Option<String>,
pub data_type: String,
pub nullable: bool,
}
impl Column {
pub fn real_name(&self) -> &str {
self.real_name.as_ref().unwrap_or(&self.name)
}
}

111
src/state.rs Normal file
@@ -0,0 +1,111 @@
use crate::schema::Schema;
use crate::{db::Conn, migrations::Migration};
use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use version::version;
#[derive(Serialize, Deserialize, Debug)]
pub struct State {
pub version: String,
pub status: Status,
pub current_schema: Schema,
pub current_migration: Option<String>,
pub migrations: Vec<Migration>,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "status")]
pub enum Status {
#[serde(rename = "idle")]
Idle,
#[serde(rename = "in_progress")]
InProgress {
target_migration: String,
target_schema: Schema,
},
}
impl State {
pub fn load(db: &mut impl Conn) -> State {
Self::ensure_schema_and_table(db);
let results = db
.query("SELECT value FROM reshape.data WHERE key = 'state'")
.unwrap();
match results.first() {
Some(row) => {
let json: serde_json::Value = row.get(0);
serde_json::from_value(json).unwrap()
}
None => Default::default(),
}
}
pub fn save(&self, db: &mut impl Conn) -> anyhow::Result<()> {
Self::ensure_schema_and_table(db);
let json = serde_json::to_value(self)?;
db.query_with_params(
"INSERT INTO reshape.data (key, value) VALUES ('state', $1) ON CONFLICT (key) DO UPDATE SET value = $1",
&[&json]
)?;
Ok(())
}
pub fn clear(&mut self, db: &mut impl Conn) -> anyhow::Result<()> {
db.run("DROP SCHEMA reshape CASCADE")?;
let default = Self::default();
self.status = default.status;
self.current_migration = default.current_migration;
self.current_schema = default.current_schema;
self.migrations = default.migrations;
Ok(())
}
pub fn set_migrations<T>(&mut self, migrations: T) -> Result<(), anyhow::Error>
where
T: IntoIterator<Item = Migration>,
{
let mut new_iter = migrations.into_iter();
// Ensure the new migrations match up with the existing ones
for pair in self.migrations.iter().zip(new_iter.by_ref()) {
let (existing, ref new) = pair;
if existing != new {
return Err(anyhow!(
"existing migration {} does not match new migration {}",
existing.name,
new.name
));
}
}
// Add any new migrations
self.migrations.extend(new_iter);
Ok(())
}
fn ensure_schema_and_table(db: &mut impl Conn) {
db.run("CREATE SCHEMA IF NOT EXISTS reshape").unwrap();
db.run("CREATE TABLE IF NOT EXISTS reshape.data (key TEXT PRIMARY KEY, value JSONB)")
.unwrap();
}
}
impl Default for State {
fn default() -> Self {
State {
version: version!().to_string(),
status: Status::Idle,
current_migration: None,
current_schema: Schema::new(),
migrations: vec![],
}
}
}

293
tests/add_column.rs Normal file
@@ -0,0 +1,293 @@
use reshape::migrations::{AddColumn, Column, CreateTable, Migration};
use reshape::Status;
mod common;
#[test]
fn add_column() {
let (mut reshape, mut old_db, mut new_db) = common::setup();
let create_users_table = Migration::new("create_users_table", None).with_action(CreateTable {
name: "users".to_string(),
columns: vec![
Column {
name: "id".to_string(),
data_type: "SERIAL".to_string(),
nullable: true,
default: None,
},
Column {
name: "name".to_string(),
data_type: "TEXT".to_string(),
nullable: false,
default: None,
},
],
});
let add_first_last_name_columns = Migration::new("add_first_and_last_name_columns", None)
.with_action(AddColumn {
table: "users".to_string(),
column: Column {
name: "first".to_string(),
data_type: "TEXT".to_string(),
nullable: false,
default: None,
},
up: Some("(STRING_TO_ARRAY(name, ' '))[1]".to_string()),
})
.with_action(AddColumn {
table: "users".to_string(),
column: Column {
name: "last".to_string(),
data_type: "TEXT".to_string(),
nullable: false,
default: None,
},
up: Some("(STRING_TO_ARRAY(name, ' '))[2]".to_string()),
});
let first_migrations = vec![create_users_table.clone()];
let second_migrations = vec![
create_users_table.clone(),
add_first_last_name_columns.clone(),
];
// Run first migration, should automatically finish
reshape.migrate(first_migrations.clone()).unwrap();
assert!(matches!(reshape.state.status, Status::Idle));
assert_eq!(
Some(&create_users_table.name),
reshape.state.current_migration.as_ref()
);
// Update search paths
old_db
.simple_query(&reshape::generate_schema_query(
&first_migrations.last().unwrap().name,
))
.unwrap();
new_db
.simple_query(&reshape::generate_schema_query(
&first_migrations.last().unwrap().name,
))
.unwrap();
// Insert some test users
new_db
.simple_query(
"
INSERT INTO users (id, name) VALUES
(1, 'John Doe'),
(2, 'Jane Doe');
",
)
.unwrap();
// Run second migration
reshape.migrate(second_migrations.clone()).unwrap();
new_db
.simple_query(&reshape::generate_schema_query(
&second_migrations.last().unwrap().name,
))
.unwrap();
// Check that the existing users have the new columns populated
let expected = vec![("John", "Doe"), ("Jane", "Doe")];
assert!(new_db
.query("SELECT first, last FROM users ORDER BY id", &[],)
.unwrap()
.iter()
.map(|row| (row.get("first"), row.get("last")))
.eq(expected));
// Insert data using old schema and make sure the new columns are populated
old_db
.simple_query("INSERT INTO users (id, name) VALUES (3, 'Test Testsson')")
.unwrap();
let (first_name, last_name): (String, String) = new_db
.query_one("SELECT first, last from users WHERE id = 3", &[])
.map(|row| (row.get("first"), row.get("last")))
.unwrap();
assert_eq!(
("Test", "Testsson"),
(first_name.as_ref(), last_name.as_ref())
);
reshape.complete_migration().unwrap();
}
#[test]
fn add_column_nullable() {
let (mut reshape, mut old_db, mut new_db) = common::setup();
let create_users_table = Migration::new("create_users_table", None).with_action(CreateTable {
name: "users".to_string(),
columns: vec![Column {
name: "id".to_string(),
data_type: "SERIAL".to_string(),
nullable: true,
default: None,
}],
});
let add_name_column = Migration::new("add_nullable_name_column", None).with_action(AddColumn {
table: "users".to_string(),
column: Column {
name: "name".to_string(),
data_type: "TEXT".to_string(),
nullable: true,
default: None,
},
up: None,
});
let first_migrations = vec![create_users_table.clone()];
let second_migrations = vec![create_users_table.clone(), add_name_column.clone()];
// Run first migration, should automatically finish
reshape.migrate(first_migrations.clone()).unwrap();
assert!(matches!(reshape.state.status, Status::Idle));
assert_eq!(
Some(&create_users_table.name),
reshape.state.current_migration.as_ref()
);
// Update search paths
old_db
.simple_query(&reshape::generate_schema_query(
&first_migrations.last().unwrap().name,
))
.unwrap();
new_db
.simple_query(&reshape::generate_schema_query(
&first_migrations.last().unwrap().name,
))
.unwrap();
// Insert some test values
new_db
.simple_query(
"
INSERT INTO users (id) VALUES
(1),
(2);
",
)
.unwrap();
// Run second migration
reshape.migrate(second_migrations.clone()).unwrap();
new_db
.simple_query(&reshape::generate_schema_query(
&second_migrations.last().unwrap().name,
))
.unwrap();
// Ensure existing data got updated
let expected: Vec<Option<String>> = vec![None, None];
assert!(new_db
.query("SELECT name FROM users ORDER BY id", &[],)
.unwrap()
.iter()
.map(|row| row.get::<_, Option<String>>("name"))
.eq(expected));
// Insert data using old schema and ensure new column is NULL
old_db
.simple_query("INSERT INTO users (id) VALUES (3)")
.unwrap();
let name: Option<String> = new_db
.query_one("SELECT name from users WHERE id = 3", &[])
.map(|row| (row.get("name")))
.unwrap();
assert_eq!(None, name);
// Ensure data can be inserted against new schema
new_db
.simple_query("INSERT INTO users (id, name) VALUES (3, 'Test Testsson'), (4, NULL)")
.unwrap();
reshape.complete_migration().unwrap();
}
#[test]
fn add_column_with_default() {
let (mut reshape, mut old_db, mut new_db) = common::setup();
let create_users_table = Migration::new("create_users_table", None).with_action(CreateTable {
name: "users".to_string(),
columns: vec![Column {
name: "id".to_string(),
data_type: "SERIAL".to_string(),
nullable: true,
default: None,
}],
});
let add_name_column =
Migration::new("add_name_column_with_default", None).with_action(AddColumn {
table: "users".to_string(),
column: Column {
name: "name".to_string(),
data_type: "TEXT".to_string(),
nullable: false,
default: Some("'DEFAULT'".to_string()),
},
up: None,
});
let first_migrations = vec![create_users_table.clone()];
let second_migrations = vec![create_users_table.clone(), add_name_column.clone()];
// Run first migration, should automatically finish
reshape.migrate(first_migrations.clone()).unwrap();
assert!(matches!(reshape.state.status, Status::Idle));
assert_eq!(
Some(&create_users_table.name),
reshape.state.current_migration.as_ref()
);
// Update search paths
old_db
.simple_query(&reshape::generate_schema_query(
&first_migrations.last().unwrap().name,
))
.unwrap();
new_db
.simple_query(&reshape::generate_schema_query(
&first_migrations.last().unwrap().name,
))
.unwrap();
// Insert some test values
new_db
.simple_query("INSERT INTO users (id) VALUES (1), (2)")
.unwrap();
// Run second migration
reshape.migrate(second_migrations.clone()).unwrap();
new_db
.simple_query(&reshape::generate_schema_query(
&second_migrations.last().unwrap().name,
))
.unwrap();
// Ensure existing data got updated with defaults
let expected = vec!["DEFAULT".to_string(), "DEFAULT".to_string()];
assert!(new_db
.query("SELECT name FROM users ORDER BY id", &[],)
.unwrap()
.iter()
.map(|row| row.get::<_, String>("name"))
.eq(expected));
// Insert data using old schema and ensure new column gets the default value
old_db
.simple_query("INSERT INTO users (id) VALUES (3)")
.unwrap();
let name: String = new_db
.query_one("SELECT name from users WHERE id = 3", &[])
.map(|row| row.get("name"))
.unwrap();
assert_eq!("DEFAULT", name);
reshape.complete_migration().unwrap();
}

287
tests/alter_column.rs Normal file
@@ -0,0 +1,287 @@
use reshape::migrations::{AlterColumn, Column, ColumnChanges, CreateTable, Migration};
use reshape::Status;
mod common;
#[test]
fn alter_column_data() {
let (mut reshape, mut old_db, mut new_db) = common::setup();
let create_users_table = Migration::new("create_users_table", None).with_action(CreateTable {
name: "users".to_string(),
columns: vec![
Column {
name: "id".to_string(),
data_type: "SERIAL".to_string(),
nullable: true,
default: None,
},
Column {
name: "name".to_string(),
data_type: "TEXT".to_string(),
nullable: false,
default: None,
},
],
});
let uppercase_name = Migration::new("uppercase_name", None).with_action(AlterColumn {
table: "users".to_string(),
column: "name".to_string(),
up: Some("UPPER(name)".to_string()),
down: Some("LOWER(name)".to_string()),
changes: ColumnChanges {
data_type: None,
nullable: None,
name: None,
},
});
let first_migrations = vec![create_users_table.clone()];
let second_migrations = vec![create_users_table.clone(), uppercase_name.clone()];
// Run first migration, should automatically finish
reshape.migrate(first_migrations.clone()).unwrap();
assert!(matches!(reshape.state.status, Status::Idle));
assert_eq!(
Some(&create_users_table.name),
reshape.state.current_migration.as_ref()
);
// Update search paths
old_db
.simple_query(&reshape::generate_schema_query(
&first_migrations.last().unwrap().name,
))
.unwrap();
// Insert some test users
old_db
.simple_query(
"
INSERT INTO users (id, name) VALUES
(1, 'john Doe'),
(2, 'jane Doe');
",
)
.unwrap();
// Run second migration
reshape.migrate(second_migrations.clone()).unwrap();
new_db
.simple_query(&reshape::generate_schema_query(
&second_migrations.last().unwrap().name,
))
.unwrap();
// Check that the existing users have the altered data
let expected = vec!["JOHN DOE", "JANE DOE"];
assert!(new_db
.query("SELECT name FROM users ORDER BY id", &[],)
.unwrap()
.iter()
.map(|row| row.get::<_, String>("name"))
.eq(expected));
// Insert data using old schema and make sure the new schema gets correct values
old_db
.simple_query("INSERT INTO users (id, name) VALUES (3, 'test testsson')")
.unwrap();
let result = new_db
.query_one("SELECT name from users WHERE id = 3", &[])
.unwrap();
assert_eq!("TEST TESTSSON", result.get::<_, &str>("name"));
// Insert data using new schema and make sure the old schema gets correct values
new_db
.simple_query("INSERT INTO users (id, name) VALUES (4, 'TEST TESTSSON')")
.unwrap();
let result = old_db
.query_one("SELECT name from users WHERE id = 4", &[])
.unwrap();
assert_eq!("test testsson", result.get::<_, &str>("name"));
reshape.complete_migration().unwrap();
}
#[test]
fn alter_column_set_not_null() {
let (mut reshape, mut old_db, mut new_db) = common::setup();
let create_users_table = Migration::new("create_users_table", None).with_action(CreateTable {
name: "users".to_string(),
columns: vec![
Column {
name: "id".to_string(),
data_type: "SERIAL".to_string(),
nullable: true,
default: None,
},
Column {
name: "name".to_string(),
data_type: "TEXT".to_string(),
nullable: true,
default: None,
},
],
});
let set_name_not_null = Migration::new("set_name_not_null", None).with_action(AlterColumn {
table: "users".to_string(),
column: "name".to_string(),
up: Some("COALESCE(name, 'TEST_DEFAULT_VALUE')".to_string()),
down: Some("name".to_string()),
changes: ColumnChanges {
data_type: None,
nullable: Some(false),
name: None,
},
});
let first_migrations = vec![create_users_table.clone()];
let second_migrations = vec![create_users_table.clone(), set_name_not_null.clone()];
// Run first migration, should automatically finish
reshape.migrate(first_migrations.clone()).unwrap();
assert!(matches!(reshape.state.status, Status::Idle));
assert_eq!(
Some(&create_users_table.name),
reshape.state.current_migration.as_ref()
);
// Update search paths
old_db
.simple_query(&reshape::generate_schema_query(
&first_migrations.last().unwrap().name,
))
.unwrap();
// Insert some test users
old_db
.simple_query(
"
INSERT INTO users (id, name) VALUES
(1, 'John Doe'),
(2, NULL);
",
)
.unwrap();
// Run second migration
reshape.migrate(second_migrations.clone()).unwrap();
new_db
.simple_query(&reshape::generate_schema_query(
&second_migrations.last().unwrap().name,
))
.unwrap();
// Check that existing users got the correct values
let expected = vec!["John Doe", "TEST_DEFAULT_VALUE"];
assert!(new_db
.query("SELECT name FROM users ORDER BY id", &[],)
.unwrap()
.iter()
.map(|row| row.get::<_, String>("name"))
.eq(expected));
// Insert data using old schema and make sure the new schema gets correct values
old_db
.simple_query("INSERT INTO users (id, name) VALUES (3, NULL)")
.unwrap();
let result = new_db
.query_one("SELECT name from users WHERE id = 3", &[])
.unwrap();
assert_eq!("TEST_DEFAULT_VALUE", result.get::<_, &str>("name"));
// Insert data using new schema and make sure the old schema gets correct values
new_db
.simple_query("INSERT INTO users (id, name) VALUES (4, 'Jane Doe')")
.unwrap();
let result = old_db
.query_one("SELECT name from users WHERE id = 4", &[])
.unwrap();
assert_eq!("Jane Doe", result.get::<_, &str>("name"));
reshape.complete_migration().unwrap();
}
#[test]
fn alter_column_rename() {
let (mut reshape, mut old_db, mut new_db) = common::setup();
let create_users_table = Migration::new("create_users_table", None).with_action(CreateTable {
name: "users".to_string(),
columns: vec![
Column {
name: "id".to_string(),
data_type: "SERIAL".to_string(),
nullable: true,
default: None,
},
Column {
name: "name".to_string(),
data_type: "TEXT".to_string(),
nullable: true,
default: None,
},
],
});
let rename_to_full_name =
Migration::new("rename_to_full_name", None).with_action(AlterColumn {
table: "users".to_string(),
column: "name".to_string(),
up: None, // up and down are not required when only renaming a column
down: None,
changes: ColumnChanges {
data_type: None,
nullable: None,
name: Some("full_name".to_string()),
},
});
let first_migrations = vec![create_users_table.clone()];
let second_migrations = vec![create_users_table.clone(), rename_to_full_name.clone()];
// Run first migration, should automatically finish
reshape.migrate(first_migrations.clone()).unwrap();
assert!(matches!(reshape.state.status, Status::Idle));
assert_eq!(
Some(&create_users_table.name),
reshape.state.current_migration.as_ref()
);
// Update search paths
old_db
.simple_query(&reshape::generate_schema_query(
&first_migrations.last().unwrap().name,
))
.unwrap();
// Insert some test data
old_db
.simple_query(
"
INSERT INTO users (id, name) VALUES
(1, 'John Doe'),
(2, 'Jane Doe');
",
)
.unwrap();
// Run second migration
reshape.migrate(second_migrations.clone()).unwrap();
new_db
.simple_query(&reshape::generate_schema_query(
&second_migrations.last().unwrap().name,
))
.unwrap();
// Check that existing values can be queried using new column name
let expected = vec!["John Doe", "Jane Doe"];
assert!(new_db
.query("SELECT full_name FROM users ORDER BY id", &[],)
.unwrap()
.iter()
.map(|row| row.get::<_, String>("full_name"))
.eq(expected));
reshape.complete_migration().unwrap();
}

15
tests/common.rs Normal file
@@ -0,0 +1,15 @@
use postgres::{Client, NoTls};
use reshape::Reshape;
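// Test helper: returns a Reshape instance plus two separate connections,
// one to be pointed at the old schema and one at the new schema.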
pub fn setup() -> (Reshape, Client, Client) {
let connection_string = std::env::var("POSTGRES_CONNECTION_STRING")
.unwrap_or_else(|_| "postgres://postgres:postgres@localhost/reshape_test".to_string());
let old_db = Client::connect(&connection_string, NoTls).unwrap();
let new_db = Client::connect(&connection_string, NoTls).unwrap();
let mut reshape = Reshape::new(&connection_string).unwrap();
reshape.remove().unwrap();
(reshape, old_db, new_db)
}

100
tests/create_table.rs Normal file
@@ -0,0 +1,100 @@
use reshape::{
migrations::{Column, CreateTable, Migration},
Status,
};
mod common;
#[test]
fn create_table() {
let (mut reshape, mut db, _) = common::setup();
let create_table_migration =
Migration::new("create_users_table", None).with_action(CreateTable {
name: "users".to_string(),
columns: vec![
Column {
name: "id".to_string(),
data_type: "SERIAL".to_string(),
nullable: true, // Will be ignored by Postgres as the column is a SERIAL
default: None,
},
Column {
name: "name".to_string(),
data_type: "TEXT".to_string(),
nullable: true,
default: None,
},
Column {
name: "created_at".to_string(),
data_type: "TIMESTAMP".to_string(),
nullable: false,
default: Some("NOW()".to_string()),
},
],
});
reshape
.migrate(vec![create_table_migration.clone()])
.unwrap();
assert!(matches!(reshape.state.status, Status::Idle));
assert_eq!(
Some(&create_table_migration.name),
reshape.state.current_migration.as_ref()
);
// Ensure table was created
let result = db
.query_opt(
"
SELECT table_name
FROM information_schema.tables
WHERE table_name = 'users' AND table_schema = 'public'",
&[],
)
.unwrap();
assert!(result.is_some());
// Ensure table has the right columns
let result = db
.query(
"
SELECT column_name, column_default, is_nullable, data_type
FROM information_schema.columns
WHERE table_name = 'users' AND table_schema = 'public'
ORDER BY ordinal_position",
&[],
)
.unwrap();
// id column
let id_row = &result[0];
assert_eq!("id", id_row.get::<_, String>("column_name"));
assert!(id_row.get::<_, Option<String>>("column_default").is_some());
assert_eq!("NO", id_row.get::<_, String>("is_nullable"));
assert_eq!("integer", id_row.get::<_, String>("data_type"));
// name column
let name_row = &result[1];
assert_eq!("name", name_row.get::<_, String>("column_name"));
assert!(name_row
.get::<_, Option<String>>("column_default")
.is_none());
assert_eq!("YES", name_row.get::<_, String>("is_nullable"));
assert_eq!("text", name_row.get::<_, String>("data_type"));
// created_at column
let created_at_column = &result[2];
assert_eq!(
"created_at",
created_at_column.get::<_, String>("column_name")
);
assert!(created_at_column
.get::<_, Option<String>>("column_default")
.is_some());
assert_eq!("NO", created_at_column.get::<_, String>("is_nullable"));
assert_eq!(
"timestamp without time zone",
created_at_column.get::<_, String>("data_type")
);
}

76
tests/remove_column.rs Normal file
@@ -0,0 +1,76 @@
use reshape::migrations::{Column, CreateTable, Migration, RemoveColumn};
mod common;
#[test]
fn remove_column() {
let (mut reshape, mut old_db, mut new_db) = common::setup();
let create_table_migration =
Migration::new("create_users_table", None).with_action(CreateTable {
name: "users".to_string(),
columns: vec![
Column {
name: "id".to_string(),
data_type: "SERIAL".to_string(),
nullable: true, // Will be ignored by Postgres as the column is a SERIAL
default: None,
},
Column {
name: "name".to_string(),
data_type: "TEXT".to_string(),
nullable: false,
default: None,
},
],
});
let remove_column_migration =
Migration::new("remove_name_column", None).with_action(RemoveColumn {
table: "users".to_string(),
column: "name".to_string(),
down: Some("'TEST_DOWN_VALUE'".to_string()),
});
let first_migrations = vec![create_table_migration.clone()];
let second_migrations = vec![
create_table_migration.clone(),
remove_column_migration.clone(),
];
// Run migrations
reshape.migrate(first_migrations.clone()).unwrap();
reshape.migrate(second_migrations.clone()).unwrap();
// Update schemas of Postgres connections
let old_schema_query = reshape::generate_schema_query(&first_migrations.last().unwrap().name);
let new_schema_query = reshape::generate_schema_query(&second_migrations.last().unwrap().name);
old_db.simple_query(&old_schema_query).unwrap();
new_db.simple_query(&new_schema_query).unwrap();
// Insert using old schema and ensure it can be retrieved through new schema
old_db
.simple_query("INSERT INTO users(name) VALUES ('John Doe')")
.unwrap();
let results = new_db
.query("SELECT id FROM users WHERE id = 1", &[])
.unwrap();
assert_eq!(1, results.len());
assert_eq!(1, results[0].get::<_, i32>("id"));
// Ensure the name column is not accessible through the new schema
assert!(new_db.query("SELECT id, name FROM users", &[]).is_err());
// Insert using new schema and ensure the down function is correctly applied
new_db
.simple_query("INSERT INTO users DEFAULT VALUES")
.unwrap();
let result = old_db
.query_opt("SELECT name FROM users WHERE id = 2", &[])
.unwrap();
assert_eq!(
Some("TEST_DOWN_VALUE"),
result.as_ref().map(|row| row.get("name"))
);
reshape.complete_migration().unwrap();
}