diff --git a/docs/README.md b/docs/README.md index 74688d0..05f330f 100644 --- a/docs/README.md +++ b/docs/README.md @@ -29,6 +29,7 @@ * [Drop table](#drop-table) * [Raw SQL](#raw-sql) * [Rename table](#rename-table) + * [Set replica identity](#set-replica-identity) ## Concepts @@ -674,6 +675,7 @@ See the [examples](../examples) directory for examples of each kind of operation * [Drop table](#drop-table) * [Raw SQL](#raw-sql) * [Rename table](#rename-table) +* [Set replica identity](#set-replica-identity) ### Add column @@ -1048,3 +1050,27 @@ A rename table operation renames a table. Example **rename table** migrations: * [04_rename_table.json](../examples/04_rename_table.json) + +### Set replica identity + +A set replica identity operation sets the replica identity for a table. + +**set replica identity** operations have this structure: + +```json +{ + "set_replica_identity": { + "table": "name of the table", + "identity": { + "type": "full | default | nothing | index" + "index": "name of the index, if type is 'index'" + } + } +} +``` + +:warning: A **set replica identity** operation is applied directly to the underlying table on migration start. This means that both versions of the table exposed in the old and new version schemas will have the new replica identity set. :warning: + +Example **set replica identity** migrations: + +* [29_set_replica_identity.json](../examples/29_set_replica_identity.json) diff --git a/examples/29_set_replica_identity.json b/examples/29_set_replica_identity.json new file mode 100644 index 0000000..16c5843 --- /dev/null +++ b/examples/29_set_replica_identity.json @@ -0,0 +1,14 @@ +{ + "name": "29_set_replica_identity", + "operations": [ + { + "set_replica_identity": { + "table": "fruits", + "identity": { + "type": "index", + "index": "_pgroll_new_fruits_pkey" + } + } + } + ] +} diff --git a/pkg/migrations/errors.go b/pkg/migrations/errors.go index 1eb544d..1b3a5ea 100644 --- a/pkg/migrations/errors.go +++ b/pkg/migrations/errors.go @@ -147,3 +147,12 @@ type InvalidPrimaryKeyError struct { func (e InvalidPrimaryKeyError) Error() string { return fmt.Sprintf("primary key on table %q must be defined on exactly one column, found %d", e.Table, e.Fields) } + +type InvalidReplicaIdentityError struct { + Table string + Identity string +} + +func (e InvalidReplicaIdentityError) Error() string { + return fmt.Sprintf("replica identity on table %q must be one of 'NOTHING', 'DEFAULT', 'INDEX' or 'FULL', found %q", e.Table, e.Identity) +} diff --git a/pkg/migrations/op_common.go b/pkg/migrations/op_common.go index c8b5803..acf6d3d 100644 --- a/pkg/migrations/op_common.go +++ b/pkg/migrations/op_common.go @@ -12,16 +12,17 @@ import ( type OpName string const ( - OpNameCreateTable OpName = "create_table" - OpNameRenameTable OpName = "rename_table" - OpNameDropTable OpName = "drop_table" - OpNameAddColumn OpName = "add_column" - OpNameDropColumn OpName = "drop_column" - OpNameAlterColumn OpName = "alter_column" - OpNameCreateIndex OpName = "create_index" - OpNameDropIndex OpName = "drop_index" - OpNameDropConstraint OpName = "drop_constraint" - OpRawSQLName OpName = "sql" + OpNameCreateTable OpName = "create_table" + OpNameRenameTable OpName = "rename_table" + OpNameDropTable OpName = "drop_table" + OpNameAddColumn OpName = "add_column" + OpNameDropColumn OpName = "drop_column" + OpNameAlterColumn OpName = "alter_column" + OpNameCreateIndex OpName = "create_index" + OpNameDropIndex OpName = "drop_index" + OpNameDropConstraint OpName = "drop_constraint" + OpNameSetReplicaIdentity OpName = "set_replica_identity" + OpRawSQLName OpName = "sql" // Internal operation types used by `alter_column` OpNameRenameColumn OpName = "rename_column" @@ -95,6 +96,9 @@ func (v *Operations) UnmarshalJSON(data []byte) error { case OpNameDropConstraint: item = &OpDropConstraint{} + case OpNameSetReplicaIdentity: + item = &OpSetReplicaIdentity{} + case OpNameAlterColumn: item = &OpAlterColumn{} @@ -172,6 +176,9 @@ func OperationName(op Operation) OpName { case *OpDropConstraint: return OpNameDropConstraint + case *OpSetReplicaIdentity: + return OpNameSetReplicaIdentity + case *OpAlterColumn: return OpNameAlterColumn diff --git a/pkg/migrations/op_common_test.go b/pkg/migrations/op_common_test.go index 24ba61c..e0c17a8 100644 --- a/pkg/migrations/op_common_test.go +++ b/pkg/migrations/op_common_test.go @@ -282,6 +282,27 @@ func IndexMustNotExist(t *testing.T, db *sql.DB, schema, table, index string) { } } +func ReplicaIdentityMustBe(t *testing.T, db *sql.DB, schema, table, replicaIdentity string) { + t.Helper() + + var actualReplicaIdentity string + err := db.QueryRow(` + SELECT c.relreplident + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE c.relkind = 'r' -- regular table + AND n.nspname = $1 + AND c.relname = $2; + `, schema, table).Scan(&actualReplicaIdentity) + if err != nil { + t.Fatal(err) + } + + if replicaIdentity != actualReplicaIdentity { + t.Fatalf("Expected replica identity to be %q, got %q", replicaIdentity, actualReplicaIdentity) + } +} + func indexExists(t *testing.T, db *sql.DB, schema, table, index string) bool { t.Helper() diff --git a/pkg/migrations/op_set_replica_identity.go b/pkg/migrations/op_set_replica_identity.go new file mode 100644 index 0000000..82794a1 --- /dev/null +++ b/pkg/migrations/op_set_replica_identity.go @@ -0,0 +1,70 @@ +// SPDX-License-Identifier: Apache-2.0 + +package migrations + +import ( + "context" + "database/sql" + "fmt" + "slices" + "strings" + + "github.com/lib/pq" + "github.com/xataio/pgroll/pkg/schema" +) + +type OpSetReplicaIdentity struct { + Table string `json:"table"` + Identity ReplicaIdentity `json:"identity"` +} + +type ReplicaIdentity struct { + Type string `json:"type"` + Index string `json:"index"` +} + +var _ Operation = (*OpSetReplicaIdentity)(nil) + +func (o *OpSetReplicaIdentity) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error { + // build the correct form of the `SET REPLICA IDENTITY` statement based on the`identity type + identitySQL := strings.ToUpper(o.Identity.Type) + if identitySQL == "INDEX" { + identitySQL = fmt.Sprintf("USING INDEX %s", pq.QuoteIdentifier(o.Identity.Index)) + } + + // set the replica identity on the underlying table + _, err := conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s REPLICA IDENTITY %s", o.Table, identitySQL)) + return err +} + +func (o *OpSetReplicaIdentity) Complete(ctx context.Context, conn *sql.DB) error { + // No-op + return nil +} + +func (o *OpSetReplicaIdentity) Rollback(ctx context.Context, conn *sql.DB) error { + // No-op + return nil +} + +func (o *OpSetReplicaIdentity) Validate(ctx context.Context, s *schema.Schema) error { + identityType := strings.ToUpper(o.Identity.Type) + + table := s.GetTable(o.Table) + if table == nil { + return TableDoesNotExistError{Name: o.Table} + } + + identities := []string{"NOTHING", "DEFAULT", "INDEX", "FULL"} + if !slices.Contains(identities, identityType) { + return InvalidReplicaIdentityError{Table: o.Table, Identity: o.Identity.Type} + } + + if identityType == "INDEX" { + if _, ok := table.Indexes[o.Identity.Index]; !ok { + return IndexDoesNotExistError{Name: o.Identity.Index} + } + } + + return nil +} diff --git a/pkg/migrations/op_set_replica_identity_test.go b/pkg/migrations/op_set_replica_identity_test.go new file mode 100644 index 0000000..a9124b0 --- /dev/null +++ b/pkg/migrations/op_set_replica_identity_test.go @@ -0,0 +1,217 @@ +// SPDX-License-Identifier: Apache-2.0 + +package migrations_test + +import ( + "database/sql" + "testing" + + "github.com/xataio/pgroll/pkg/migrations" +) + +func TestSetReplicaIdentity(t *testing.T) { + t.Parallel() + + createTableMigration := migrations.Migration{ + Name: "01_add_table", + Operations: migrations.Operations{ + &migrations.OpCreateTable{ + Name: "users", + Columns: []migrations.Column{ + { + Name: "id", + Type: "serial", + PrimaryKey: true, + }, + { + Name: "name", + Type: "varchar(255)", + Nullable: false, + }, + }, + }, + }, + } + + ExecuteTests(t, TestCases{ + { + name: "set replica identity to FULL", + migrations: []migrations.Migration{ + createTableMigration, + { + Name: "02_set_replica_identity", + Operations: migrations.Operations{ + &migrations.OpSetReplicaIdentity{ + Table: "users", + Identity: migrations.ReplicaIdentity{Type: "full"}, + }, + }, + }, + }, + afterStart: func(t *testing.T, db *sql.DB) { + // The replica identity has been set to 'f' (full). + ReplicaIdentityMustBe(t, db, "public", "users", "f") + }, + afterRollback: func(t *testing.T, db *sql.DB) { + // Rollback is a no-op + }, + afterComplete: func(t *testing.T, db *sql.DB) { + // Complete is a no-op + }, + }, + { + name: "set replica identity to NOTHING", + migrations: []migrations.Migration{ + createTableMigration, + { + Name: "02_set_replica_identity", + Operations: migrations.Operations{ + &migrations.OpSetReplicaIdentity{ + Table: "users", + Identity: migrations.ReplicaIdentity{Type: "nothing"}, + }, + }, + }, + }, + afterStart: func(t *testing.T, db *sql.DB) { + // The replica identity has been set to 'n' (nothing). + ReplicaIdentityMustBe(t, db, "public", "users", "n") + }, + afterRollback: func(t *testing.T, db *sql.DB) { + // Rollback is a no-op + }, + afterComplete: func(t *testing.T, db *sql.DB) { + // Complete is a no-op + }, + }, + { + name: "set replica identity to DEFAULT", + migrations: []migrations.Migration{ + createTableMigration, + { + Name: "02_set_replica_identity", + Operations: migrations.Operations{ + &migrations.OpSetReplicaIdentity{ + Table: "users", + Identity: migrations.ReplicaIdentity{Type: "default"}, + }, + }, + }, + }, + afterStart: func(t *testing.T, db *sql.DB) { + // The replica identity has been set to 'd' (default). + ReplicaIdentityMustBe(t, db, "public", "users", "d") + }, + afterRollback: func(t *testing.T, db *sql.DB) { + // Rollback is a no-op + }, + afterComplete: func(t *testing.T, db *sql.DB) { + // Complete is a no-op + }, + }, + { + name: "set replica identity to USING INDEX", + migrations: []migrations.Migration{ + createTableMigration, + { + Name: "02_set_replica_identity", + Operations: migrations.Operations{ + &migrations.OpSetReplicaIdentity{ + Table: "users", + Identity: migrations.ReplicaIdentity{ + Type: "index", + Index: "_pgroll_new_users_pkey", + }, + }, + }, + }, + }, + afterStart: func(t *testing.T, db *sql.DB) { + // The replica identity has been set to 'i' (index). + ReplicaIdentityMustBe(t, db, "public", "users", "i") + }, + afterRollback: func(t *testing.T, db *sql.DB) { + // Rollback is a no-op + }, + afterComplete: func(t *testing.T, db *sql.DB) { + // Complete is a no-op + }, + }, + }) +} + +func TestSetReplicaIdentityValidation(t *testing.T) { + t.Parallel() + + addTableMigration := migrations.Migration{ + Name: "01_add_table", + Operations: migrations.Operations{ + &migrations.OpCreateTable{ + Name: "users", + Columns: []migrations.Column{ + { + Name: "id", + Type: "serial", + PrimaryKey: true, + }, + { + Name: "name", + Type: "varchar(255)", + Unique: true, + }, + }, + }, + }, + } + + ExecuteTests(t, TestCases{ + { + name: "table must exist", + migrations: []migrations.Migration{ + addTableMigration, + { + Name: "02_add_column", + Operations: migrations.Operations{ + &migrations.OpSetReplicaIdentity{ + Table: "doesntexist", + Identity: migrations.ReplicaIdentity{Type: "default"}, + }, + }, + }, + }, + wantStartErr: migrations.TableDoesNotExistError{Name: "doesntexist"}, + }, + { + name: "identity must be valid", + migrations: []migrations.Migration{ + addTableMigration, + { + Name: "02_add_column", + Operations: migrations.Operations{ + &migrations.OpSetReplicaIdentity{ + Table: "users", + Identity: migrations.ReplicaIdentity{Type: "invalid_identity"}, + }, + }, + }, + }, + wantStartErr: migrations.InvalidReplicaIdentityError{Table: "users", Identity: "invalid_identity"}, + }, + { + name: "index name must be valid", + migrations: []migrations.Migration{ + addTableMigration, + { + Name: "02_add_column", + Operations: migrations.Operations{ + &migrations.OpSetReplicaIdentity{ + Table: "users", + Identity: migrations.ReplicaIdentity{Type: "index", Index: "invalid_index"}, + }, + }, + }, + }, + wantStartErr: migrations.IndexDoesNotExistError{Name: "invalid_index"}, + }, + }) +}