mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-10-05 06:18:04 +03:00
Revert "cli: refactor migrate/migrate.go
to use internal/errors
"
Reverts hasura/graphql-engine-mono#7005 PR-URL: https://github.com/hasura/graphql-engine-mono/pull/7228 Co-authored-by: Aravind K P <8335904+scriptonist@users.noreply.github.com> GitOrigin-RevId: d35202ee97b51db61c4598bb1a42e9551b1a89e3
This commit is contained in:
parent
8eb24229f9
commit
c35add2a9c
@ -8,7 +8,6 @@ package migrate
|
||||
import (
|
||||
"bytes"
|
||||
"container/list"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
@ -20,11 +19,12 @@ import (
|
||||
"github.com/cheggaaa/pb/v3"
|
||||
"golang.org/x/term"
|
||||
|
||||
herrors "github.com/hasura/graphql-engine/cli/v2/internal/errors"
|
||||
"github.com/hasura/graphql-engine/cli/v2/internal/hasura"
|
||||
"github.com/hasura/graphql-engine/cli/v2/util"
|
||||
|
||||
"github.com/hasura/graphql-engine/cli/v2/migrate/database"
|
||||
"github.com/hasura/graphql-engine/cli/v2/migrate/source"
|
||||
"github.com/hasura/graphql-engine/cli/v2/util"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@ -178,7 +178,6 @@ type NewMigrateOpts struct {
|
||||
// New returns a new Migrate instance from a source URL and a database URL.
|
||||
// The URL scheme is defined by each driver.
|
||||
func New(opts NewMigrateOpts) (*Migrate, error) {
|
||||
var op herrors.Op = "migrate.New"
|
||||
m := newCommon(opts.cmd)
|
||||
m.stdout = opts.Stdout
|
||||
m.stderr = opts.Stderr
|
||||
@ -186,7 +185,7 @@ func New(opts NewMigrateOpts) (*Migrate, error) {
|
||||
sourceName, err := schemeFromUrl(opts.sourceUrl)
|
||||
if err != nil {
|
||||
log.Debug(err)
|
||||
return nil, herrors.E(op, err)
|
||||
return nil, err
|
||||
}
|
||||
m.sourceName = sourceName
|
||||
m.sourceURL = opts.sourceUrl
|
||||
@ -194,7 +193,7 @@ func New(opts NewMigrateOpts) (*Migrate, error) {
|
||||
databaseName, err := schemeFromUrl(opts.databaseUrl)
|
||||
if err != nil {
|
||||
log.Debug(err)
|
||||
return nil, herrors.E(op, err)
|
||||
return nil, err
|
||||
}
|
||||
m.databaseName = databaseName
|
||||
m.databaseURL = opts.databaseUrl
|
||||
@ -207,7 +206,7 @@ func New(opts NewMigrateOpts) (*Migrate, error) {
|
||||
sourceDrv, err := source.Open(opts.sourceUrl, opts.logger)
|
||||
if err != nil {
|
||||
log.Debug(err)
|
||||
return nil, herrors.E(op, err)
|
||||
return nil, err
|
||||
}
|
||||
m.sourceDrv = sourceDrv
|
||||
if opts.configVersion >= 2 {
|
||||
@ -217,13 +216,13 @@ func New(opts NewMigrateOpts) (*Migrate, error) {
|
||||
databaseDrv, err := database.Open(opts.databaseUrl, opts.cmd, opts.logger, opts.hasuraOpts)
|
||||
if err != nil {
|
||||
log.Debug(err)
|
||||
return nil, herrors.E(op, err)
|
||||
return nil, err
|
||||
}
|
||||
m.databaseDrv = databaseDrv
|
||||
|
||||
err = m.ReScan()
|
||||
if err != nil {
|
||||
return nil, herrors.E(op, err)
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
@ -239,72 +238,61 @@ func newCommon(cmd bool) *Migrate {
|
||||
}
|
||||
|
||||
func (m *Migrate) ReScan() error {
|
||||
var op herrors.Op = "migrate.Migrate.ReScan"
|
||||
err := m.sourceDrv.Scan()
|
||||
if err != nil {
|
||||
m.Logger.Debug(err)
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
err = m.databaseDrv.Scan()
|
||||
if err != nil {
|
||||
m.Logger.Debug(err)
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
err = m.calculateStatus()
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the source and the database.
|
||||
func (m *Migrate) Close() error {
|
||||
var op herrors.Op = "migrate.Migrate.Close"
|
||||
func (m *Migrate) Close() (source error) {
|
||||
sourceSrvClose := make(chan error)
|
||||
|
||||
go func() {
|
||||
sourceSrvClose <- m.sourceDrv.Close()
|
||||
}()
|
||||
|
||||
err := <-sourceSrvClose
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return <-sourceSrvClose
|
||||
}
|
||||
|
||||
func (m *Migrate) calculateStatus() error {
|
||||
var op herrors.Op = "migrate.Migrate.calculateStatus"
|
||||
func (m *Migrate) calculateStatus() (err error) {
|
||||
m.status = NewStatus()
|
||||
err := m.readStatusFromSource()
|
||||
err = m.readStatusFromSource()
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err = m.readStatusFromDatabase(); err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return m.readStatusFromDatabase()
|
||||
}
|
||||
|
||||
func (m *Migrate) readStatusFromSource() error {
|
||||
var op herrors.Op = "migrate.Migrate.readStatusFromSource"
|
||||
func (m *Migrate) readStatusFromSource() (err error) {
|
||||
firstVersion, err := m.sourceDrv.First()
|
||||
if err != nil {
|
||||
var pathErr *os.PathError
|
||||
if errors.As(err, &pathErr) {
|
||||
return nil
|
||||
}
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
m.status.Append(m.newMigrationStatus(firstVersion, "source", false))
|
||||
from := int64(firstVersion)
|
||||
|
||||
lastVersion, err := m.sourceDrv.GetLocalVersion()
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
m.status.Append(m.newMigrationStatus(lastVersion, "source", false))
|
||||
to := int64(lastVersion)
|
||||
@ -312,7 +300,7 @@ func (m *Migrate) readStatusFromSource() error {
|
||||
for from < to {
|
||||
next, err := m.sourceDrv.Next(suint64(from))
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
m.status.Append(m.newMigrationStatus(next, "source", false))
|
||||
from = int64(next)
|
||||
@ -321,7 +309,7 @@ func (m *Migrate) readStatusFromSource() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Migrate) readStatusFromDatabase() error {
|
||||
func (m *Migrate) readStatusFromDatabase() (err error) {
|
||||
firstVersion, ok := m.databaseDrv.First()
|
||||
if !ok {
|
||||
return nil
|
||||
@ -344,7 +332,7 @@ func (m *Migrate) readStatusFromDatabase() error {
|
||||
m.status.Append(m.newMigrationStatus(next.Version, "database", next.Dirty))
|
||||
from = int64(next.Version)
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *Migrate) newMigrationStatus(version uint64, driverType string, dirty bool) *MigrationStatus {
|
||||
@ -371,40 +359,33 @@ func (m *Migrate) newMigrationStatus(version uint64, driverType string, dirty bo
|
||||
}
|
||||
|
||||
func (m *Migrate) GetStatus() (*Status, error) {
|
||||
var op herrors.Op = "migrate.Migrate.GetStatus"
|
||||
err := m.calculateStatus()
|
||||
if err != nil {
|
||||
return nil, herrors.E(op, err)
|
||||
return nil, err
|
||||
}
|
||||
return m.status, nil
|
||||
}
|
||||
|
||||
func (m *Migrate) GetSetting(name string) (string, error) {
|
||||
var op herrors.Op = "migrate.Migrate.GetSetting"
|
||||
val, err := m.databaseDrv.GetSetting(name)
|
||||
if err != nil {
|
||||
return "", herrors.E(op, err)
|
||||
return "", err
|
||||
}
|
||||
return val, nil
|
||||
}
|
||||
|
||||
func (m *Migrate) UpdateSetting(name string, value string) error {
|
||||
var op herrors.Op = "migrate.Migrate.UpdateSetting"
|
||||
if err := m.databaseDrv.UpdateSetting(name, value); err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return m.databaseDrv.UpdateSetting(name, value)
|
||||
}
|
||||
|
||||
func (m *Migrate) Version() (version uint64, dirty bool, err error) {
|
||||
var op herrors.Op = "migrate.Migrate.Version"
|
||||
v, d, err := m.databaseDrv.Version()
|
||||
if err != nil {
|
||||
return 0, false, herrors.E(op, err)
|
||||
return 0, false, err
|
||||
}
|
||||
|
||||
if v == database.NilVersion {
|
||||
return 0, false, herrors.E(op, ErrNilVersion)
|
||||
return 0, false, ErrNilVersion
|
||||
}
|
||||
|
||||
return suint64(v), d, nil
|
||||
@ -415,55 +396,42 @@ func (m *Migrate) GetUnappliedMigrations(version uint64) []uint64 {
|
||||
}
|
||||
|
||||
func (m *Migrate) ExportSchemaDump(includeSchemas []string, excludeSchemas []string, sourceName string, sourceKind hasura.SourceKind) ([]byte, error) {
|
||||
var op herrors.Op = "migrate.Migrate.ExportSchemaDump"
|
||||
b, err := m.databaseDrv.ExportSchemaDump(includeSchemas, excludeSchemas, sourceName, sourceKind)
|
||||
if err != nil {
|
||||
return b, herrors.E(op, err)
|
||||
}
|
||||
return b, nil
|
||||
return m.databaseDrv.ExportSchemaDump(includeSchemas, excludeSchemas, sourceName, sourceKind)
|
||||
}
|
||||
|
||||
func (m *Migrate) RemoveVersions(versions []uint64) error {
|
||||
var op herrors.Op = "migrate.Migrate.RemoveVersions"
|
||||
mode, err := m.databaseDrv.GetSetting("migration_mode")
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if mode != "true" {
|
||||
return herrors.E(op, ErrNoMigrationMode)
|
||||
return ErrNoMigrationMode
|
||||
}
|
||||
|
||||
if err := m.lock(); err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
for _, version := range versions {
|
||||
err = m.databaseDrv.RemoveVersion(int64(version))
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err = m.unlockErr(nil); err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return m.unlockErr(nil)
|
||||
}
|
||||
|
||||
func (m *Migrate) Query(data interface{}) error {
|
||||
var op herrors.Op = "migrate.Migrate.Query"
|
||||
mode, err := m.databaseDrv.GetSetting("migration_mode")
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if mode == "true" {
|
||||
return herrors.E(op, ErrMigrationMode)
|
||||
return ErrMigrationMode
|
||||
}
|
||||
if err = m.databaseDrv.Query(data); err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return m.databaseDrv.Query(data)
|
||||
}
|
||||
|
||||
// Squash migrations from version v into a new migration.
|
||||
@ -473,16 +441,16 @@ func (m *Migrate) Query(data interface{}) error {
|
||||
// the squashed metadata for all down steps: dm
|
||||
// the squashed SQL for all down steps: ds
|
||||
func (m *Migrate) Squash(v1 uint64, v2 int64) (vs []int64, us []byte, ds []byte, err error) {
|
||||
var op herrors.Op = "migrate.Migrate.Squash"
|
||||
// check the migration mode on the database
|
||||
mode, err := m.databaseDrv.GetSetting("migration_mode")
|
||||
if err != nil {
|
||||
return vs, us, ds, herrors.E(op, err)
|
||||
return
|
||||
}
|
||||
|
||||
// if migration_mode is false, set err to ErrNoMigrationMode and return
|
||||
if mode != "true" {
|
||||
return vs, us, ds, herrors.E(op, ErrNoMigrationMode)
|
||||
err = ErrNoMigrationMode
|
||||
return
|
||||
}
|
||||
|
||||
// concurrently squash all the up migrations
|
||||
@ -584,107 +552,87 @@ func (m *Migrate) Squash(v1 uint64, v2 int64) (vs []int64, us []byte, ds []byte,
|
||||
|
||||
// check for errors in the error channel
|
||||
for e := range errChn {
|
||||
return vs, us, ds, herrors.E(op, e)
|
||||
err = e
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return vs, us, ds, herrors.E(op, err)
|
||||
}
|
||||
return vs, us, ds, nil
|
||||
return
|
||||
}
|
||||
|
||||
// Migrate looks at the currently active migration version,
|
||||
// then migrates either up or down to the specified version.
|
||||
func (m *Migrate) Migrate(version uint64, direction string) error {
|
||||
var op herrors.Op = "migrate.Migrate.Migrate"
|
||||
mode, err := m.databaseDrv.GetSetting("migration_mode")
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if mode != "true" {
|
||||
return herrors.E(op, ErrNoMigrationMode)
|
||||
return ErrNoMigrationMode
|
||||
}
|
||||
|
||||
if err := m.lock(); err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
ret := make(chan interface{}, m.PrefetchMigrations)
|
||||
bar := newProgressBar(applyingMigrationsMessage, m.stderr, m.ProgressBarLogs)
|
||||
go m.read(version, direction, ret, bar)
|
||||
if m.DryRun {
|
||||
if e := m.unlockErr(m.runDryRun(ret)); e != nil {
|
||||
return herrors.E(op, e)
|
||||
}
|
||||
return nil
|
||||
return m.unlockErr(m.runDryRun(ret))
|
||||
} else {
|
||||
if e := m.unlockErr(m.runMigrations(ret, bar)); e != nil {
|
||||
return herrors.E(op, e)
|
||||
}
|
||||
return nil
|
||||
return m.unlockErr(m.runMigrations(ret, bar))
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Migrate) QueryWithVersion(version uint64, data io.ReadCloser, skipExecution bool) error {
|
||||
var op herrors.Op = "migrate.Migrate.QueryWithVersion"
|
||||
mode, err := m.databaseDrv.GetSetting("migration_mode")
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if mode != "true" {
|
||||
return herrors.E(op, ErrNoMigrationMode)
|
||||
return ErrNoMigrationMode
|
||||
}
|
||||
|
||||
if err := m.lock(); err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if !skipExecution {
|
||||
if err := m.databaseDrv.Run(data, "meta", ""); err != nil {
|
||||
m.databaseDrv.ResetQuery()
|
||||
if e := m.unlockErr(err); e != nil {
|
||||
return herrors.E(op, e)
|
||||
}
|
||||
return nil
|
||||
return m.unlockErr(err)
|
||||
}
|
||||
}
|
||||
|
||||
if version != 0 {
|
||||
if err := m.databaseDrv.SetVersion(int64(version), false); err != nil {
|
||||
m.databaseDrv.ResetQuery()
|
||||
if e := m.unlockErr(err); e != nil {
|
||||
return herrors.E(op, e)
|
||||
}
|
||||
return nil
|
||||
return m.unlockErr(err)
|
||||
}
|
||||
}
|
||||
if err := m.unlockErr(nil); err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return m.unlockErr(nil)
|
||||
}
|
||||
|
||||
// Steps looks at the currently active migration version.
|
||||
// It will migrate up if n > 0, and down if n < 0.
|
||||
func (m *Migrate) Steps(n int64) error {
|
||||
var op herrors.Op = "migrate.Migrate.Steps"
|
||||
mode, err := m.databaseDrv.GetSetting("migration_mode")
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if mode != "true" {
|
||||
return herrors.E(op, ErrNoMigrationMode)
|
||||
return ErrNoMigrationMode
|
||||
}
|
||||
|
||||
if n == 0 {
|
||||
return herrors.E(op, ErrNoChange)
|
||||
return ErrNoChange
|
||||
}
|
||||
|
||||
if err := m.lock(); err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
ret := make(chan interface{}, m.PrefetchMigrations)
|
||||
@ -697,42 +645,35 @@ func (m *Migrate) Steps(n int64) error {
|
||||
}
|
||||
|
||||
if m.DryRun {
|
||||
if err := m.unlockErr(m.runDryRun(ret)); err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return m.unlockErr(m.runDryRun(ret))
|
||||
} else {
|
||||
if err := m.unlockErr(m.runMigrations(ret, bar)); err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return m.unlockErr(m.runMigrations(ret, bar))
|
||||
}
|
||||
}
|
||||
|
||||
// Up looks at the currently active migration version
|
||||
// and will migrate all the way up (applying all up migrations).
|
||||
func (m *Migrate) Up() error {
|
||||
var op herrors.Op = "migrate.Migrate.Up"
|
||||
mode, err := m.databaseDrv.GetSetting("migration_mode")
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if mode != "true" {
|
||||
return herrors.E(op, ErrNoMigrationMode)
|
||||
return ErrNoMigrationMode
|
||||
}
|
||||
|
||||
if err := m.lock(); err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
curVersion, dirty, err := m.databaseDrv.Version()
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if dirty {
|
||||
return herrors.E(op, ErrDirty{curVersion})
|
||||
return ErrDirty{curVersion}
|
||||
}
|
||||
|
||||
ret := make(chan interface{}, m.PrefetchMigrations)
|
||||
@ -740,42 +681,35 @@ func (m *Migrate) Up() error {
|
||||
go m.readUp(-1, ret, bar)
|
||||
|
||||
if m.DryRun {
|
||||
if err := m.unlockErr(m.runDryRun(ret)); err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return m.unlockErr(m.runDryRun(ret))
|
||||
} else {
|
||||
if err := m.unlockErr(m.runMigrations(ret, bar)); err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return m.unlockErr(m.runMigrations(ret, bar))
|
||||
}
|
||||
}
|
||||
|
||||
// Down looks at the currently active migration version
|
||||
// and will migrate all the way down (applying all down migrations).
|
||||
func (m *Migrate) Down() error {
|
||||
var op herrors.Op = "migrate.Migrate.Down"
|
||||
mode, err := m.databaseDrv.GetSetting("migration_mode")
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if mode != "true" {
|
||||
return herrors.E(op, ErrNoMigrationMode)
|
||||
return ErrNoMigrationMode
|
||||
}
|
||||
|
||||
if err := m.lock(); err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
curVersion, dirty, err := m.databaseDrv.Version()
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if dirty {
|
||||
return herrors.E(op, ErrDirty{curVersion})
|
||||
return ErrDirty{curVersion}
|
||||
}
|
||||
|
||||
ret := make(chan interface{}, m.PrefetchMigrations)
|
||||
@ -783,15 +717,9 @@ func (m *Migrate) Down() error {
|
||||
go m.readDown(-1, ret, bar)
|
||||
|
||||
if m.DryRun {
|
||||
if err := m.unlockErr(m.runDryRun(ret)); err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return m.unlockErr(m.runDryRun(ret))
|
||||
} else {
|
||||
if err := m.unlockErr(m.runMigrations(ret, bar)); err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return m.unlockErr(m.runMigrations(ret, bar))
|
||||
}
|
||||
}
|
||||
|
||||
@ -1284,7 +1212,6 @@ func (m *Migrate) readDown(limit int64, ret chan<- interface{}, bar *pb.Progress
|
||||
// to stop execution because it might have received a stop signal on the
|
||||
// GracefulStop channel.
|
||||
func (m *Migrate) runMigrations(ret <-chan interface{}, bar *pb.ProgressBar) error {
|
||||
var op herrors.Op = "migrate.Migrate.runMigrations"
|
||||
startProgressBar(bar)
|
||||
setWidthProgressBar(bar, m.ProgressBarLogs)
|
||||
defer finishProgressBar(bar)
|
||||
@ -1297,25 +1224,25 @@ func (m *Migrate) runMigrations(ret <-chan interface{}, bar *pb.ProgressBar) err
|
||||
case error:
|
||||
// Clear Migration query
|
||||
m.databaseDrv.ResetQuery()
|
||||
return herrors.E(op, r)
|
||||
return r
|
||||
case *Migration:
|
||||
if r.Body != nil {
|
||||
if !m.SkipExecution {
|
||||
m.Logger.Debugf("applying migration: %s", r.FileName)
|
||||
if err := m.databaseDrv.Run(r.BufferedBody, r.FileType, r.FileName); err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
incrementProgressBar(bar)
|
||||
version := int64(r.Version)
|
||||
// Insert Version number into the table
|
||||
if err := m.databaseDrv.SetVersion(version, false); err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
if version != r.TargetVersion {
|
||||
// Delete Version number from the table
|
||||
if err := m.databaseDrv.RemoveVersion(version); err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1325,7 +1252,6 @@ func (m *Migrate) runMigrations(ret <-chan interface{}, bar *pb.ProgressBar) err
|
||||
}
|
||||
|
||||
func (m *Migrate) runDryRun(ret <-chan interface{}) error {
|
||||
var op herrors.Op = "migrate.Migrate.runDryRun"
|
||||
migrations := make([]*Migration, 0)
|
||||
var lastInsertVersion int64
|
||||
for r := range ret {
|
||||
@ -1335,7 +1261,7 @@ func (m *Migrate) runDryRun(ret <-chan interface{}) error {
|
||||
|
||||
switch r := r.(type) {
|
||||
case error:
|
||||
return herrors.E(op, r)
|
||||
return r
|
||||
case *Migration:
|
||||
if r.Body != nil {
|
||||
version := int64(r.Version)
|
||||
@ -1430,11 +1356,10 @@ func (m *Migrate) squashMigrations(retUp <-chan interface{}, retDown <-chan inte
|
||||
// versionUpExists checks the source if either the up or down migration for
|
||||
// the specified migration version exists.
|
||||
func (m *Migrate) versionUpExists(version uint64) error {
|
||||
var op herrors.Op = "migrate.Migrate.versionUpExists"
|
||||
// try up migration first
|
||||
directions := m.sourceDrv.GetDirections(version)
|
||||
if !directions[source.Up] && !directions[source.MetaUp] {
|
||||
return herrors.E(op, fmt.Errorf("%d up migration not found", version))
|
||||
return fmt.Errorf("%d up migration not found", version)
|
||||
}
|
||||
|
||||
if directions[source.Up] {
|
||||
@ -1446,10 +1371,7 @@ func (m *Migrate) versionUpExists(version uint64) error {
|
||||
if errors.Is(err, fs.ErrExist) {
|
||||
return nil
|
||||
} else if !errors.Is(err, fs.ErrNotExist) {
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@ -1462,24 +1384,20 @@ func (m *Migrate) versionUpExists(version uint64) error {
|
||||
if errors.Is(err, fs.ErrExist) {
|
||||
return nil
|
||||
} else if !errors.Is(err, fs.ErrNotExist) {
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return herrors.E(op, os.ErrNotExist)
|
||||
return os.ErrNotExist
|
||||
}
|
||||
|
||||
// versionDownExists checks the source if either the up or down migration for
|
||||
// the specified migration version exists.
|
||||
func (m *Migrate) versionDownExists(version uint64) error {
|
||||
var op herrors.Op = "migrate.Migrate.versionDownExists"
|
||||
// try down migration first
|
||||
directions := m.sourceDrv.GetDirections(version)
|
||||
if !directions[source.Down] && !directions[source.MetaDown] {
|
||||
return herrors.E(op, fmt.Errorf("%d down migration not found", version))
|
||||
return fmt.Errorf("%d down migration not found", version)
|
||||
}
|
||||
|
||||
if directions[source.Down] {
|
||||
@ -1491,10 +1409,7 @@ func (m *Migrate) versionDownExists(version uint64) error {
|
||||
if errors.Is(err, fs.ErrExist) {
|
||||
return nil
|
||||
} else if !errors.Is(err, fs.ErrNotExist) {
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@ -1507,21 +1422,17 @@ func (m *Migrate) versionDownExists(version uint64) error {
|
||||
if errors.Is(err, fs.ErrExist) {
|
||||
return nil
|
||||
} else if !errors.Is(err, fs.ErrNotExist) {
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return herrors.E(op, os.ErrNotExist)
|
||||
return os.ErrNotExist
|
||||
}
|
||||
|
||||
// newMigration is a helper func that returns a *Migration for the
|
||||
// specified version and targetVersion (sql).
|
||||
// will return the down migration
|
||||
func (m *Migrate) newMigration(version uint64, targetVersion int64) (*Migration, error) {
|
||||
var op herrors.Op = "migrate.Migrate.newMigration"
|
||||
var migr *Migration
|
||||
|
||||
if targetVersion >= int64(version) {
|
||||
@ -1530,17 +1441,17 @@ func (m *Migrate) newMigration(version uint64, targetVersion int64) (*Migration,
|
||||
// create "empty" migration
|
||||
migr, err = NewMigration(nil, "", version, targetVersion, "sql", "")
|
||||
if err != nil {
|
||||
return nil, herrors.E(op, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
} else if err != nil {
|
||||
return nil, herrors.E(op, err)
|
||||
return nil, err
|
||||
|
||||
} else {
|
||||
// create migration from up source
|
||||
migr, err = NewMigration(r, identifier, version, targetVersion, "sql", fileName)
|
||||
if err != nil {
|
||||
return nil, herrors.E(op, err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
@ -1550,17 +1461,17 @@ func (m *Migrate) newMigration(version uint64, targetVersion int64) (*Migration,
|
||||
// create "empty" migration
|
||||
migr, err = NewMigration(nil, "", version, targetVersion, "sql", "")
|
||||
if err != nil {
|
||||
return nil, herrors.E(op, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
} else if err != nil {
|
||||
return nil, herrors.E(op, err)
|
||||
return nil, err
|
||||
|
||||
} else {
|
||||
// create migration from down source
|
||||
migr, err = NewMigration(r, identifier, version, targetVersion, "sql", fileName)
|
||||
if err != nil {
|
||||
return nil, herrors.E(op, err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1588,12 +1499,11 @@ func (m *Migrate) stop() bool {
|
||||
// lock is a thread safe helper function to lock the database.
|
||||
// It should be called as late as possible when running migrations.
|
||||
func (m *Migrate) lock() error {
|
||||
var op herrors.Op = "migrate.Migrate.lock"
|
||||
m.isLockedMu.Lock()
|
||||
defer m.isLockedMu.Unlock()
|
||||
|
||||
if m.isLocked {
|
||||
return herrors.E(op, ErrLocked)
|
||||
return ErrLocked
|
||||
}
|
||||
|
||||
// create done channel, used in the timeout goroutine
|
||||
@ -1633,69 +1543,57 @@ func (m *Migrate) lock() error {
|
||||
if err == nil {
|
||||
m.isLocked = true
|
||||
}
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
// unlock is a thread safe helper function to unlock the database.
|
||||
// It should be called as early as possible when no more migrations are
|
||||
// expected to be executed.
|
||||
func (m *Migrate) unlock() error {
|
||||
var op herrors.Op = "migrate.Migrate.unlock"
|
||||
m.isLockedMu.Lock()
|
||||
defer m.isLockedMu.Unlock()
|
||||
|
||||
defer func() {
|
||||
m.isLocked = false
|
||||
}()
|
||||
if err := m.databaseDrv.UnLock(); err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return m.databaseDrv.UnLock()
|
||||
}
|
||||
|
||||
// unlockErr calls unlock and returns a combined error
|
||||
// if a prevErr is not nil.
|
||||
func (m *Migrate) unlockErr(prevErr error) error {
|
||||
var op herrors.Op = "migrate.Migrate.unlockErr"
|
||||
if err := m.unlock(); err != nil {
|
||||
return herrors.E(op, NewMultiError(prevErr, err))
|
||||
return NewMultiError(prevErr, err)
|
||||
}
|
||||
if prevErr != nil {
|
||||
return herrors.E(op, prevErr)
|
||||
}
|
||||
return nil
|
||||
return prevErr
|
||||
}
|
||||
|
||||
// GotoVersion will apply a version also applying the migration chain
|
||||
// leading to it
|
||||
func (m *Migrate) GotoVersion(gotoVersion int64) error {
|
||||
var op herrors.Op = "migrate.Migrate.GotoVersion"
|
||||
mode, err := m.databaseDrv.GetSetting("migration_mode")
|
||||
if err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
if mode != "true" {
|
||||
return herrors.E(op, ErrNoMigrationMode)
|
||||
return ErrNoMigrationMode
|
||||
}
|
||||
|
||||
currentVersion, dirty, err := m.Version()
|
||||
currVersion := int64(currentVersion)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrNilVersion) {
|
||||
if err == ErrNilVersion {
|
||||
currVersion = database.NilVersion
|
||||
} else {
|
||||
return herrors.E(op, fmt.Errorf("cannot determine version: %w", err))
|
||||
return errors.Wrap(err, "cannot determine version")
|
||||
}
|
||||
}
|
||||
if dirty {
|
||||
return herrors.E(op, ErrDirty{currVersion})
|
||||
return ErrDirty{currVersion}
|
||||
}
|
||||
|
||||
if err := m.lock(); err != nil {
|
||||
return herrors.E(op, err)
|
||||
return err
|
||||
}
|
||||
|
||||
ret := make(chan interface{})
|
||||
@ -1707,15 +1605,9 @@ func (m *Migrate) GotoVersion(gotoVersion int64) error {
|
||||
}
|
||||
|
||||
if m.DryRun {
|
||||
if err := m.unlockErr(m.runDryRun(ret)); err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return m.unlockErr(m.runDryRun(ret))
|
||||
} else {
|
||||
if err := m.unlockErr(m.runMigrations(ret, bar)); err != nil {
|
||||
return herrors.E(op, err)
|
||||
}
|
||||
return nil
|
||||
return m.unlockErr(m.runMigrations(ret, bar))
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user