From f2bb2f9581f693c3a90f34380154aba0fcfcdf09 Mon Sep 17 00:00:00 2001 From: Andrew Farries Date: Fri, 22 Sep 2023 09:52:15 +0100 Subject: [PATCH] Make `set_unique` operations respect the contract for old schema versions (#118) When `pg-roll` makes a schema change, the contract with the user is that it will leave the old version of schema unchanged. When the operation to add a `UNIQUE` constraint was implemented (https://github.com/xataio/pg-roll/pull/53), this contract was not respected. The operation adds a unique index to the existing column, changing the behaviour for users of the old schema. This PR changes the operation so that it follows a similar pattern to other operations that were implemented later: * On `Start`: * Duplicate the column. * Add a `UNIQUE` index concurrently * Create `up` and `down` triggers to copy values between the old and new columns. * Backfill values from the old column into the new using `up` SQL * On `Complete` * Create a unique constraint on the new column using the unique index. * Drop the old column * Rename the column to its old name. * Remove `up` and `down` triggers. Writing correct `up` SQL for the operation is a little more difficult than for other operations (eg set `NOT NULL`) as it is up to the user to ensure uniqueness of values. The example migration in this PR appends a random suffix to each value. --- examples/15_set_column_unique.json | 4 +- pkg/migrations/op_alter_column.go | 4 +- pkg/migrations/op_set_unique.go | 132 +++++++++++++++++++++++++-- pkg/migrations/op_set_unique_test.go | 37 ++++++-- 4 files changed, 157 insertions(+), 20 deletions(-) diff --git a/examples/15_set_column_unique.json b/examples/15_set_column_unique.json index 1b79f4c..a5c11df 100644 --- a/examples/15_set_column_unique.json +++ b/examples/15_set_column_unique.json @@ -7,7 +7,9 @@ "column": "review", "unique": { "name": "reviews_review_unique" - } + }, + "up": "review || '-' || (random()*1000000)::integer", + "down": "review" } } ] diff --git a/pkg/migrations/op_alter_column.go b/pkg/migrations/op_alter_column.go index dffe124..ea815df 100644 --- a/pkg/migrations/op_alter_column.go +++ b/pkg/migrations/op_alter_column.go @@ -59,7 +59,7 @@ func (o *OpAlterColumn) Validate(ctx context.Context, s *schema.Schema) error { // Apply any special validation rules for the inner operation op := o.innerOperation() switch op.(type) { - case *OpRenameColumn, *OpSetUnique: + case *OpRenameColumn: if o.Up != "" { return NoUpSQLAllowedError{} } @@ -126,6 +126,8 @@ func (o *OpAlterColumn) innerOperation() Operation { Table: o.Table, Column: o.Column, Name: o.Unique.Name, + Up: o.Up, + Down: o.Down, } } return nil diff --git a/pkg/migrations/op_set_unique.go b/pkg/migrations/op_set_unique.go index 06d778d..bf08303 100644 --- a/pkg/migrations/op_set_unique.go +++ b/pkg/migrations/op_set_unique.go @@ -13,32 +13,136 @@ type OpSetUnique struct { Name string `json:"name"` Table string `json:"table"` Column string `json:"column"` + Up string `json:"up"` + Down string `json:"down"` } var _ Operation = (*OpSetUnique)(nil) func (o *OpSetUnique) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema) error { - // create unique index concurrently - _, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)", - pq.QuoteIdentifier(o.Name), - pq.QuoteIdentifier(o.Table), - pq.QuoteIdentifier(o.Column))) - return err + table := s.GetTable(o.Table) + column := table.GetColumn(o.Column) + + // create a copy of the column on the underlying table. + if err := duplicateColumn(ctx, conn, table, *column); err != nil { + return fmt.Errorf("failed to duplicate column: %w", err) + } + + // Add a unique index to the new column + if err := o.addUniqueIndex(ctx, conn); err != nil { + return fmt.Errorf("failed to add unique index: %w", err) + } + + // Add a trigger to copy values from the old column to the new, rewriting values using the `up` SQL. + err := createTrigger(ctx, conn, triggerConfig{ + Name: TriggerName(o.Table, o.Column), + Direction: TriggerDirectionUp, + Columns: table.Columns, + SchemaName: s.Name, + TableName: o.Table, + PhysicalColumn: TemporaryName(o.Column), + StateSchema: stateSchema, + SQL: o.Up, + }) + if err != nil { + return fmt.Errorf("failed to create up trigger: %w", err) + } + + // Backfill the new column with values from the old column. + if err := backFill(ctx, conn, o.Table, TemporaryName(o.Column)); err != nil { + return fmt.Errorf("failed to backfill column: %w", err) + } + + // Add the new column to the internal schema representation. This is done + // here, before creation of the down trigger, so that the trigger can declare + // a variable for the new column. + table.AddColumn(o.Column, schema.Column{ + Name: TemporaryName(o.Column), + }) + + // Add a trigger to copy values from the new column to the old, rewriting values using the `down` SQL. + err = createTrigger(ctx, conn, triggerConfig{ + Name: TriggerName(o.Table, TemporaryName(o.Column)), + Direction: TriggerDirectionDown, + Columns: table.Columns, + SchemaName: s.Name, + TableName: o.Table, + PhysicalColumn: o.Column, + StateSchema: stateSchema, + SQL: o.Down, + }) + if err != nil { + return fmt.Errorf("failed to create down trigger: %w", err) + } + + return nil } func (o *OpSetUnique) Complete(ctx context.Context, conn *sql.DB) error { - // create a unique constraint using the unique index + // Create a unique constraint using the unique index _, err := conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s ADD CONSTRAINT %s UNIQUE USING INDEX %s", pq.QuoteIdentifier(o.Table), pq.QuoteIdentifier(o.Name), pq.QuoteIdentifier(o.Name))) + if err != nil { + return err + } + + // Remove the up function and trigger + _, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE", + pq.QuoteIdentifier(TriggerFunctionName(o.Table, o.Column)), + )) + if err != nil { + return err + } + + // Remove the down function and trigger + _, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE", + pq.QuoteIdentifier(TriggerFunctionName(o.Table, TemporaryName(o.Column))), + )) + if err != nil { + return err + } + + // Drop the old column + _, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s DROP COLUMN IF EXISTS %s", + pq.QuoteIdentifier(o.Table), + pq.QuoteIdentifier(o.Column))) + if err != nil { + return err + } + + // Rename the new column to the old column name + _, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s RENAME COLUMN %s TO %s", + pq.QuoteIdentifier(o.Table), + pq.QuoteIdentifier(TemporaryName(o.Column)), + pq.QuoteIdentifier(o.Column))) return err } func (o *OpSetUnique) Rollback(ctx context.Context, conn *sql.DB) error { - // drop the index concurrently - _, err := conn.ExecContext(ctx, fmt.Sprintf("DROP INDEX CONCURRENTLY IF EXISTS %s", o.Name)) + // Drop the new column, taking the unique index on the column with it + _, err := conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s DROP COLUMN IF EXISTS %s", + pq.QuoteIdentifier(o.Table), + pq.QuoteIdentifier(TemporaryName(o.Column)), + )) + if err != nil { + return err + } + + // Remove the up function and trigger + _, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE", + pq.QuoteIdentifier(TriggerFunctionName(o.Table, o.Column)), + )) + if err != nil { + return err + } + + // Remove the down function and trigger + _, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE", + pq.QuoteIdentifier(TriggerFunctionName(o.Table, TemporaryName(o.Column))), + )) return err } @@ -59,3 +163,13 @@ func (o *OpSetUnique) Validate(ctx context.Context, s *schema.Schema) error { return nil } + +func (o *OpSetUnique) addUniqueIndex(ctx context.Context, conn *sql.DB) error { + // create unique index concurrently + _, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)", + pq.QuoteIdentifier(o.Name), + pq.QuoteIdentifier(o.Table), + pq.QuoteIdentifier(TemporaryName(o.Column)))) + + return err +} diff --git a/pkg/migrations/op_set_unique_test.go b/pkg/migrations/op_set_unique_test.go index b4b81d5..9d9c312 100644 --- a/pkg/migrations/op_set_unique_test.go +++ b/pkg/migrations/op_set_unique_test.go @@ -52,19 +52,18 @@ func TestSetColumnUnique(t *testing.T) { Unique: &migrations.UniqueConstraint{ Name: "reviews_review_unique", }, + Up: "review || '-' || (random()*1000000)::integer", + Down: "review", }, }, }, }, afterStart: func(t *testing.T, db *sql.DB) { - // The unique index has been created on the underlying table. - IndexMustExist(t, db, "public", "reviews", "reviews_review_unique") - - // Inserting values into the old schema that violate uniqueness should fail. + // Inserting values into the old schema that violate uniqueness should succeed. MustInsert(t, db, "public", "01_add_table", "reviews", map[string]string{ "username": "alice", "product": "apple", "review": "good", }) - MustNotInsert(t, db, "public", "01_add_table", "reviews", map[string]string{ + MustInsert(t, db, "public", "01_add_table", "reviews", map[string]string{ "username": "bob", "product": "banana", "review": "good", }) @@ -77,12 +76,32 @@ func TestSetColumnUnique(t *testing.T) { }) }, afterRollback: func(t *testing.T, db *sql.DB) { - // The unique index has been dropped from the the underlying table. - IndexMustNotExist(t, db, "public", "reviews", "reviews_review_unique") + // The new (temporary) `review` column should not exist on the underlying table. + ColumnMustNotExist(t, db, "public", "reviews", migrations.TemporaryName("review")) + + // The up function no longer exists. + FunctionMustNotExist(t, db, "public", migrations.TriggerFunctionName("reviews", "review")) + // The down function no longer exists. + FunctionMustNotExist(t, db, "public", migrations.TriggerFunctionName("reviews", migrations.TemporaryName("review"))) + + // The up trigger no longer exists. + TriggerMustNotExist(t, db, "public", "reviews", migrations.TriggerName("reviews", "review")) + // The down trigger no longer exists. + TriggerMustNotExist(t, db, "public", "reviews", migrations.TriggerName("reviews", migrations.TemporaryName("review"))) }, afterComplete: func(t *testing.T, db *sql.DB) { - // The unique constraint has been created on the underlying table. - ConstraintMustExist(t, db, "public", "reviews", "reviews_review_unique") + // The new (temporary) `review` column should not exist on the underlying table. + ColumnMustNotExist(t, db, "public", "reviews", migrations.TemporaryName("review")) + + // The up function no longer exists. + FunctionMustNotExist(t, db, "public", migrations.TriggerFunctionName("reviews", "review")) + // The down function no longer exists. + FunctionMustNotExist(t, db, "public", migrations.TriggerFunctionName("reviews", migrations.TemporaryName("review"))) + + // The up trigger no longer exists. + TriggerMustNotExist(t, db, "public", "reviews", migrations.TriggerName("reviews", "review")) + // The down trigger no longer exists. + TriggerMustNotExist(t, db, "public", "reviews", migrations.TriggerName("reviews", migrations.TemporaryName("review"))) // Inserting values into the new schema that violate uniqueness should fail. MustInsert(t, db, "public", "02_set_unique", "reviews", map[string]string{