Implement the 'change column type' operation (#74)

Implement the **change column type** operation. A change type migration
looks like this:

```json
{
  "name": "18_change_column_type",
  "operations": [
    {
      "change_type": {
        "table": "reviews",
        "column": "rating",
        "type": "integer",
        "up": "CAST(rating AS integer)",
        "down": "CAST(rating AS text)"
      }
    }
  ]
}
```
This migration changes the type of the `rating` column from `TEXT` to
`INTEGER`.

The implementation is very similar to the **set NOT NULL** operation
(#63):

* On `Start`:
  *  Create a new column having the new type
* Backfill the new column with values from the existing column,
converting the types using the `up` SQL.
* Create a trigger to populate the new column when values are written to
the old column, converting types with `up`.
* Create a trigger to populate the old column when values are written to
the new column, converting types with `down`.
* On `Complete`
  * Remove triggers
  * Drop the old column
  * Rename the new column to the old column name.
* On `Rollback`
  * Remove the new column and both triggers.

The migration can fail in at least 2 ways:
* The initial backfill of existing rows on `Start` fails due to the type
conversion not being possible on one or more rows. In the above example,
any existing rows with `rating` values not representable as an `INTEGER`
will cause a failure on `Start`.
* In this case, the failure is reported and the migration rolled back
(#73)
* During the rollout period, unconvertible values are written to the old
version schema. The `up` trigger will fail to convert the values and the
`INSERT`/`UPDATE` will fail.
* Some form of data quarantine needs to be implemented here, copying the
invalid rows elsewhere and blocking completion of the migration until
those rows are handled in some way).

The PR also adds example migrations to the `/examples` directory.
This commit is contained in:
Andrew Farries 2023-09-01 13:37:43 +01:00 committed by GitHub
parent 275cc9089c
commit 6b91f4322b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 481 additions and 0 deletions

View File

@ -0,0 +1,15 @@
{
"name": "17_add_rating_column",
"operations": [
{
"add_column": {
"table": "reviews",
"column": {
"name": "rating",
"type": "text",
"default": "0"
}
}
}
]
}

View File

@ -0,0 +1,14 @@
{
"name": "18_change_column_type",
"operations": [
{
"change_type": {
"table": "reviews",
"column": "rating",
"type": "integer",
"up": "CAST(rating AS integer)",
"down": "CAST(rating AS text)"
}
}
]
}

View File

@ -0,0 +1,164 @@
package migrations
import (
"context"
"database/sql"
"fmt"
"github.com/lib/pq"
"github.com/xataio/pg-roll/pkg/schema"
)
type OpChangeType struct {
Table string `json:"table"`
Column string `json:"column"`
Type string `json:"type"`
Up string `json:"up"`
Down string `json:"down"`
}
var _ Operation = (*OpChangeType)(nil)
func (o *OpChangeType) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema) error {
table := s.GetTable(o.Table)
column := table.GetColumn(o.Column)
// Create a copy of the column on the underlying table.
if err := duplicateColumnForTypeChange(ctx, conn, table, *column, o.Type); err != nil {
return fmt.Errorf("failed to duplicate column: %w", err)
}
// Add a trigger to copy values from the old column to the new, rewriting values using the `up` SQL.
err := createTrigger(ctx, conn, triggerConfig{
Name: TriggerName(o.Table, o.Column),
Direction: TriggerDirectionUp,
Columns: table.Columns,
SchemaName: s.Name,
TableName: o.Table,
PhysicalColumn: TemporaryName(o.Column),
StateSchema: stateSchema,
SQL: o.Up,
})
if err != nil {
return fmt.Errorf("failed to create up trigger: %w", err)
}
// Backfill the new column with values from the old column.
if err := backFill(ctx, conn, o.Table, TemporaryName(o.Column)); err != nil {
return fmt.Errorf("failed to backfill column: %w", err)
}
// Add the new column to the internal schema representation. This is done
// here, before creation of the down trigger, so that the trigger can declare
// a variable for the new column.
table.AddColumn(o.Column, schema.Column{
Name: TemporaryName(o.Column),
})
// Add a trigger to copy values from the new column to the old, rewriting values using the `down` SQL.
err = createTrigger(ctx, conn, triggerConfig{
Name: TriggerName(o.Table, TemporaryName(o.Column)),
Direction: TriggerDirectionDown,
Columns: table.Columns,
SchemaName: s.Name,
TableName: o.Table,
PhysicalColumn: o.Column,
StateSchema: stateSchema,
SQL: o.Down,
})
if err != nil {
return fmt.Errorf("failed to create down trigger: %w", err)
}
return nil
}
func (o *OpChangeType) Complete(ctx context.Context, conn *sql.DB) error {
// Remove the up function and trigger
_, err := conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE",
pq.QuoteIdentifier(TriggerFunctionName(o.Table, o.Column))))
if err != nil {
return err
}
// Remove the down function and trigger
_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE",
pq.QuoteIdentifier(TriggerFunctionName(o.Table, TemporaryName(o.Column)))))
if err != nil {
return err
}
// Drop the old column
_, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s DROP COLUMN IF EXISTS %s",
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(o.Column)))
if err != nil {
return err
}
// Rename the new column to the old column name
_, err = conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE IF EXISTS %s RENAME COLUMN %s TO %s",
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(TemporaryName(o.Column)),
pq.QuoteIdentifier(o.Column)))
return err
}
func (o *OpChangeType) Rollback(ctx context.Context, conn *sql.DB) error {
// Drop the new column
_, err := conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s DROP COLUMN IF EXISTS %s",
pq.QuoteIdentifier(o.Table),
pq.QuoteIdentifier(TemporaryName(o.Column)),
))
if err != nil {
return err
}
// Remove the up function and trigger
_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE",
pq.QuoteIdentifier(TriggerFunctionName(o.Table, o.Column)),
))
if err != nil {
return err
}
// Remove the down function and trigger
_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP FUNCTION IF EXISTS %s CASCADE",
pq.QuoteIdentifier(TriggerFunctionName(o.Table, TemporaryName(o.Column))),
))
return err
}
func (o *OpChangeType) Validate(ctx context.Context, s *schema.Schema) error {
table := s.GetTable(o.Table)
if table == nil {
return TableDoesNotExistError{Name: o.Table}
}
if table.GetColumn(o.Column) == nil {
return ColumnDoesNotExistError{Table: o.Table, Name: o.Column}
}
if o.Up == "" {
return FieldRequiredError{Name: "up"}
}
if o.Down == "" {
return FieldRequiredError{Name: "down"}
}
return nil
}
func duplicateColumnForTypeChange(ctx context.Context, conn *sql.DB, table *schema.Table, column schema.Column, newType string) error {
column.Name = TemporaryName(column.Name)
column.Type = newType
_, err := conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s",
pq.QuoteIdentifier(table.Name),
schemaColumnToSQL(column),
))
return err
}

View File

@ -0,0 +1,255 @@
package migrations_test
import (
"database/sql"
"testing"
"github.com/stretchr/testify/assert"
"github.com/xataio/pg-roll/pkg/migrations"
"github.com/xataio/pg-roll/pkg/roll"
)
func TestChangeColumnType(t *testing.T) {
t.Parallel()
ExecuteTests(t, TestCases{{
name: "change column type",
migrations: []migrations.Migration{
{
Name: "01_add_table",
Operations: migrations.Operations{
&migrations.OpCreateTable{
Name: "reviews",
Columns: []migrations.Column{
{
Name: "id",
Type: "serial",
PrimaryKey: true,
},
{
Name: "username",
Type: "text",
},
{
Name: "product",
Type: "text",
},
{
Name: "rating",
Type: "text",
Default: ptr("0"),
},
},
},
},
},
{
Name: "02_change_type",
Operations: migrations.Operations{
&migrations.OpChangeType{
Table: "reviews",
Column: "rating",
Type: "integer",
Up: "CAST (rating AS integer)",
Down: "CAST (rating AS text)",
},
},
},
},
afterStart: func(t *testing.T, db *sql.DB) {
newVersionSchema := roll.VersionedSchemaName("public", "02_change_type")
// The new (temporary) `rating` column should exist on the underlying table.
ColumnMustExist(t, db, "public", "reviews", migrations.TemporaryName("rating"))
// The `rating` column in the new view must have the correct type.
ColumnMustHaveType(t, db, newVersionSchema, "reviews", "rating", "integer")
// Inserting into the new `rating` column should work.
MustInsert(t, db, "public", "02_change_type", "reviews", map[string]string{
"username": "alice",
"product": "apple",
"rating": "5",
})
// The value inserted into the new `rating` column has been backfilled into
// the old `rating` column.
rows := MustSelect(t, db, "public", "01_add_table", "reviews")
assert.Equal(t, []map[string]any{
{"id": 1, "username": "alice", "product": "apple", "rating": "5"},
}, rows)
// Inserting into the old `rating` column should work.
MustInsert(t, db, "public", "01_add_table", "reviews", map[string]string{
"username": "bob",
"product": "banana",
"rating": "8",
})
// The value inserted into the old `rating` column has been backfilled into
// the new `rating` column.
rows = MustSelect(t, db, "public", "02_change_type", "reviews")
assert.Equal(t, []map[string]any{
{"id": 1, "username": "alice", "product": "apple", "rating": 5},
{"id": 2, "username": "bob", "product": "banana", "rating": 8},
}, rows)
},
afterRollback: func(t *testing.T, db *sql.DB) {
// The new (temporary) `rating` column should not exist on the underlying table.
ColumnMustNotExist(t, db, "public", "reviews", migrations.TemporaryName("rating"))
// The up function no longer exists.
FunctionMustNotExist(t, db, "public", migrations.TriggerFunctionName("reviews", "rating"))
// The down function no longer exists.
FunctionMustNotExist(t, db, "public", migrations.TriggerFunctionName("reviews", migrations.TemporaryName("rating")))
// The up trigger no longer exists.
TriggerMustNotExist(t, db, "public", "reviews", migrations.TriggerName("reviews", "rating"))
// The down trigger no longer exists.
TriggerMustNotExist(t, db, "public", "reviews", migrations.TriggerName("reviews", migrations.TemporaryName("rating")))
},
afterComplete: func(t *testing.T, db *sql.DB) {
newVersionSchema := roll.VersionedSchemaName("public", "02_change_type")
// The new (temporary) `rating` column should not exist on the underlying table.
ColumnMustNotExist(t, db, "public", "reviews", migrations.TemporaryName("rating"))
// The `rating` column in the new view must have the correct type.
ColumnMustHaveType(t, db, newVersionSchema, "reviews", "rating", "integer")
// Inserting into the new view should work.
MustInsert(t, db, "public", "02_change_type", "reviews", map[string]string{
"username": "carl",
"product": "carrot",
"rating": "3",
})
// Selecting from the new view should succeed.
rows := MustSelect(t, db, "public", "02_change_type", "reviews")
assert.Equal(t, []map[string]any{
{"id": 1, "username": "alice", "product": "apple", "rating": 5},
{"id": 2, "username": "bob", "product": "banana", "rating": 8},
{"id": 3, "username": "carl", "product": "carrot", "rating": 3},
}, rows)
// The up function no longer exists.
FunctionMustNotExist(t, db, "public", migrations.TriggerFunctionName("reviews", "rating"))
// The down function no longer exists.
FunctionMustNotExist(t, db, "public", migrations.TriggerFunctionName("reviews", migrations.TemporaryName("rating")))
// The up trigger no longer exists.
TriggerMustNotExist(t, db, "public", "reviews", migrations.TriggerName("reviews", "rating"))
// The down trigger no longer exists.
TriggerMustNotExist(t, db, "public", "reviews", migrations.TriggerName("reviews", migrations.TemporaryName("rating")))
},
}})
}
func TestChangeColumnTypeValidation(t *testing.T) {
t.Parallel()
createTableMigration := migrations.Migration{
Name: "01_add_table",
Operations: migrations.Operations{
&migrations.OpCreateTable{
Name: "reviews",
Columns: []migrations.Column{
{
Name: "id",
Type: "serial",
PrimaryKey: true,
},
{
Name: "username",
Type: "text",
},
{
Name: "product",
Type: "text",
},
{
Name: "rating",
Type: "text",
},
},
},
},
}
ExecuteTests(t, TestCases{
{
name: "up SQL is mandatory",
migrations: []migrations.Migration{
createTableMigration,
{
Name: "02_change_type",
Operations: migrations.Operations{
&migrations.OpChangeType{
Table: "reviews",
Column: "rating",
Type: "integer",
Down: "CAST (rating AS text)",
},
},
},
},
wantStartErr: migrations.FieldRequiredError{Name: "up"},
},
{
name: "down SQL is mandatory",
migrations: []migrations.Migration{
createTableMigration,
{
Name: "02_change_type",
Operations: migrations.Operations{
&migrations.OpChangeType{
Table: "reviews",
Column: "rating",
Type: "integer",
Up: "CAST (rating AS integer)",
},
},
},
},
wantStartErr: migrations.FieldRequiredError{Name: "down"},
},
{
name: "table must exist",
migrations: []migrations.Migration{
createTableMigration,
{
Name: "02_change_type",
Operations: migrations.Operations{
&migrations.OpChangeType{
Table: "doesntexist",
Column: "rating",
Type: "integer",
Up: "CAST (rating AS integer)",
Down: "CAST (rating AS text)",
},
},
},
},
wantStartErr: migrations.TableDoesNotExistError{Name: "doesntexist"},
},
{
name: "column must exist",
migrations: []migrations.Migration{
createTableMigration,
{
Name: "02_change_type",
Operations: migrations.Operations{
&migrations.OpChangeType{
Table: "reviews",
Column: "doesntexist",
Type: "integer",
Up: "CAST (rating AS integer)",
Down: "CAST (rating AS text)",
},
},
},
},
wantStartErr: migrations.ColumnDoesNotExistError{Table: "reviews", Name: "doesntexist"},
},
})
}

View File

@ -21,6 +21,7 @@ const (
OpNameRenameColumn OpName = "rename_column"
OpNameSetUnique OpName = "set_unique"
OpNameSetNotNull OpName = "set_not_null"
OpNameChangeType OpName = "change_type"
OpRawSQLName OpName = "sql"
)
@ -107,6 +108,9 @@ func (v *Operations) UnmarshalJSON(data []byte) error {
case OpNameSetNotNull:
item = &OpSetNotNull{}
case OpNameChangeType:
item = &OpChangeType{}
case OpRawSQLName:
item = &OpRawSQL{}
@ -184,6 +188,9 @@ func OperationName(op Operation) OpName {
case *OpSetNotNull:
return OpNameSetNotNull
case *OpChangeType:
return OpNameChangeType
case *OpRawSQL:
return OpRawSQLName

View File

@ -208,6 +208,13 @@ func ColumnMustNotExist(t *testing.T, db *sql.DB, schema, table, column string)
}
}
func ColumnMustHaveType(t *testing.T, db *sql.DB, schema, table, column, expectedType string) {
t.Helper()
if !columnHasType(t, db, schema, table, column, expectedType) {
t.Fatalf("Expected column %q to have type %q", column, expectedType)
}
}
func TableMustHaveColumnCount(t *testing.T, db *sql.DB, schema, table string, n int) {
t.Helper()
if !tableMustHaveColumnCount(t, db, schema, table, n) {
@ -422,6 +429,25 @@ func columnExists(t *testing.T, db *sql.DB, schema, table, column string) bool {
return exists
}
func columnHasType(t *testing.T, db *sql.DB, schema, table, column, expectedType string) bool {
t.Helper()
var actualType string
err := db.QueryRow(`
SELECT data_type
FROM information_schema.columns
WHERE table_schema = $1
AND table_name = $2
AND column_name = $3
`,
schema, table, column).Scan(&actualType)
if err != nil {
t.Fatal(err)
}
return expectedType == actualType
}
func MustInsert(t *testing.T, db *sql.DB, schema, version, table string, record map[string]string) {
t.Helper()