Make set_unique operations respect the contract for old schema versions (#118)

When `pg-roll` makes a schema change, the contract with the user is that
it will leave the old version of the schema unchanged.

When the operation to add a `UNIQUE` constraint was implemented
(https://github.com/xataio/pg-roll/pull/53), this contract was not
respected. The operation adds a unique index to the existing column,
changing the behaviour for users of the old schema.

This PR changes the operation so that it follows a similar pattern to
other operations that were implemented later:
* On `Start`:
  * Duplicate the column.
  * Add a `UNIQUE` index concurrently.
  * Create `up` and `down` triggers to copy values between the old and new columns.
  * Backfill values from the old column into the new using the `up` SQL.
* On `Complete`:
  * Create a unique constraint on the new column using the unique index.
  * Drop the old column.
  * Rename the new column to the old column's name.
  * Remove the `up` and `down` triggers.

Writing correct `up` SQL for this operation is a little more difficult
than for other operations (e.g. setting `NOT NULL`), as it is up to the
user to ensure that the rewritten values are unique. The example
migration in this PR appends a random suffix to each value.
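
As a purely illustrative sketch, the snippet below constructs the same set-unique operation in Go, mirroring the test changes in this PR. The import path and the `main` scaffolding are assumptions; only the `OpAlterColumn` and `UniqueConstraint` fields come from the diff itself.

```go
package main

import (
	"fmt"

	// Import path is assumed from the repository name; adjust as needed.
	"github.com/xataio/pg-roll/pkg/migrations"
)

func main() {
	// Add a UNIQUE constraint to reviews.review. The `up` SQL must produce
	// unique values during the backfill, so this example appends a random
	// integer suffix; `down` copies the value back unchanged.
	op := &migrations.OpAlterColumn{
		Table:  "reviews",
		Column: "review",
		Unique: &migrations.UniqueConstraint{
			Name: "reviews_review_unique",
		},
		Up:   "review || '-' || (random()*1000000)::integer",
		Down: "review",
	}
	fmt.Printf("%+v\n", op)
}
```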
Andrew Farries, 2023-09-22 09:52:15 +01:00 (committed via GitHub)
commit f2bb2f9581, parent 99e34b1977
4 changed files with 157 additions and 20 deletions


@@ -7,7 +7,9 @@
"column": "review",
"unique": {
"name": "reviews_review_unique"
}
},
"up": "review || '-' || (random()*1000000)::integer",
"down": "review"
}
}
]


@@ -59,7 +59,7 @@ func (o *OpAlterColumn) Validate(ctx context.Context, s *schema.Schema) error {
// Apply any special validation rules for the inner operation
op := o.innerOperation()
switch op.(type) {
case *OpRenameColumn, *OpSetUnique:
case *OpRenameColumn:
if o.Up != "" {
return NoUpSQLAllowedError{}
}
@@ -126,6 +126,8 @@ func (o *OpAlterColumn) innerOperation() Operation {
Table: o.Table,
Column: o.Column,
Name: o.Unique.Name,
Up: o.Up,
Down: o.Down,
}
}
return nil


@@ -13,32 +13,136 @@ type OpSetUnique struct {
Name string `json:"name"`
Table string `json:"table"`
Column string `json:"column"`
Up string `json:"up"`
Down string `json:"down"`
}
var _ Operation = (*OpSetUnique)(nil)
func (o *OpSetUnique) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema) error {
// create unique index concurrently
_, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)",
pq.QuoteIdentifier(o.Name),
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(o.Column)))
return err
table := s.GetTable(o.Table)
column := table.GetColumn(o.Column)
// create a copy of the column on the underlying table.
if err := duplicateColumn(ctx, conn, table, *column); err != nil {
return fmt.Errorf("failed to duplicate column: %w", err)
}
// Add a unique index to the new column
if err := o.addUniqueIndex(ctx, conn); err != nil {
return fmt.Errorf("failed to add unique index: %w", err)
}
// Add a trigger to copy values from the old column to the new, rewriting values using the `up` SQL.
err := createTrigger(ctx, conn, triggerConfig{
Name: TriggerName(o.Table, o.Column),
Direction: TriggerDirectionUp,
Columns: table.Columns,
SchemaName: s.Name,
TableName: o.Table,
PhysicalColumn: TemporaryName(o.Column),
StateSchema: stateSchema,
SQL: o.Up,
})
if err != nil {
return fmt.Errorf("failed to create up trigger: %w", err)
}
// Backfill the new column with values from the old column.
if err := backFill(ctx, conn, o.Table, TemporaryName(o.Column)); err != nil {
return fmt.Errorf("failed to backfill column: %w", err)
}
// Add the new column to the internal schema representation. This is done
// here, before creation of the down trigger, so that the trigger can declare
// a variable for the new column.
table.AddColumn(o.Column, schema.Column{
Name: TemporaryName(o.Column),
})
// Add a trigger to copy values from the new column to the old, rewriting values using the `down` SQL.
err = createTrigger(ctx, conn, triggerConfig{
Name: TriggerName(o.Table, TemporaryName(o.Column)),
Direction: TriggerDirectionDown,
Columns: table.Columns,
SchemaName: s.Name,
TableName: o.Table,
PhysicalColumn: o.Column,
StateSchema: stateSchema,
SQL: o.Down,
})
if err != nil {
return fmt.Errorf("failed to create down trigger: %w", err)
}
return nil
}
func (o *OpSetUnique) Complete(ctx context.Context, conn *sql.DB) error {
// create a unique constraint using the unique index
// Create a unique constraint using the unique index
_, err := conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s ADD CONSTRAINT %s UNIQUE USING INDEX %s",
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(o.Name),
pq.QuoteIdentifier(o.Name)))
if err != nil {
return err
}
// Remove the up function and trigger
_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE",
pq.QuoteIdentifier(TriggerFunctionName(o.Table, o.Column)),
))
if err != nil {
return err
}
// Remove the down function and trigger
_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE",
pq.QuoteIdentifier(TriggerFunctionName(o.Table, TemporaryName(o.Column))),
))
if err != nil {
return err
}
// Drop the old column
_, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s DROP COLUMN IF EXISTS %s",
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(o.Column)))
if err != nil {
return err
}
// Rename the new column to the old column name
_, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s RENAME COLUMN %s TO %s",
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(TemporaryName(o.Column)),
pq.QuoteIdentifier(o.Column)))
return err
}
func (o *OpSetUnique) Rollback(ctx context.Context, conn *sql.DB) error {
// drop the index concurrently
_, err := conn.ExecContext(ctx, fmt.Sprintf("DROP INDEX CONCURRENTLY IF EXISTS %s", o.Name))
// Drop the new column, taking the unique index on the column with it
_, err := conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s DROP COLUMN IF EXISTS %s",
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(TemporaryName(o.Column)),
))
if err != nil {
return err
}
// Remove the up function and trigger
_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE",
pq.QuoteIdentifier(TriggerFunctionName(o.Table, o.Column)),
))
if err != nil {
return err
}
// Remove the down function and trigger
_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE",
pq.QuoteIdentifier(TriggerFunctionName(o.Table, TemporaryName(o.Column))),
))
return err
}
@@ -59,3 +163,13 @@ func (o *OpSetUnique) Validate(ctx context.Context, s *schema.Schema) error {
return nil
}
func (o *OpSetUnique) addUniqueIndex(ctx context.Context, conn *sql.DB) error {
// create unique index concurrently
_, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)",
pq.QuoteIdentifier(o.Name),
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(TemporaryName(o.Column))))
return err
}


@@ -52,19 +52,18 @@ func TestSetColumnUnique(t *testing.T) {
Unique: &migrations.UniqueConstraint{
Name: "reviews_review_unique",
},
Up: "review || '-' || (random()*1000000)::integer",
Down: "review",
},
},
},
},
afterStart: func(t *testing.T, db *sql.DB) {
// The unique index has been created on the underlying table.
IndexMustExist(t, db, "public", "reviews", "reviews_review_unique")
// Inserting values into the old schema that violate uniqueness should fail.
// Inserting values into the old schema that violate uniqueness should succeed.
MustInsert(t, db, "public", "01_add_table", "reviews", map[string]string{
"username": "alice", "product": "apple", "review": "good",
})
MustNotInsert(t, db, "public", "01_add_table", "reviews", map[string]string{
MustInsert(t, db, "public", "01_add_table", "reviews", map[string]string{
"username": "bob", "product": "banana", "review": "good",
})
@@ -77,12 +76,32 @@
})
},
afterRollback: func(t *testing.T, db *sql.DB) {
// The unique index has been dropped from the underlying table.
IndexMustNotExist(t, db, "public", "reviews", "reviews_review_unique")
// The new (temporary) `review` column should not exist on the underlying table.
ColumnMustNotExist(t, db, "public", "reviews", migrations.TemporaryName("review"))
// The up function no longer exists.
FunctionMustNotExist(t, db, "public", migrations.TriggerFunctionName("reviews", "review"))
// The down function no longer exists.
FunctionMustNotExist(t, db, "public", migrations.TriggerFunctionName("reviews", migrations.TemporaryName("review")))
// The up trigger no longer exists.
TriggerMustNotExist(t, db, "public", "reviews", migrations.TriggerName("reviews", "review"))
// The down trigger no longer exists.
TriggerMustNotExist(t, db, "public", "reviews", migrations.TriggerName("reviews", migrations.TemporaryName("review")))
},
afterComplete: func(t *testing.T, db *sql.DB) {
// The unique constraint has been created on the underlying table.
ConstraintMustExist(t, db, "public", "reviews", "reviews_review_unique")
// The new (temporary) `review` column should not exist on the underlying table.
ColumnMustNotExist(t, db, "public", "reviews", migrations.TemporaryName("review"))
// The up function no longer exists.
FunctionMustNotExist(t, db, "public", migrations.TriggerFunctionName("reviews", "review"))
// The down function no longer exists.
FunctionMustNotExist(t, db, "public", migrations.TriggerFunctionName("reviews", migrations.TemporaryName("review")))
// The up trigger no longer exists.
TriggerMustNotExist(t, db, "public", "reviews", migrations.TriggerName("reviews", "review"))
// The down trigger no longer exists.
TriggerMustNotExist(t, db, "public", "reviews", migrations.TriggerName("reviews", migrations.TemporaryName("review")))
// Inserting values into the new schema that violate uniqueness should fail.
MustInsert(t, db, "public", "02_set_unique", "reviews", map[string]string{