Add SQL onComplete flag, allow sql migration to run with others (#280)

This change updates the `sql` operation to:

* Allow for a new `onComplete` flag, that will make it run on the
Complete phase, rather than doing it during Start (default behavior).
* Allows for `sql` operations next to others in the same migration. We
added this limitation to ensure this operation doesn't affect others,
especially around schema state management.

Having `sql` next to other operations has proven convenient sometimes,
by adding `onComplete` flag, we can allow for these migrations to run
and rely on views recreation based on the final state.

---------

Co-authored-by: Andrew Farries <andyrb@gmail.com>
This commit is contained in:
Carlos Pérez-Aradros Herce 2024-02-12 16:37:15 +01:00 committed by GitHub
parent 889946b26d
commit 58fa7ae970
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 244 additions and 20 deletions

View File

@ -1068,9 +1068,29 @@ A raw SQL operation runs arbitrary SQL against the database. This is intended as
}
```
By default, a `sql` operation cannot run together with other operations in the same migration. This is to ensure pgroll can correctly track the state of the database. However, it is possible to run a `sql` operation together with other operations by setting the `onComplete` flag to `true`.
The `onComplete` flag will make this operation run the `up` expression on the complete phase (instead of the default, which is to run it on the start phase).
`onComplete` flag is incompatible with `down` expression, as `pgroll` does not support running rollback after complete was executed.
```json
{
"sql": {
"up": "SQL expression",
"onComplete": true
}
}
```
Example **raw SQL** migrations:
* [05_sql.json](../examples/05_sql.json)
* [32_sql_on_complete.json](../examples/32_sql_on_complete.json)
### Rename table

View File

@ -0,0 +1,11 @@
{
"name": "32_sql_on_complete",
"operations": [
{
"sql": {
"up": "ALTER TABLE people ADD COLUMN birth_date timestamp",
"onComplete": true
}
}
]
}

18
pkg/jsonschema/testdata/sql-3.txtar vendored Normal file
View File

@ -0,0 +1,18 @@
This is a valid 'sql' migration.
It specifies `up`, and `on_complete`
-- create_table.json --
{
"name": "migration_name",
"operations": [
{
"sql": {
"up": "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)",
"onComplete": true
}
}
]
}
-- valid --
true

19
pkg/jsonschema/testdata/sql-4.txtar vendored Normal file
View File

@ -0,0 +1,19 @@
This is an invalid 'sql' migration.
It specifies `up`, `down` and `on_complete`
-- create_table.json --
{
"name": "migration_name",
"operations": [
{
"sql": {
"up": "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)",
"down": "DROP TABLE users",
"onComplete": true
}
}
]
}
-- valid --
false

View File

@ -35,11 +35,13 @@ type Operation interface {
// IsolatedOperation is an operation that cannot be executed with other operations
// in the same migration
type IsolatedOperation interface {
IsIsolated()
// this operation is isolated when executed on start, cannot be executed with other operations
IsIsolated() bool
}
// RequiresSchemaRefreshOperation is an operation that requires the resulting schema to be refreshed
type RequiresSchemaRefreshOperation interface {
// this operation requires the resulting schema to be refreshed when executed on start
RequiresSchemaRefresh()
}
@ -56,8 +58,8 @@ type (
// returns a descriptive error if the migration is invalid
func (m *Migration) Validate(ctx context.Context, s *schema.Schema) error {
for _, op := range m.Operations {
if _, ok := op.(IsolatedOperation); ok {
if len(m.Operations) > 1 {
if isolatedOp, ok := op.(IsolatedOperation); ok {
if isolatedOp.IsIsolated() && len(m.Operations) > 1 {
return InvalidMigrationError{Reason: fmt.Sprintf("operation %q cannot be executed with other operations", OperationName(op))}
}
}

View File

@ -17,7 +17,7 @@ func TestMigrationsIsolated(t *testing.T) {
&OpRawSQL{
Up: `foo`,
},
&OpRenameColumn{},
&OpCreateTable{Name: "foo"},
},
}
@ -38,3 +38,18 @@ func TestMigrationsIsolatedValid(t *testing.T) {
err := migration.Validate(context.TODO(), schema.New())
assert.NoError(t, err)
}
func TestOnCompleteSQLMigrationsAreNotIsolated(t *testing.T) {
migration := Migration{
Name: "sql",
Operations: Operations{
&OpRawSQL{
Up: `foo`,
OnComplete: true,
},
&OpCreateTable{Name: "foo"},
},
}
err := migration.Validate(context.TODO(), schema.New())
assert.NoError(t, err)
}

View File

@ -12,14 +12,18 @@ import (
var _ Operation = (*OpRawSQL)(nil)
func (o *OpRawSQL) Start(ctx context.Context, conn *sql.DB, stateSchema string, s *schema.Schema, cbs ...CallbackFn) error {
_, err := conn.ExecContext(ctx, o.Up)
if err != nil {
if !o.OnComplete {
_, err := conn.ExecContext(ctx, o.Up)
return err
}
return nil
}
func (o *OpRawSQL) Complete(ctx context.Context, conn *sql.DB, s *schema.Schema) error {
if o.OnComplete {
_, err := conn.ExecContext(ctx, o.Up)
return err
}
return nil
}
@ -36,11 +40,15 @@ func (o *OpRawSQL) Validate(ctx context.Context, s *schema.Schema) error {
return EmptyMigrationError{}
}
if o.OnComplete && o.Down != "" {
return InvalidMigrationError{Reason: "down is not allowed with onComplete"}
}
return nil
}
// this operation is isolated, cannot be executed with other operations
func (o *OpRawSQL) IsIsolated() {}
func (o *OpRawSQL) IsIsolated() bool {
return !o.OnComplete
}
// this operation requires the resulting schema to be refreshed
func (o *OpRawSQL) RequiresSchemaRefresh() {}

View File

@ -53,6 +53,81 @@ func TestRawSQL(t *testing.T) {
})
},
},
{
name: "raw SQL with onComplete",
migrations: []migrations.Migration{
{
Name: "01_create_table",
Operations: migrations.Operations{
&migrations.OpRawSQL{
OnComplete: true,
Up: `
CREATE TABLE test_table (
id serial,
name text
)
`,
},
},
},
},
afterStart: func(t *testing.T, db *sql.DB, schema string) {
// SQL didn't run yet
TableMustNotExist(t, db, schema, "test_table")
},
afterComplete: func(t *testing.T, db *sql.DB, schema string) {
// table can be accessed after start
TableMustExist(t, db, schema, "test_table")
// inserts work
MustInsert(t, db, schema, "01_create_table", "test_table", map[string]string{
"name": "foo",
})
},
},
{
name: "raw SQL after a migration with onComplete",
migrations: []migrations.Migration{
{
Name: "01_create_table",
Operations: migrations.Operations{
&migrations.OpCreateTable{
Name: "test_table",
Columns: []migrations.Column{
{Name: "id", Type: "serial"},
{Name: "name", Type: "text"},
},
},
&migrations.OpRawSQL{
OnComplete: true,
Up: `
ALTER TABLE test_table ADD COLUMN age int
`,
},
},
},
},
afterStart: func(t *testing.T, db *sql.DB, schema string) {
// SQL didn't run yet
ViewMustExist(t, db, schema, "01_create_table", "test_table")
ColumnMustNotExist(t, db, schema, "test_table", "age")
},
afterRollback: func(t *testing.T, db *sql.DB, schema string) {
// table is dropped after rollback
TableMustNotExist(t, db, schema, "test_table")
},
afterComplete: func(t *testing.T, db *sql.DB, schema string) {
// table can be accessed after start
TableMustExist(t, db, schema, "test_table")
ColumnMustExist(t, db, schema, "test_table", "age")
// inserts work
MustInsert(t, db, schema, "01_create_table", "test_table", map[string]string{
"name": "foo",
"age": "42",
})
},
},
{
name: "migration on top of raw SQL",
migrations: []migrations.Migration{

View File

@ -171,6 +171,9 @@ type OpRawSQL struct {
// SQL expression for down migration
Down string `json:"down,omitempty"`
// SQL expression will run on complete step (rather than on start)
OnComplete bool `json:"onComplete,omitempty"`
// SQL expression for up migration
Up string `json:"up"`
}

View File

@ -49,12 +49,15 @@ func (m *Roll) Start(ctx context.Context, migration *migrations.Migration, cbs .
fmt.Errorf("unable to execute start operation: %w", err),
errRollback)
}
// refresh schema when the op is isolated and requires a refresh (for example raw sql)
// we don't want to refresh the schema if the operation is not isolated as it would
// override changes made by other operations
if _, ok := op.(migrations.RequiresSchemaRefreshOperation); ok {
// refresh schema
newSchema, err = m.state.ReadSchema(ctx, m.schema)
if err != nil {
return fmt.Errorf("unable to refresh schema: %w", err)
if isolatedOp, ok := op.(migrations.IsolatedOperation); ok && isolatedOp.IsIsolated() {
newSchema, err = m.state.ReadSchema(ctx, m.schema)
if err != nil {
return fmt.Errorf("unable to refresh schema: %w", err)
}
}
}
}
@ -64,16 +67,21 @@ func (m *Roll) Start(ctx context.Context, migration *migrations.Migration, cbs .
return nil
}
// create views for the new version
return m.ensureViews(ctx, newSchema, migration.Name)
}
func (m *Roll) ensureViews(ctx context.Context, schema *schema.Schema, version string) error {
// create schema for the new version
versionSchema := VersionedSchemaName(m.schema, migration.Name)
_, err = m.pgConn.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", pq.QuoteIdentifier(versionSchema)))
versionSchema := VersionedSchemaName(m.schema, version)
_, err := m.pgConn.ExecContext(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", pq.QuoteIdentifier(versionSchema)))
if err != nil {
return err
}
// create views in the new schema
for name, table := range newSchema.Tables {
err = m.createView(ctx, migration.Name, name, table)
for name, table := range schema.Tables {
err = m.ensureView(ctx, version, name, table)
if err != nil {
return fmt.Errorf("unable to create view: %w", err)
}
@ -112,11 +120,29 @@ func (m *Roll) Complete(ctx context.Context) error {
}
// execute operations
refreshViews := false
for _, op := range migration.Operations {
err := op.Complete(ctx, m.pgConn, schema)
if err != nil {
return fmt.Errorf("unable to execute complete operation: %w", err)
}
if _, ok := op.(migrations.RequiresSchemaRefreshOperation); ok {
refreshViews = true
}
}
// recreate views for the new version (if some operations require it, ie SQL)
if refreshViews && !m.disableVersionSchemas {
schema, err = m.state.ReadSchema(ctx, m.schema)
if err != nil {
return fmt.Errorf("unable to read schema: %w", err)
}
err = m.ensureViews(ctx, schema, migration.Name)
if err != nil {
return err
}
}
// mark as completed
@ -162,7 +188,7 @@ func (m *Roll) Rollback(ctx context.Context) error {
}
// create view creates a view for the new version of the schema
func (m *Roll) createView(ctx context.Context, version, name string, table schema.Table) error {
func (m *Roll) ensureView(ctx context.Context, version, name string, table schema.Table) error {
columns := make([]string, 0, len(table.Columns))
for k, v := range table.Columns {
columns = append(columns, fmt.Sprintf("%s AS %s", pq.QuoteIdentifier(v.Name), pq.QuoteIdentifier(k)))
@ -179,7 +205,9 @@ func (m *Roll) createView(ctx context.Context, version, name string, table schem
}
_, err := m.pgConn.ExecContext(ctx,
fmt.Sprintf("CREATE OR REPLACE VIEW %s.%s %s AS SELECT %s FROM %s",
fmt.Sprintf("BEGIN; DROP VIEW IF EXISTS %s.%s; CREATE VIEW %s.%s %s AS SELECT %s FROM %s; COMMIT",
pq.QuoteIdentifier(VersionedSchemaName(m.schema, version)),
pq.QuoteIdentifier(name),
pq.QuoteIdentifier(VersionedSchemaName(m.schema, version)),
pq.QuoteIdentifier(name),
withOptions,

View File

@ -312,9 +312,34 @@
"up": {
"description": "SQL expression for up migration",
"type": "string"
},
"onComplete": {
"description": "SQL expression will run on complete step (rather than on start)",
"type": "boolean",
"default": false
}
},
"required": ["up"],
"oneOf": [
{
"required": ["down"]
},
{
"required": ["onComplete"]
},
{
"not": {
"anyOf": [
{
"required": ["down"]
},
{
"required": ["onComplete"]
}
]
}
}
],
"type": "object"
},
"OpRenameTable": {