Remove old schema version on pg-roll complete (#13)

Ensure that the previous version of the schema is removed by `pg-roll
complete`.
This commit is contained in:
Andrew Farries 2023-07-05 13:58:55 +01:00 committed by GitHub
parent 5e61b040e1
commit 7bcbee1256
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 175 additions and 2 deletions

View File

@ -61,6 +61,18 @@ func (m *Roll) Complete(ctx context.Context) error {
return fmt.Errorf("unable to get active migration: %w", err)
}
// Drop the old schema
prevVersion, err := m.state.PreviousVersion(ctx, m.schema)
if err != nil {
return fmt.Errorf("unable to get name of previous version: %w", err)
}
if prevVersion != nil {
_, err = m.pgConn.ExecContext(ctx, fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE", pq.QuoteIdentifier(*prevVersion)))
if err != nil {
return fmt.Errorf("unable to drop previous version: %w", err)
}
}
// execute operations
for _, op := range migration.Operations {
err := op.Complete(ctx, m.pgConn)
@ -69,8 +81,6 @@ func (m *Roll) Complete(ctx context.Context) error {
}
}
// TODO: drop views from previous version
// mark as completed
err = m.state.Complete(ctx, m.schema, migration.Name)
if err != nil {

142
pkg/roll/execute_test.go Normal file
View File

@ -0,0 +1,142 @@
package roll_test
import (
"context"
"database/sql"
"testing"
"time"
"pg-roll/pkg/migrations"
"pg-roll/pkg/roll"
"pg-roll/pkg/state"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/postgres"
"github.com/testcontainers/testcontainers-go/wait"
)
const (
postgresImage = "postgres:15.3"
)
func TestPreviousVersionIsDroppedAfterMigrationCompletion(t *testing.T) {
t.Parallel()
withMigratorAndConnectionToContainer(t, func(mig *roll.Roll, db *sql.DB) {
ctx := context.Background()
const (
firstVersion = "1_create_table"
secondVersion = "2_create_table"
)
if err := mig.Start(ctx, &migrations.Migration{Name: firstVersion, Operations: migrations.Operations{createTableOp("table1")}}); err != nil {
t.Fatalf("Failed to start first migration: %v", err)
}
if err := mig.Complete(ctx); err != nil {
t.Fatalf("Failed to complete first migration: %v", err)
}
if err := mig.Start(ctx, &migrations.Migration{Name: secondVersion, Operations: migrations.Operations{createTableOp("table2")}}); err != nil {
t.Fatalf("Failed to start second migration: %v", err)
}
if err := mig.Complete(ctx); err != nil {
t.Fatalf("Failed to complete second migration: %v", err)
}
//
// Check that the schema for the first version has been dropped
//
var exists bool
err := db.QueryRow(`
SELECT EXISTS(
SELECT 1
FROM pg_catalog.pg_namespace
WHERE nspname = $1
)`, firstVersion).Scan(&exists)
if err != nil {
t.Fatal(err)
}
if exists {
t.Errorf("Expected schema %q to not exist", firstVersion)
}
})
}
func createTableOp(tableName string) *migrations.OpCreateTable {
return &migrations.OpCreateTable{
Name: tableName,
Columns: []migrations.Column{
{
Name: "id",
Type: "integer",
PrimaryKey: true,
},
{
Name: "name",
Type: "varchar(255)",
Unique: true,
},
},
}
}
func withMigratorAndConnectionToContainer(t *testing.T, fn func(mig *roll.Roll, db *sql.DB)) {
t.Helper()
ctx := context.Background()
waitForLogs := wait.
ForLog("database system is ready to accept connections").
WithOccurrence(2).
WithStartupTimeout(5 * time.Second)
ctr, err := postgres.RunContainer(ctx,
testcontainers.WithImage(postgresImage),
testcontainers.WithWaitStrategy(waitForLogs),
)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if err := ctr.Terminate(ctx); err != nil {
t.Fatalf("Failed to terminate container: %v", err)
}
})
cStr, err := ctr.ConnectionString(ctx, "sslmode=disable")
if err != nil {
t.Fatal(err)
}
st, err := state.New(ctx, cStr, "pgroll")
if err != nil {
t.Fatal(err)
}
err = st.Init(ctx)
if err != nil {
t.Fatal(err)
}
mig, err := roll.New(ctx, cStr, "public", st)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if err := mig.Close(); err != nil {
t.Fatalf("Failed to close migrator connection: %v", err)
}
})
db, err := sql.Open("postgres", cStr)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
if err := db.Close(); err != nil {
t.Fatalf("Failed to close database connection: %v", err)
}
})
fn(mig, db)
}

View File

@ -54,6 +54,14 @@ AS $$ SELECT p.name FROM %[1]s.migrations p WHERE NOT EXISTS (SELECT 1 FROM %[1]
LANGUAGE SQL
STABLE;
-- Get the name of the previous version of the schema, or NULL if there is none.
CREATE OR REPLACE FUNCTION %[1]s.previous_version(schemaname NAME) RETURNS text
AS $$
SELECT parent FROM %[1]s.migrations WHERE name = (SELECT %[1]s.latest_version('public')) AND schema=schemaname;
$$
LANGUAGE SQL
STABLE;
-- Get the JSON representation of the current schema
CREATE OR REPLACE FUNCTION %[1]s.read_schema(schemaname text) RETURNS jsonb
LANGUAGE plpgsql AS $$
@ -181,6 +189,19 @@ func (s *State) GetActiveMigration(ctx context.Context, schema string) (*migrati
return &migration, nil
}
func (s *State) PreviousVersion(ctx context.Context, schema string) (*string, error) {
var parent *string
err := s.pgConn.QueryRowContext(ctx,
fmt.Sprintf("SELECT %s.previous_version($1)", pq.QuoteIdentifier(s.schema)),
schema).
Scan(&parent)
if err != nil {
return nil, err
}
return parent, nil
}
// Start creates a new migration, storing its name and raw content
// this will effectively activate a new migration period, so `IsActiveMigrationPeriod` will return true
// until the migration is completed