
254 lines
7.6 KiB
Raw Normal View History

package state
import (
Make migrations schema aware (#12) This change will retrieve and store the resulting schema after a migration is completed. This schema will be used as the base to execute the next migration, making it possible to create views that are aware of the full schema, and not only the one created by the last migration. We use a function to retrieve the schema directly from Postgres instead of building it from the migration files. This allows for more features in the future, like doing an initial sync on top of the existing schema or automatically detecting and storing out of band migrations from triggers. Example JSON stored schema: ``` { "tables": { "bills": { "oid": "18272", "name": "bills", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "date": { "name": "date", "type": "time with time zone", "comment": null, "default": null, "nullable": false }, "quantity": { "name": "quantity", "type": "integer", "comment": null, "default": null, "nullable": false } }, "comment": null }, "products": { "oid": "18286", "name": "products", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": "nextval(_pgroll_new_products_id_seq::regclass)", "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "price": { "name": "price", "type": "numeric(10,2)", "comment": null, "default": null, "nullable": false } }, "comment": null }, "customers": { "oid": "18263", "name": "customers", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "credit_card": { "name": "credit_card", "type": "text", "comment": null, "default": null, "nullable": true } }, "comment": null } } } ``` After this change, I believe that the `create_table` operation is feature complete and can be used for many 
sequential migrations.
2023-07-05 14:20:59 +03:00
const sqlInit = `
CREATE TABLE IF NOT EXISTS %[1]s.migrations (
Make migrations schema aware (#12) This change will retrieve and store the resulting schema after a migration is completed. This schema will be used as the base to execute the next migration, making it possible to create views that are aware of the full schema, and not only the one created by the last migration. We use a function to retrieve the schema directly from Postgres instead of building it from the migration files. This allows for more features in the future, like doing an initial sync on top of the existing schema or automatically detecting and storing out of band migrations from triggers. Example JSON stored schema: ``` { "tables": { "bills": { "oid": "18272", "name": "bills", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "date": { "name": "date", "type": "time with time zone", "comment": null, "default": null, "nullable": false }, "quantity": { "name": "quantity", "type": "integer", "comment": null, "default": null, "nullable": false } }, "comment": null }, "products": { "oid": "18286", "name": "products", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": "nextval(_pgroll_new_products_id_seq::regclass)", "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "price": { "name": "price", "type": "numeric(10,2)", "comment": null, "default": null, "nullable": false } }, "comment": null }, "customers": { "oid": "18263", "name": "customers", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "credit_card": { "name": "credit_card", "type": "text", "comment": null, "default": null, "nullable": true } }, "comment": null } } } ``` After this change, I believe that the `create_table` operation is feature complete and can be used for many 
sequential migrations.
2023-07-05 14:20:59 +03:00
migration JSONB NOT NULL,
Make migrations schema aware (#12) This change will retrieve and store the resulting schema after a migration is completed. This schema will be used as the base to execute the next migration, making it possible to create views that are aware of the full schema, and not only the one created by the last migration. We use a function to retrieve the schema directly from Postgres instead of building it from the migration files. This allows for more features in the future, like doing an initial sync on top of the existing schema or automatically detecting and storing out of band migrations from triggers. Example JSON stored schema: ``` { "tables": { "bills": { "oid": "18272", "name": "bills", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "date": { "name": "date", "type": "time with time zone", "comment": null, "default": null, "nullable": false }, "quantity": { "name": "quantity", "type": "integer", "comment": null, "default": null, "nullable": false } }, "comment": null }, "products": { "oid": "18286", "name": "products", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": "nextval(_pgroll_new_products_id_seq::regclass)", "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "price": { "name": "price", "type": "numeric(10,2)", "comment": null, "default": null, "nullable": false } }, "comment": null }, "customers": { "oid": "18263", "name": "customers", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "credit_card": { "name": "credit_card", "type": "text", "comment": null, "default": null, "nullable": true } }, "comment": null } } } ``` After this change, I believe that the `create_table` operation is feature complete and can be used for many 
sequential migrations.
2023-07-05 14:20:59 +03:00
parent TEXT,
resulting_schema JSONB NOT NULL DEFAULT '{}'::jsonb,
PRIMARY KEY (schema, name),
FOREIGN KEY (schema, parent) REFERENCES %[1]s.migrations(schema, name)
-- Only one migration can be active at a time
CREATE UNIQUE INDEX IF NOT EXISTS only_one_active ON %[1]s.migrations (schema, name, done) WHERE done = false;
-- Only first migration can exist without parent
CREATE UNIQUE INDEX IF NOT EXISTS only_first_migration_without_parent ON %[1]s.migrations ((1)) WHERE parent IS NULL;
-- History is linear
CREATE UNIQUE INDEX IF NOT EXISTS history_is_linear ON %[1]s.migrations (schema, parent);
-- Helper functions
-- Are we in the middle of a migration?
CREATE OR REPLACE FUNCTION %[1]s.is_active_migration_period(schemaname NAME) RETURNS boolean
AS $$ SELECT EXISTS (SELECT 1 FROM %[1]s.migrations WHERE schema=schemaname AND done=false) $$
-- Get the latest version name (this is the one without child migrations)
CREATE OR REPLACE FUNCTION %[1]s.latest_version(schemaname NAME) RETURNS text
AS $$ SELECT FROM %[1]s.migrations p WHERE NOT EXISTS (SELECT 1 FROM %[1]s.migrations c WHERE schema=schemaname AND $$
Make migrations schema aware (#12) This change will retrieve and store the resulting schema after a migration is completed. This schema will be used as the base to execute the next migration, making it possible to create views that are aware of the full schema, and not only the one created by the last migration. We use a function to retrieve the schema directly from Postgres instead of building it from the migration files. This allows for more features in the future, like doing an initial sync on top of the existing schema or automatically detecting and storing out of band migrations from triggers. Example JSON stored schema: ``` { "tables": { "bills": { "oid": "18272", "name": "bills", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "date": { "name": "date", "type": "time with time zone", "comment": null, "default": null, "nullable": false }, "quantity": { "name": "quantity", "type": "integer", "comment": null, "default": null, "nullable": false } }, "comment": null }, "products": { "oid": "18286", "name": "products", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": "nextval(_pgroll_new_products_id_seq::regclass)", "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "price": { "name": "price", "type": "numeric(10,2)", "comment": null, "default": null, "nullable": false } }, "comment": null }, "customers": { "oid": "18263", "name": "customers", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "credit_card": { "name": "credit_card", "type": "text", "comment": null, "default": null, "nullable": true } }, "comment": null } } } ``` After this change, I believe that the `create_table` operation is feature complete and can be used for many 
sequential migrations.
2023-07-05 14:20:59 +03:00
-- Get the JSON representation of the current schema
CREATE OR REPLACE FUNCTION %[1]s.read_schema(schemaname text) RETURNS jsonb
LANGUAGE plpgsql AS $$
tables jsonb;
SELECT json_build_object(
'tables', (
SELECT json_object_agg(t.relname, jsonb_build_object(
'name', t.relname,
'oid', t.oid,
'comment', descr.description,
'columns', (
SELECT json_object_agg(name, c) FROM (
attr.attname AS name,
pg_get_expr(def.adbin, def.adrelid) AS default,
OR tp.typtype = 'd'
AND tp.typnotnull
) AS nullable,
WHEN 'character varying' :: regtype = ANY(ARRAY [attr.atttypid, tp.typelem]) THEN REPLACE(
format_type(attr.atttypid, attr.atttypmod),
'character varying',
WHEN 'timestamp with time zone' :: regtype = ANY(ARRAY [attr.atttypid, tp.typelem]) THEN REPLACE(
format_type(attr.atttypid, attr.atttypmod),
'timestamp with time zone',
ELSE format_type(attr.atttypid, attr.atttypmod)
END AS type,
descr.description AS comment
pg_attribute AS attr
INNER JOIN pg_type AS tp ON attr.atttypid = tp.oid
LEFT JOIN pg_attrdef AS def ON attr.attrelid = def.adrelid
AND attr.attnum = def.adnum
LEFT JOIN pg_description AS descr ON attr.attrelid = descr.objoid
AND attr.attnum = descr.objsubid
attr.attnum > 0
AND NOT attr.attisdropped
AND attr.attrelid = t.oid
) c
)) FROM pg_class AS t
INNER JOIN pg_namespace AS ns ON t.relnamespace = ns.oid
LEFT JOIN pg_description AS descr ON t.oid = descr.objoid
AND descr.objsubid = 0
ns.nspname = schemaname
AND t.relkind IN ('r', 'p') -- tables only (ignores views, materialized views & foreign tables)
INTO tables;
RETURN tables;
// State gives access to the pg-roll bookkeeping data (the migrations table
// and its helper functions) stored in Postgres.
type State struct {
	// pgConn is the database handle used for every state query.
	pgConn *sql.DB
	// schema is the name of the Postgres schema that holds the internal
	// state objects. It is passed through pq.QuoteIdentifier whenever it is
	// interpolated into a SQL statement.
	schema string
}
func New(ctx context.Context, pgURL, stateSchema string) (*State, error) {
conn, err := sql.Open("postgres", pgURL)
if err != nil {
return nil, err
return &State{
pgConn: conn,
schema: stateSchema,
}, nil
func (s *State) Init(ctx context.Context) error {
// ensure pg-roll internal tables exist
// TODO: eventually use migrations for this instead of hardcoding
_, err := s.pgConn.ExecContext(ctx, fmt.Sprintf(sqlInit, pq.QuoteIdentifier(s.schema)))
return err
func (s *State) Close() error {
return s.pgConn.Close()
// IsActiveMigrationPeriod returns true if there is an active migration
func (s *State) IsActiveMigrationPeriod(ctx context.Context, schema string) (bool, error) {
var isActive bool
err := s.pgConn.QueryRowContext(ctx, fmt.Sprintf("SELECT %s.is_active_migration_period($1)", pq.QuoteIdentifier(s.schema)), schema).Scan(&isActive)
if err != nil {
return false, err
return isActive, err
// GetActiveMigration returns the name & raw content of the active migration (if any), errors out otherwise
func (s *State) GetActiveMigration(ctx context.Context, schema string) (*migrations.Migration, error) {
var name, rawMigration string
err := s.pgConn.QueryRowContext(ctx, fmt.Sprintf("SELECT name, migration FROM %s.migrations WHERE schema=$1 AND done=false", pq.QuoteIdentifier(s.schema)), schema).Scan(&name, &rawMigration)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrNoActiveMigration
return nil, err
var migration migrations.Migration
err = json.Unmarshal([]byte(rawMigration), &migration)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal migration: %w", err)
return &migration, nil
// Start creates a new migration, storing its name and raw content.
// This effectively activates a new migration period, so `IsActiveMigrationPeriod`
// will return true until the migration is completed.
Make migrations schema aware (#12) This change will retrieve and store the resulting schema after a migration is completed. This schema will be used as the base to execute the next migration, making it possible to create views that are aware of the full schema, and not only the one created by the last migration. We use a function to retrieve the schema directly from Postgres instead of building it from the migration files. This allows for more features in the future, like doing an initial sync on top of the existing schema or automatically detecting and storing out of band migrations from triggers. Example JSON stored schema: ``` { "tables": { "bills": { "oid": "18272", "name": "bills", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "date": { "name": "date", "type": "time with time zone", "comment": null, "default": null, "nullable": false }, "quantity": { "name": "quantity", "type": "integer", "comment": null, "default": null, "nullable": false } }, "comment": null }, "products": { "oid": "18286", "name": "products", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": "nextval(_pgroll_new_products_id_seq::regclass)", "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "price": { "name": "price", "type": "numeric(10,2)", "comment": null, "default": null, "nullable": false } }, "comment": null }, "customers": { "oid": "18263", "name": "customers", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "credit_card": { "name": "credit_card", "type": "text", "comment": null, "default": null, "nullable": true } }, "comment": null } } } ``` After this change, I believe that the `create_table` operation is feature complete and can be used for many 
sequential migrations.
2023-07-05 14:20:59 +03:00
// This method will return the current schema (before the migration is applied)
func (s *State) Start(ctx context.Context, schemaname string, migration *migrations.Migration) (*schema.Schema, error) {
rawMigration, err := json.Marshal(migration)
if err != nil {
Make migrations schema aware (#12) This change will retrieve and store the resulting schema after a migration is completed. This schema will be used as the base to execute the next migration, making it possible to create views that are aware of the full schema, and not only the one created by the last migration. We use a function to retrieve the schema directly from Postgres instead of building it from the migration files. This allows for more features in the future, like doing an initial sync on top of the existing schema or automatically detecting and storing out of band migrations from triggers. Example JSON stored schema: ``` { "tables": { "bills": { "oid": "18272", "name": "bills", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "date": { "name": "date", "type": "time with time zone", "comment": null, "default": null, "nullable": false }, "quantity": { "name": "quantity", "type": "integer", "comment": null, "default": null, "nullable": false } }, "comment": null }, "products": { "oid": "18286", "name": "products", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": "nextval(_pgroll_new_products_id_seq::regclass)", "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "price": { "name": "price", "type": "numeric(10,2)", "comment": null, "default": null, "nullable": false } }, "comment": null }, "customers": { "oid": "18263", "name": "customers", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "credit_card": { "name": "credit_card", "type": "text", "comment": null, "default": null, "nullable": true } }, "comment": null } } } ``` After this change, I believe that the `create_table` operation is feature complete and can be used for many 
sequential migrations.
2023-07-05 14:20:59 +03:00
return nil, fmt.Errorf("unable to marshal migration: %w", err)
Make migrations schema aware (#12) This change will retrieve and store the resulting schema after a migration is completed. This schema will be used as the base to execute the next migration, making it possible to create views that are aware of the full schema, and not only the one created by the last migration. We use a function to retrieve the schema directly from Postgres instead of building it from the migration files. This allows for more features in the future, like doing an initial sync on top of the existing schema or automatically detecting and storing out of band migrations from triggers. Example JSON stored schema: ``` { "tables": { "bills": { "oid": "18272", "name": "bills", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "date": { "name": "date", "type": "time with time zone", "comment": null, "default": null, "nullable": false }, "quantity": { "name": "quantity", "type": "integer", "comment": null, "default": null, "nullable": false } }, "comment": null }, "products": { "oid": "18286", "name": "products", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": "nextval(_pgroll_new_products_id_seq::regclass)", "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "price": { "name": "price", "type": "numeric(10,2)", "comment": null, "default": null, "nullable": false } }, "comment": null }, "customers": { "oid": "18263", "name": "customers", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "credit_card": { "name": "credit_card", "type": "text", "comment": null, "default": null, "nullable": true } }, "comment": null } } } ``` After this change, I believe that the `create_table` operation is feature complete and can be used for many 
sequential migrations.
2023-07-05 14:20:59 +03:00
stmt := fmt.Sprintf(`
INSERT INTO %[1]s.migrations (schema, name, parent, migration) VALUES ($1, $2, %[1]s.latest_version($1), $3)
(SELECT resulting_schema FROM %[1]s.migrations WHERE schema=$1 AND name=%[1]s.latest_version($1)),
)`, pq.QuoteIdentifier(s.schema))
Make migrations schema aware (#12) This change will retrieve and store the resulting schema after a migration is completed. This schema will be used as the base to execute the next migration, making it possible to create views that are aware of the full schema, and not only the one created by the last migration. We use a function to retrieve the schema directly from Postgres instead of building it from the migration files. This allows for more features in the future, like doing an initial sync on top of the existing schema or automatically detecting and storing out of band migrations from triggers. Example JSON stored schema: ``` { "tables": { "bills": { "oid": "18272", "name": "bills", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "date": { "name": "date", "type": "time with time zone", "comment": null, "default": null, "nullable": false }, "quantity": { "name": "quantity", "type": "integer", "comment": null, "default": null, "nullable": false } }, "comment": null }, "products": { "oid": "18286", "name": "products", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": "nextval(_pgroll_new_products_id_seq::regclass)", "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "price": { "name": "price", "type": "numeric(10,2)", "comment": null, "default": null, "nullable": false } }, "comment": null }, "customers": { "oid": "18263", "name": "customers", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "credit_card": { "name": "credit_card", "type": "text", "comment": null, "default": null, "nullable": true } }, "comment": null } } } ``` After this change, I believe that the `create_table` operation is feature complete and can be used for many 
sequential migrations.
2023-07-05 14:20:59 +03:00
var rawSchema string
err = s.pgConn.QueryRowContext(ctx, stmt, schemaname, migration.Name, rawMigration).Scan(&rawSchema)
if err != nil {
return nil, err
var schema schema.Schema
err = json.Unmarshal([]byte(rawSchema), &schema)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal schema: %w", err)
return &schema, nil
// Complete marks a migration as completed
func (s *State) Complete(ctx context.Context, schema, name string) error {
Make migrations schema aware (#12) This change will retrieve and store the resulting schema after a migration is completed. This schema will be used as the base to execute the next migration, making it possible to create views that are aware of the full schema, and not only the one created by the last migration. We use a function to retrieve the schema directly from Postgres instead of building it from the migration files. This allows for more features in the future, like doing an initial sync on top of the existing schema or automatically detecting and storing out of band migrations from triggers. Example JSON stored schema: ``` { "tables": { "bills": { "oid": "18272", "name": "bills", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "date": { "name": "date", "type": "time with time zone", "comment": null, "default": null, "nullable": false }, "quantity": { "name": "quantity", "type": "integer", "comment": null, "default": null, "nullable": false } }, "comment": null }, "products": { "oid": "18286", "name": "products", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": "nextval(_pgroll_new_products_id_seq::regclass)", "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "price": { "name": "price", "type": "numeric(10,2)", "comment": null, "default": null, "nullable": false } }, "comment": null }, "customers": { "oid": "18263", "name": "customers", "columns": { "id": { "name": "id", "type": "integer", "comment": null, "default": null, "nullable": false }, "name": { "name": "name", "type": "varchar(255)", "comment": null, "default": null, "nullable": false }, "credit_card": { "name": "credit_card", "type": "text", "comment": null, "default": null, "nullable": true } }, "comment": null } } } ``` After this change, I believe that the `create_table` operation is feature complete and can be used for many 
sequential migrations.
2023-07-05 14:20:59 +03:00
res, err := s.pgConn.ExecContext(ctx, fmt.Sprintf("UPDATE %[1]s.migrations SET done=$1, resulting_schema=(SELECT %[1]s.read_schema($2)) WHERE schema=$2 AND name=$3 AND done=$4", pq.QuoteIdentifier(s.schema)), true, schema, name, false)
if err != nil {
return err
rows, err := res.RowsAffected()
if err != nil {
return err
if rows == 0 {
return fmt.Errorf("no migration found with name %s", name)
return err
// Rollback removes a migration from the state (we consider it rolled back, as if it never started)
func (s *State) Rollback(ctx context.Context, schema, name string) error {
res, err := s.pgConn.ExecContext(ctx, fmt.Sprintf("DELETE FROM %s.migrations WHERE schema=$1 AND name=$2 AND done=$3", pq.QuoteIdentifier(s.schema)), schema, name, false)
if err != nil {
return err
rows, err := res.RowsAffected()
if err != nil {
return err
if rows == 0 {
return fmt.Errorf("no migration found with name %s", name)
return nil