Dont duplicate CHECK constraints and DEFAULTs when altering column type (#349)

When a column is duplicated with a new type, check constraints and
defaults defined on the column *may* be incompatible with the new type.

In this case column duplication should not fail, but rather the
incompatible constraints and defaults should be ignored and not be
duplicated to the new column.

This PR changes column duplication to ignore errors on `DEFAULT` and
`CHECK` constraint duplication that look as though they are caused by a
change of column type.

Fixes https://github.com/xataio/pgroll/issues/348
This commit is contained in:
Andrew Farries 2024-05-08 18:52:47 +01:00 committed by GitHub
parent 5c1aef2f24
commit 4c1bc6a03f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 183 additions and 22 deletions

View File

@ -4,6 +4,7 @@ package migrations
import (
"context"
"errors"
"fmt"
"slices"
"strings"
@ -23,6 +24,11 @@ type Duplicator struct {
withoutConstraint string
}
const (
dataTypeMismatchErrorCode pq.ErrorCode = "42804"
undefinedFunctionErrorCode pq.ErrorCode = "42883"
)
// NewColumnDuplicator creates a new Duplicator for a column.
func NewColumnDuplicator(conn db.DB, table *schema.Table, column *schema.Column) *Duplicator {
return &Duplicator{
@ -49,16 +55,17 @@ func (d *Duplicator) WithoutNotNull() *Duplicator {
return d
}
// Duplicate creates a new column with the same type and foreign key
// constraints as the original column.
// Duplicate duplicates a column in the table, including all constraints and
// comments.
func (d *Duplicator) Duplicate(ctx context.Context) error {
const (
cAlterTableSQL = `ALTER TABLE %s ADD COLUMN %s %s`
cSetDefaultSQL = `ALTER COLUMN %s SET DEFAULT %s`
cAddForeignKeySQL = `ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s (%s) ON DELETE %s`
cAddCheckConstraintSQL = `ADD CONSTRAINT %s %s NOT VALID`
cCreateUniqueIndexSQL = `CREATE UNIQUE INDEX CONCURRENTLY %s ON %s (%s)`
cCommentOnColumnSQL = `COMMENT ON COLUMN %s.%s IS %s`
cAlterTableSQL = `ALTER TABLE %s ADD COLUMN %s %s`
cAddForeignKeySQL = `ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s (%s) ON DELETE %s`
cAddCheckConstraintSQL = `ADD CONSTRAINT %s %s NOT VALID`
cCreateUniqueIndexSQL = `CREATE UNIQUE INDEX CONCURRENTLY %s ON %s (%s)`
cSetDefaultSQL = `ALTER TABLE %s ALTER COLUMN %s SET DEFAULT %s`
cAlterTableAddCheckConstraintSQL = `ALTER TABLE %s ADD CONSTRAINT %s %s NOT VALID`
cCommentOnColumnSQL = `COMMENT ON COLUMN %s.%s IS %s`
)
// Generate SQL to duplicate the column's name and type
@ -67,11 +74,6 @@ func (d *Duplicator) Duplicate(ctx context.Context) error {
pq.QuoteIdentifier(d.asName),
d.withType)
// Generate SQL to duplicate the column's default value
if d.column.Default != nil {
sql += fmt.Sprintf(", "+cSetDefaultSQL, d.asName, *d.column.Default)
}
// Generate SQL to add an unchecked NOT NULL constraint if the original column
// is NOT NULL. The constraint will be validated on migration completion.
if !d.column.Nullable && !d.withoutNotNull {
@ -98,23 +100,47 @@ func (d *Duplicator) Duplicate(ctx context.Context) error {
}
}
// Generate SQL to duplicate any check constraints on the column
_, err := d.conn.ExecContext(ctx, sql)
if err != nil {
return err
}
// Generate SQL to duplicate any default value on the column. This may fail
// if the default value is not valid for the new column type, in which case
// the error is ignored.
if d.column.Default != nil {
sql := fmt.Sprintf(cSetDefaultSQL, pq.QuoteIdentifier(d.table.Name), d.asName, *d.column.Default)
_, err := d.conn.ExecContext(ctx, sql)
err = errorIgnoringErrorCode(err, dataTypeMismatchErrorCode)
if err != nil {
return err
}
}
// Generate SQL to duplicate any check constraints on the column. This may faile
// if the check constraint is not valid for the new column type, in which case
// the error is ignored.
for _, cc := range d.table.CheckConstraints {
if cc.Name == d.withoutConstraint {
continue
}
if slices.Contains(cc.Columns, d.column.Name) {
sql += fmt.Sprintf(", "+cAddCheckConstraintSQL,
sql := fmt.Sprintf(cAlterTableAddCheckConstraintSQL,
pq.QuoteIdentifier(d.table.Name),
pq.QuoteIdentifier(DuplicationName(cc.Name)),
rewriteCheckExpression(cc.Definition, d.column.Name, d.asName),
)
}
}
_, err := d.conn.ExecContext(ctx, sql)
if err != nil {
return err
_, err := d.conn.ExecContext(ctx, sql)
err = errorIgnoringErrorCode(err, undefinedFunctionErrorCode)
if err != nil {
return err
}
}
}
// Generate SQL to duplicate the column's comment
@ -178,3 +204,14 @@ func copyAndReplace(xs []string, oldValue, newValue string) []string {
}
return ys
}
func errorIgnoringErrorCode(err error, code pq.ErrorCode) error {
pqErr := &pq.Error{}
if ok := errors.As(err, &pqErr); ok {
if pqErr.Code == code {
return nil
}
}
return err
}

View File

@ -225,7 +225,72 @@ func TestChangeColumnType(t *testing.T) {
},
},
{
name: "changing column type preserves any defaults on the column",
name: "changing column type removes any incompatible defaults on the column",
migrations: []migrations.Migration{
{
Name: "01_add_table",
Operations: migrations.Operations{
&migrations.OpCreateTable{
Name: "users",
Columns: []migrations.Column{
{
Name: "id",
Type: "integer",
Pk: ptr(true),
},
{
Name: "age",
Type: "text",
Default: ptr("'0'"),
Nullable: ptr(true),
},
},
},
},
},
{
Name: "02_change_type",
Operations: migrations.Operations{
&migrations.OpAlterColumn{
Table: "users",
Column: "age",
Type: ptr("integer"),
Up: "CAST(age AS integer)",
Down: "CAST(age AS text)",
},
},
},
},
afterStart: func(t *testing.T, db *sql.DB, schema string) {
// A row can be inserted into the new version of the table.
MustInsert(t, db, schema, "02_change_type", "users", map[string]string{
"id": "1",
})
// The newly inserted row contains a NULL instead of the old default value.
rows := MustSelect(t, db, schema, "02_change_type", "users")
assert.Equal(t, []map[string]any{
{"id": 1, "age": nil},
}, rows)
},
afterRollback: func(t *testing.T, db *sql.DB, schema string) {
},
afterComplete: func(t *testing.T, db *sql.DB, schema string) {
// A row can be inserted into the new version of the table.
MustInsert(t, db, schema, "02_change_type", "users", map[string]string{
"id": "2",
})
// The newly inserted row contains a NULL instead of the old default value.
rows := MustSelect(t, db, schema, "02_change_type", "users")
assert.Equal(t, []map[string]any{
{"id": 1, "age": nil},
{"id": 2, "age": nil},
}, rows)
},
},
{
name: "changing column type preserves any compatible defaults on the column",
migrations: []migrations.Migration{
{
Name: "01_add_table",
@ -290,7 +355,66 @@ func TestChangeColumnType(t *testing.T) {
},
},
{
name: "changing column type preserves any check constraints on the column",
name: "changing column type removes any incompatile check constraints on the column",
migrations: []migrations.Migration{
{
Name: "01_add_table",
Operations: migrations.Operations{
&migrations.OpCreateTable{
Name: "users",
Columns: []migrations.Column{
{
Name: "id",
Type: "integer",
Pk: ptr(true),
},
{
Name: "age",
Type: "text",
Nullable: ptr(true),
Check: &migrations.CheckConstraint{
Name: "age_length",
Constraint: "length(age) < 3",
},
},
},
},
},
},
{
Name: "02_change_type",
Operations: migrations.Operations{
&migrations.OpAlterColumn{
Table: "users",
Column: "age",
Type: ptr("integer"),
Up: "CAST(age AS integer)",
Down: "(SELECT CASE WHEN age < 100 THEN age::text ELSE '0' END)",
},
},
},
},
afterStart: func(t *testing.T, db *sql.DB, schema string) {
// Inserting a row into the new schema that violates the check
// constraint on the old schema should succeed.
MustInsert(t, db, schema, "02_change_type", "users", map[string]string{
"id": "1",
"age": "1111",
})
},
afterRollback: func(t *testing.T, db *sql.DB, schema string) {
},
afterComplete: func(t *testing.T, db *sql.DB, schema string) {
// Inserting a row into the new schema that violates the check
// constraint on the old schema should succeed.
MustInsert(t, db, schema, "02_change_type", "users", map[string]string{
"id": "2",
"age": "2222",
})
},
},
{
name: "changing column type preserves any compatible check constraints on the column",
migrations: []migrations.Migration{
{
Name: "01_add_table",