Add a 'set replica identity' operation (#201)

Add support for a new **set replica identity** operation type. The new
operation looks like this:

```json
{
  "name": "29_set_replica_identity",
  "operations": [
    {
      "set_replica_identity": {
        "table": "fruits",
        "identity": {
          "type": "full"
        }
      }
    }
  ]
}
```

This sets the replica identity for the `fruits` table to `FULL`. The
other supported operation types are 'nothing', 'default', and 'index'.
If the replica identity is being set to an index, the operation looks
like:

```json
{
  "name": "29_set_replica_identity",
  "operations": [
    {
      "set_replica_identity": {
        "table": "fruits",
        "identity": {
          "type": "index",
          "index": "some_index_name"
        }
      }
    }
  ]
}
```

The replica identity is set directly on the underlying table on
operation start. This means that both versions of the table exposed in
the new and old versioned views will have the new replica identity set.

The docs have been updated with the details of the new operation
[here](https://github.com/xataio/pgroll/blob/set-replica-identity/docs/README.md#set-replica-identity).
This commit is contained in:
Andrew Farries 2023-11-08 09:23:57 +00:00 committed by GitHub
parent edc78acf0a
commit 4f4d54955b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 374 additions and 10 deletions

View File

@ -29,6 +29,7 @@
* [Drop table](#drop-table)
* [Raw SQL](#raw-sql)
* [Rename table](#rename-table)
* [Set replica identity](#set-replica-identity)
## Concepts
@ -674,6 +675,7 @@ See the [examples](../examples) directory for examples of each kind of operation
* [Drop table](#drop-table)
* [Raw SQL](#raw-sql)
* [Rename table](#rename-table)
* [Set replica identity](#set-replica-identity)
### Add column
@ -1048,3 +1050,27 @@ A rename table operation renames a table.
Example **rename table** migrations:
* [04_rename_table.json](../examples/04_rename_table.json)
### Set replica identity
A set replica identity operation sets the replica identity for a table.
**set replica identity** operations have this structure:
```json
{
"set_replica_identity": {
"table": "name of the table",
"identity": {
"type": "full | default | nothing | index"
"index": "name of the index, if type is 'index'"
}
}
}
```
:warning: A **set replica identity** operation is applied directly to the underlying table on migration start. This means that both versions of the table exposed in the old and new version schemas will have the new replica identity set. :warning:
Example **set replica identity** migrations:
* [29_set_replica_identity.json](../examples/29_set_replica_identity.json)

View File

@ -0,0 +1,14 @@
{
"name": "29_set_replica_identity",
"operations": [
{
"set_replica_identity": {
"table": "fruits",
"identity": {
"type": "index",
"index": "_pgroll_new_fruits_pkey"
}
}
}
]
}

View File

@ -147,3 +147,12 @@ type InvalidPrimaryKeyError struct {
func (e InvalidPrimaryKeyError) Error() string {
return fmt.Sprintf("primary key on table %q must be defined on exactly one column, found %d", e.Table, e.Fields)
}
type InvalidReplicaIdentityError struct {
Table string
Identity string
}
func (e InvalidReplicaIdentityError) Error() string {
return fmt.Sprintf("replica identity on table %q must be one of 'NOTHING', 'DEFAULT', 'INDEX' or 'FULL', found %q", e.Table, e.Identity)
}

View File

@ -12,16 +12,17 @@ import (
type OpName string
const (
OpNameCreateTable OpName = "create_table"
OpNameRenameTable OpName = "rename_table"
OpNameDropTable OpName = "drop_table"
OpNameAddColumn OpName = "add_column"
OpNameDropColumn OpName = "drop_column"
OpNameAlterColumn OpName = "alter_column"
OpNameCreateIndex OpName = "create_index"
OpNameDropIndex OpName = "drop_index"
OpNameDropConstraint OpName = "drop_constraint"
OpRawSQLName OpName = "sql"
OpNameCreateTable OpName = "create_table"
OpNameRenameTable OpName = "rename_table"
OpNameDropTable OpName = "drop_table"
OpNameAddColumn OpName = "add_column"
OpNameDropColumn OpName = "drop_column"
OpNameAlterColumn OpName = "alter_column"
OpNameCreateIndex OpName = "create_index"
OpNameDropIndex OpName = "drop_index"
OpNameDropConstraint OpName = "drop_constraint"
OpNameSetReplicaIdentity OpName = "set_replica_identity"
OpRawSQLName OpName = "sql"
// Internal operation types used by `alter_column`
OpNameRenameColumn OpName = "rename_column"
@ -95,6 +96,9 @@ func (v *Operations) UnmarshalJSON(data []byte) error {
case OpNameDropConstraint:
item = &OpDropConstraint{}
case OpNameSetReplicaIdentity:
item = &OpSetReplicaIdentity{}
case OpNameAlterColumn:
item = &OpAlterColumn{}
@ -172,6 +176,9 @@ func OperationName(op Operation) OpName {
case *OpDropConstraint:
return OpNameDropConstraint
case *OpSetReplicaIdentity:
return OpNameSetReplicaIdentity
case *OpAlterColumn:
return OpNameAlterColumn

View File

@ -282,6 +282,27 @@ func IndexMustNotExist(t *testing.T, db *sql.DB, schema, table, index string) {
}
}
func ReplicaIdentityMustBe(t *testing.T, db *sql.DB, schema, table, replicaIdentity string) {
t.Helper()
var actualReplicaIdentity string
err := db.QueryRow(`
SELECT c.relreplident
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r' -- regular table
AND n.nspname = $1
AND c.relname = $2;
`, schema, table).Scan(&actualReplicaIdentity)
if err != nil {
t.Fatal(err)
}
if replicaIdentity != actualReplicaIdentity {
t.Fatalf("Expected replica identity to be %q, got %q", replicaIdentity, actualReplicaIdentity)
}
}
func indexExists(t *testing.T, db *sql.DB, schema, table, index string) bool {
t.Helper()

View File

@ -0,0 +1,70 @@
// SPDX-License-Identifier: Apache-2.0
package migrations
import (
"context"
"database/sql"
"fmt"
"slices"
"strings"
"github.com/lib/pq"
"github.com/xataio/pgroll/pkg/schema"
)
type OpSetReplicaIdentity struct {
Table string `json:"table"`
Identity ReplicaIdentity `json:"identity"`
}
type ReplicaIdentity struct {
Type string `json:"type"`
Index string `json:"index"`
}
var _ Operation = (*OpSetReplicaIdentity)(nil)
func (o *OpSetReplicaIdentity) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
// build the correct form of the `SET REPLICA IDENTITY` statement based on the`identity type
identitySQL := strings.ToUpper(o.Identity.Type)
if identitySQL == "INDEX" {
identitySQL = fmt.Sprintf("USING INDEX %s", pq.QuoteIdentifier(o.Identity.Index))
}
// set the replica identity on the underlying table
_, err := conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s REPLICA IDENTITY %s", o.Table, identitySQL))
return err
}
func (o *OpSetReplicaIdentity) Complete(ctx context.Context, conn *sql.DB) error {
// No-op
return nil
}
func (o *OpSetReplicaIdentity) Rollback(ctx context.Context, conn *sql.DB) error {
// No-op
return nil
}
func (o *OpSetReplicaIdentity) Validate(ctx context.Context, s *schema.Schema) error {
identityType := strings.ToUpper(o.Identity.Type)
table := s.GetTable(o.Table)
if table == nil {
return TableDoesNotExistError{Name: o.Table}
}
identities := []string{"NOTHING", "DEFAULT", "INDEX", "FULL"}
if !slices.Contains(identities, identityType) {
return InvalidReplicaIdentityError{Table: o.Table, Identity: o.Identity.Type}
}
if identityType == "INDEX" {
if _, ok := table.Indexes[o.Identity.Index]; !ok {
return IndexDoesNotExistError{Name: o.Identity.Index}
}
}
return nil
}

View File

@ -0,0 +1,217 @@
// SPDX-License-Identifier: Apache-2.0
package migrations_test
import (
"database/sql"
"testing"
"github.com/xataio/pgroll/pkg/migrations"
)
func TestSetReplicaIdentity(t *testing.T) {
t.Parallel()
createTableMigration := migrations.Migration{
Name: "01_add_table",
Operations: migrations.Operations{
&migrations.OpCreateTable{
Name: "users",
Columns: []migrations.Column{
{
Name: "id",
Type: "serial",
PrimaryKey: true,
},
{
Name: "name",
Type: "varchar(255)",
Nullable: false,
},
},
},
},
}
ExecuteTests(t, TestCases{
{
name: "set replica identity to FULL",
migrations: []migrations.Migration{
createTableMigration,
{
Name: "02_set_replica_identity",
Operations: migrations.Operations{
&migrations.OpSetReplicaIdentity{
Table: "users",
Identity: migrations.ReplicaIdentity{Type: "full"},
},
},
},
},
afterStart: func(t *testing.T, db *sql.DB) {
// The replica identity has been set to 'f' (full).
ReplicaIdentityMustBe(t, db, "public", "users", "f")
},
afterRollback: func(t *testing.T, db *sql.DB) {
// Rollback is a no-op
},
afterComplete: func(t *testing.T, db *sql.DB) {
// Complete is a no-op
},
},
{
name: "set replica identity to NOTHING",
migrations: []migrations.Migration{
createTableMigration,
{
Name: "02_set_replica_identity",
Operations: migrations.Operations{
&migrations.OpSetReplicaIdentity{
Table: "users",
Identity: migrations.ReplicaIdentity{Type: "nothing"},
},
},
},
},
afterStart: func(t *testing.T, db *sql.DB) {
// The replica identity has been set to 'n' (nothing).
ReplicaIdentityMustBe(t, db, "public", "users", "n")
},
afterRollback: func(t *testing.T, db *sql.DB) {
// Rollback is a no-op
},
afterComplete: func(t *testing.T, db *sql.DB) {
// Complete is a no-op
},
},
{
name: "set replica identity to DEFAULT",
migrations: []migrations.Migration{
createTableMigration,
{
Name: "02_set_replica_identity",
Operations: migrations.Operations{
&migrations.OpSetReplicaIdentity{
Table: "users",
Identity: migrations.ReplicaIdentity{Type: "default"},
},
},
},
},
afterStart: func(t *testing.T, db *sql.DB) {
// The replica identity has been set to 'd' (default).
ReplicaIdentityMustBe(t, db, "public", "users", "d")
},
afterRollback: func(t *testing.T, db *sql.DB) {
// Rollback is a no-op
},
afterComplete: func(t *testing.T, db *sql.DB) {
// Complete is a no-op
},
},
{
name: "set replica identity to USING INDEX",
migrations: []migrations.Migration{
createTableMigration,
{
Name: "02_set_replica_identity",
Operations: migrations.Operations{
&migrations.OpSetReplicaIdentity{
Table: "users",
Identity: migrations.ReplicaIdentity{
Type: "index",
Index: "_pgroll_new_users_pkey",
},
},
},
},
},
afterStart: func(t *testing.T, db *sql.DB) {
// The replica identity has been set to 'i' (index).
ReplicaIdentityMustBe(t, db, "public", "users", "i")
},
afterRollback: func(t *testing.T, db *sql.DB) {
// Rollback is a no-op
},
afterComplete: func(t *testing.T, db *sql.DB) {
// Complete is a no-op
},
},
})
}
func TestSetReplicaIdentityValidation(t *testing.T) {
t.Parallel()
addTableMigration := migrations.Migration{
Name: "01_add_table",
Operations: migrations.Operations{
&migrations.OpCreateTable{
Name: "users",
Columns: []migrations.Column{
{
Name: "id",
Type: "serial",
PrimaryKey: true,
},
{
Name: "name",
Type: "varchar(255)",
Unique: true,
},
},
},
},
}
ExecuteTests(t, TestCases{
{
name: "table must exist",
migrations: []migrations.Migration{
addTableMigration,
{
Name: "02_add_column",
Operations: migrations.Operations{
&migrations.OpSetReplicaIdentity{
Table: "doesntexist",
Identity: migrations.ReplicaIdentity{Type: "default"},
},
},
},
},
wantStartErr: migrations.TableDoesNotExistError{Name: "doesntexist"},
},
{
name: "identity must be valid",
migrations: []migrations.Migration{
addTableMigration,
{
Name: "02_add_column",
Operations: migrations.Operations{
&migrations.OpSetReplicaIdentity{
Table: "users",
Identity: migrations.ReplicaIdentity{Type: "invalid_identity"},
},
},
},
},
wantStartErr: migrations.InvalidReplicaIdentityError{Table: "users", Identity: "invalid_identity"},
},
{
name: "index name must be valid",
migrations: []migrations.Migration{
addTableMigration,
{
Name: "02_add_column",
Operations: migrations.Operations{
&migrations.OpSetReplicaIdentity{
Table: "users",
Identity: migrations.ReplicaIdentity{Type: "index", Index: "invalid_index"},
},
},
},
},
wantStartErr: migrations.IndexDoesNotExistError{Name: "invalid_index"},
},
})
}