From 4f4d54955b00cd99bd8c33e21b7fd40738cf1ea7 Mon Sep 17 00:00:00 2001 From: Andrew Farries Date: Wed, 8 Nov 2023 09:23:57 +0000 Subject: [PATCH] Add a 'set replica identity' operation (#201) Add support for a new **set replica identity** operation type. The new operation looks like this: ```json { "name": "29_set_replica_identity", "operations": [ { "set_replica_identity": { "table": "fruits", "identity": { "type": "full" } } } ] } ``` This sets the replica identity for the `fruits` table to `FULL`. The other supported operation types are 'nothing', 'default', and 'index'. If the replica identity is being set to an index, the operation looks like: ```json { "name": "29_set_replica_identity", "operations": [ { "set_replica_identity": { "table": "fruits", "identity": { "type": "index", "index": "some_index_name" } } } ] } ``` The replica identity is set directly on the underlying table on operation start. This means that both versions of the table exposed in the new and old versioned views will have the new replica identity set. The docs have been updated with the details of the new operation [here](https://github.com/xataio/pgroll/blob/set-replica-identity/docs/README.md#set-replica-identity). --- docs/README.md | 26 +++ examples/29_set_replica_identity.json | 14 ++ pkg/migrations/errors.go | 9 + pkg/migrations/op_common.go | 27 ++- pkg/migrations/op_common_test.go | 21 ++ pkg/migrations/op_set_replica_identity.go | 70 ++++++ .../op_set_replica_identity_test.go | 217 ++++++++++++++++++ 7 files changed, 374 insertions(+), 10 deletions(-) create mode 100644 examples/29_set_replica_identity.json create mode 100644 pkg/migrations/op_set_replica_identity.go create mode 100644 pkg/migrations/op_set_replica_identity_test.go diff --git a/docs/README.md b/docs/README.md index 74688d0..05f330f 100644 --- a/docs/README.md +++ b/docs/README.md @@ -29,6 +29,7 @@ * [Drop table](#drop-table) * [Raw SQL](#raw-sql) * [Rename table](#rename-table) + * [Set replica identity](#set-replica-identity) ## Concepts @@ -674,6 +675,7 @@ See the [examples](../examples) directory for examples of each kind of operation * [Drop table](#drop-table) * [Raw SQL](#raw-sql) * [Rename table](#rename-table) +* [Set replica identity](#set-replica-identity) ### Add column @@ -1048,3 +1050,27 @@ A rename table operation renames a table. Example **rename table** migrations: * [04_rename_table.json](../examples/04_rename_table.json) + +### Set replica identity + +A set replica identity operation sets the replica identity for a table. + +**set replica identity** operations have this structure: + +```json +{ + "set_replica_identity": { + "table": "name of the table", + "identity": { + "type": "full | default | nothing | index" + "index": "name of the index, if type is 'index'" + } + } +} +``` + +:warning: A **set replica identity** operation is applied directly to the underlying table on migration start. This means that both versions of the table exposed in the old and new version schemas will have the new replica identity set. :warning: + +Example **set replica identity** migrations: + +* [29_set_replica_identity.json](../examples/29_set_replica_identity.json) diff --git a/examples/29_set_replica_identity.json b/examples/29_set_replica_identity.json new file mode 100644 index 0000000..16c5843 --- /dev/null +++ b/examples/29_set_replica_identity.json @@ -0,0 +1,14 @@ +{ + "name": "29_set_replica_identity", + "operations": [ + { + "set_replica_identity": { + "table": "fruits", + "identity": { + "type": "index", + "index": "_pgroll_new_fruits_pkey" + } + } + } + ] +} diff --git a/pkg/migrations/errors.go b/pkg/migrations/errors.go index 1eb544d..1b3a5ea 100644 --- a/pkg/migrations/errors.go +++ b/pkg/migrations/errors.go @@ -147,3 +147,12 @@ type InvalidPrimaryKeyError struct { func (e InvalidPrimaryKeyError) Error() string { return fmt.Sprintf("primary key on table %q must be defined on exactly one column, found %d", e.Table, e.Fields) } + +type InvalidReplicaIdentityError struct { + Table string + Identity string +} + +func (e InvalidReplicaIdentityError) Error() string { + return fmt.Sprintf("replica identity on table %q must be one of 'NOTHING', 'DEFAULT', 'INDEX' or 'FULL', found %q", e.Table, e.Identity) +} diff --git a/pkg/migrations/op_common.go b/pkg/migrations/op_common.go index c8b5803..acf6d3d 100644 --- a/pkg/migrations/op_common.go +++ b/pkg/migrations/op_common.go @@ -12,16 +12,17 @@ import ( type OpName string const ( - OpNameCreateTable OpName = "create_table" - OpNameRenameTable OpName = "rename_table" - OpNameDropTable OpName = "drop_table" - OpNameAddColumn OpName = "add_column" - OpNameDropColumn OpName = "drop_column" - OpNameAlterColumn OpName = "alter_column" - OpNameCreateIndex OpName = "create_index" - OpNameDropIndex OpName = "drop_index" - OpNameDropConstraint OpName = "drop_constraint" - OpRawSQLName OpName = "sql" + OpNameCreateTable OpName = "create_table" + OpNameRenameTable OpName = "rename_table" + OpNameDropTable OpName = "drop_table" + OpNameAddColumn OpName = "add_column" + OpNameDropColumn OpName = "drop_column" + OpNameAlterColumn OpName = "alter_column" + OpNameCreateIndex OpName = "create_index" + OpNameDropIndex OpName = "drop_index" + OpNameDropConstraint OpName = "drop_constraint" + OpNameSetReplicaIdentity OpName = "set_replica_identity" + OpRawSQLName OpName = "sql" // Internal operation types used by `alter_column` OpNameRenameColumn OpName = "rename_column" @@ -95,6 +96,9 @@ func (v *Operations) UnmarshalJSON(data []byte) error { case OpNameDropConstraint: item = &OpDropConstraint{} + case OpNameSetReplicaIdentity: + item = &OpSetReplicaIdentity{} + case OpNameAlterColumn: item = &OpAlterColumn{} @@ -172,6 +176,9 @@ func OperationName(op Operation) OpName { case *OpDropConstraint: return OpNameDropConstraint + case *OpSetReplicaIdentity: + return OpNameSetReplicaIdentity + case *OpAlterColumn: return OpNameAlterColumn diff --git a/pkg/migrations/op_common_test.go b/pkg/migrations/op_common_test.go index 24ba61c..e0c17a8 100644 --- a/pkg/migrations/op_common_test.go +++ b/pkg/migrations/op_common_test.go @@ -282,6 +282,27 @@ func IndexMustNotExist(t *testing.T, db *sql.DB, schema, table, index string) { } } +func ReplicaIdentityMustBe(t *testing.T, db *sql.DB, schema, table, replicaIdentity string) { + t.Helper() + + var actualReplicaIdentity string + err := db.QueryRow(` + SELECT c.relreplident + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE c.relkind = 'r' -- regular table + AND n.nspname = $1 + AND c.relname = $2; + `, schema, table).Scan(&actualReplicaIdentity) + if err != nil { + t.Fatal(err) + } + + if replicaIdentity != actualReplicaIdentity { + t.Fatalf("Expected replica identity to be %q, got %q", replicaIdentity, actualReplicaIdentity) + } +} + func indexExists(t *testing.T, db *sql.DB, schema, table, index string) bool { t.Helper() diff --git a/pkg/migrations/op_set_replica_identity.go b/pkg/migrations/op_set_replica_identity.go new file mode 100644 index 0000000..82794a1 --- /dev/null +++ b/pkg/migrations/op_set_replica_identity.go @@ -0,0 +1,70 @@ +// SPDX-License-Identifier: Apache-2.0 + +package migrations + +import ( + "context" + "database/sql" + "fmt" + "slices" + "strings" + + "github.com/lib/pq" + "github.com/xataio/pgroll/pkg/schema" +) + +type OpSetReplicaIdentity struct { + Table string `json:"table"` + Identity ReplicaIdentity `json:"identity"` +} + +type ReplicaIdentity struct { + Type string `json:"type"` + Index string `json:"index"` +} + +var _ Operation = (*OpSetReplicaIdentity)(nil) + +func (o *OpSetReplicaIdentity) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error { + // build the correct form of the `SET REPLICA IDENTITY` statement based on the`identity type + identitySQL := strings.ToUpper(o.Identity.Type) + if identitySQL == "INDEX" { + identitySQL = fmt.Sprintf("USING INDEX %s", pq.QuoteIdentifier(o.Identity.Index)) + } + + // set the replica identity on the underlying table + _, err := conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s REPLICA IDENTITY %s", o.Table, identitySQL)) + return err +} + +func (o *OpSetReplicaIdentity) Complete(ctx context.Context, conn *sql.DB) error { + // No-op + return nil +} + +func (o *OpSetReplicaIdentity) Rollback(ctx context.Context, conn *sql.DB) error { + // No-op + return nil +} + +func (o *OpSetReplicaIdentity) Validate(ctx context.Context, s *schema.Schema) error { + identityType := strings.ToUpper(o.Identity.Type) + + table := s.GetTable(o.Table) + if table == nil { + return TableDoesNotExistError{Name: o.Table} + } + + identities := []string{"NOTHING", "DEFAULT", "INDEX", "FULL"} + if !slices.Contains(identities, identityType) { + return InvalidReplicaIdentityError{Table: o.Table, Identity: o.Identity.Type} + } + + if identityType == "INDEX" { + if _, ok := table.Indexes[o.Identity.Index]; !ok { + return IndexDoesNotExistError{Name: o.Identity.Index} + } + } + + return nil +} diff --git a/pkg/migrations/op_set_replica_identity_test.go b/pkg/migrations/op_set_replica_identity_test.go new file mode 100644 index 0000000..a9124b0 --- /dev/null +++ b/pkg/migrations/op_set_replica_identity_test.go @@ -0,0 +1,217 @@ +// SPDX-License-Identifier: Apache-2.0 + +package migrations_test + +import ( + "database/sql" + "testing" + + "github.com/xataio/pgroll/pkg/migrations" +) + +func TestSetReplicaIdentity(t *testing.T) { + t.Parallel() + + createTableMigration := migrations.Migration{ + Name: "01_add_table", + Operations: migrations.Operations{ + &migrations.OpCreateTable{ + Name: "users", + Columns: []migrations.Column{ + { + Name: "id", + Type: "serial", + PrimaryKey: true, + }, + { + Name: "name", + Type: "varchar(255)", + Nullable: false, + }, + }, + }, + }, + } + + ExecuteTests(t, TestCases{ + { + name: "set replica identity to FULL", + migrations: []migrations.Migration{ + createTableMigration, + { + Name: "02_set_replica_identity", + Operations: migrations.Operations{ + &migrations.OpSetReplicaIdentity{ + Table: "users", + Identity: migrations.ReplicaIdentity{Type: "full"}, + }, + }, + }, + }, + afterStart: func(t *testing.T, db *sql.DB) { + // The replica identity has been set to 'f' (full). + ReplicaIdentityMustBe(t, db, "public", "users", "f") + }, + afterRollback: func(t *testing.T, db *sql.DB) { + // Rollback is a no-op + }, + afterComplete: func(t *testing.T, db *sql.DB) { + // Complete is a no-op + }, + }, + { + name: "set replica identity to NOTHING", + migrations: []migrations.Migration{ + createTableMigration, + { + Name: "02_set_replica_identity", + Operations: migrations.Operations{ + &migrations.OpSetReplicaIdentity{ + Table: "users", + Identity: migrations.ReplicaIdentity{Type: "nothing"}, + }, + }, + }, + }, + afterStart: func(t *testing.T, db *sql.DB) { + // The replica identity has been set to 'n' (nothing). + ReplicaIdentityMustBe(t, db, "public", "users", "n") + }, + afterRollback: func(t *testing.T, db *sql.DB) { + // Rollback is a no-op + }, + afterComplete: func(t *testing.T, db *sql.DB) { + // Complete is a no-op + }, + }, + { + name: "set replica identity to DEFAULT", + migrations: []migrations.Migration{ + createTableMigration, + { + Name: "02_set_replica_identity", + Operations: migrations.Operations{ + &migrations.OpSetReplicaIdentity{ + Table: "users", + Identity: migrations.ReplicaIdentity{Type: "default"}, + }, + }, + }, + }, + afterStart: func(t *testing.T, db *sql.DB) { + // The replica identity has been set to 'd' (default). + ReplicaIdentityMustBe(t, db, "public", "users", "d") + }, + afterRollback: func(t *testing.T, db *sql.DB) { + // Rollback is a no-op + }, + afterComplete: func(t *testing.T, db *sql.DB) { + // Complete is a no-op + }, + }, + { + name: "set replica identity to USING INDEX", + migrations: []migrations.Migration{ + createTableMigration, + { + Name: "02_set_replica_identity", + Operations: migrations.Operations{ + &migrations.OpSetReplicaIdentity{ + Table: "users", + Identity: migrations.ReplicaIdentity{ + Type: "index", + Index: "_pgroll_new_users_pkey", + }, + }, + }, + }, + }, + afterStart: func(t *testing.T, db *sql.DB) { + // The replica identity has been set to 'i' (index). + ReplicaIdentityMustBe(t, db, "public", "users", "i") + }, + afterRollback: func(t *testing.T, db *sql.DB) { + // Rollback is a no-op + }, + afterComplete: func(t *testing.T, db *sql.DB) { + // Complete is a no-op + }, + }, + }) +} + +func TestSetReplicaIdentityValidation(t *testing.T) { + t.Parallel() + + addTableMigration := migrations.Migration{ + Name: "01_add_table", + Operations: migrations.Operations{ + &migrations.OpCreateTable{ + Name: "users", + Columns: []migrations.Column{ + { + Name: "id", + Type: "serial", + PrimaryKey: true, + }, + { + Name: "name", + Type: "varchar(255)", + Unique: true, + }, + }, + }, + }, + } + + ExecuteTests(t, TestCases{ + { + name: "table must exist", + migrations: []migrations.Migration{ + addTableMigration, + { + Name: "02_add_column", + Operations: migrations.Operations{ + &migrations.OpSetReplicaIdentity{ + Table: "doesntexist", + Identity: migrations.ReplicaIdentity{Type: "default"}, + }, + }, + }, + }, + wantStartErr: migrations.TableDoesNotExistError{Name: "doesntexist"}, + }, + { + name: "identity must be valid", + migrations: []migrations.Migration{ + addTableMigration, + { + Name: "02_add_column", + Operations: migrations.Operations{ + &migrations.OpSetReplicaIdentity{ + Table: "users", + Identity: migrations.ReplicaIdentity{Type: "invalid_identity"}, + }, + }, + }, + }, + wantStartErr: migrations.InvalidReplicaIdentityError{Table: "users", Identity: "invalid_identity"}, + }, + { + name: "index name must be valid", + migrations: []migrations.Migration{ + addTableMigration, + { + Name: "02_add_column", + Operations: migrations.Operations{ + &migrations.OpSetReplicaIdentity{ + Table: "users", + Identity: migrations.ReplicaIdentity{Type: "index", Index: "invalid_index"}, + }, + }, + }, + }, + wantStartErr: migrations.IndexDoesNotExistError{Name: "invalid_index"}, + }, + }) +}