diff --git a/cli/migrate/migrate.go b/cli/migrate/migrate.go index 5f822db91ca..8ee66c711a3 100644 --- a/cli/migrate/migrate.go +++ b/cli/migrate/migrate.go @@ -8,7 +8,6 @@ package migrate import ( "bytes" "container/list" - "errors" "fmt" "io" "io/fs" @@ -20,11 +19,12 @@ import ( "github.com/cheggaaa/pb/v3" "golang.org/x/term" - herrors "github.com/hasura/graphql-engine/cli/v2/internal/errors" "github.com/hasura/graphql-engine/cli/v2/internal/hasura" + "github.com/hasura/graphql-engine/cli/v2/util" + "github.com/hasura/graphql-engine/cli/v2/migrate/database" "github.com/hasura/graphql-engine/cli/v2/migrate/source" - "github.com/hasura/graphql-engine/cli/v2/util" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) @@ -178,7 +178,6 @@ type NewMigrateOpts struct { // New returns a new Migrate instance from a source URL and a database URL. // The URL scheme is defined by each driver. func New(opts NewMigrateOpts) (*Migrate, error) { - var op herrors.Op = "migrate.New" m := newCommon(opts.cmd) m.stdout = opts.Stdout m.stderr = opts.Stderr @@ -186,7 +185,7 @@ func New(opts NewMigrateOpts) (*Migrate, error) { sourceName, err := schemeFromUrl(opts.sourceUrl) if err != nil { log.Debug(err) - return nil, herrors.E(op, err) + return nil, err } m.sourceName = sourceName m.sourceURL = opts.sourceUrl @@ -194,7 +193,7 @@ func New(opts NewMigrateOpts) (*Migrate, error) { databaseName, err := schemeFromUrl(opts.databaseUrl) if err != nil { log.Debug(err) - return nil, herrors.E(op, err) + return nil, err } m.databaseName = databaseName m.databaseURL = opts.databaseUrl @@ -207,7 +206,7 @@ func New(opts NewMigrateOpts) (*Migrate, error) { sourceDrv, err := source.Open(opts.sourceUrl, opts.logger) if err != nil { log.Debug(err) - return nil, herrors.E(op, err) + return nil, err } m.sourceDrv = sourceDrv if opts.configVersion >= 2 { @@ -217,13 +216,13 @@ func New(opts NewMigrateOpts) (*Migrate, error) { databaseDrv, err := database.Open(opts.databaseUrl, opts.cmd, opts.logger, opts.hasuraOpts) if err != nil { log.Debug(err) - return nil, herrors.E(op, err) + return nil, err } m.databaseDrv = databaseDrv err = m.ReScan() if err != nil { - return nil, herrors.E(op, err) + return nil, err } return m, nil } @@ -239,72 +238,61 @@ func newCommon(cmd bool) *Migrate { } func (m *Migrate) ReScan() error { - var op herrors.Op = "migrate.Migrate.ReScan" err := m.sourceDrv.Scan() if err != nil { m.Logger.Debug(err) - return herrors.E(op, err) + return err } err = m.databaseDrv.Scan() if err != nil { m.Logger.Debug(err) - return herrors.E(op, err) + return err } err = m.calculateStatus() if err != nil { - return herrors.E(op, err) + return err } return nil } // Close closes the source and the database. -func (m *Migrate) Close() error { - var op herrors.Op = "migrate.Migrate.Close" +func (m *Migrate) Close() (source error) { sourceSrvClose := make(chan error) go func() { sourceSrvClose <- m.sourceDrv.Close() }() - err := <-sourceSrvClose - if err != nil { - return herrors.E(op, err) - } - return nil + return <-sourceSrvClose } -func (m *Migrate) calculateStatus() error { - var op herrors.Op = "migrate.Migrate.calculateStatus" +func (m *Migrate) calculateStatus() (err error) { m.status = NewStatus() - err := m.readStatusFromSource() + err = m.readStatusFromSource() if err != nil { - return herrors.E(op, err) + return err } - if err = m.readStatusFromDatabase(); err != nil { - return herrors.E(op, err) - } - return nil + return m.readStatusFromDatabase() } -func (m *Migrate) readStatusFromSource() error { - var op herrors.Op = "migrate.Migrate.readStatusFromSource" +func (m *Migrate) readStatusFromSource() (err error) { firstVersion, err := m.sourceDrv.First() if err != nil { var pathErr *os.PathError if errors.As(err, &pathErr) { return nil } - return herrors.E(op, err) + return err } m.status.Append(m.newMigrationStatus(firstVersion, "source", false)) from := int64(firstVersion) lastVersion, err := m.sourceDrv.GetLocalVersion() if err != nil { - return herrors.E(op, err) + return err } m.status.Append(m.newMigrationStatus(lastVersion, "source", false)) to := int64(lastVersion) @@ -312,7 +300,7 @@ func (m *Migrate) readStatusFromSource() error { for from < to { next, err := m.sourceDrv.Next(suint64(from)) if err != nil { - return herrors.E(op, err) + return err } m.status.Append(m.newMigrationStatus(next, "source", false)) from = int64(next) @@ -321,7 +309,7 @@ func (m *Migrate) readStatusFromSource() error { return nil } -func (m *Migrate) readStatusFromDatabase() error { +func (m *Migrate) readStatusFromDatabase() (err error) { firstVersion, ok := m.databaseDrv.First() if !ok { return nil @@ -344,7 +332,7 @@ func (m *Migrate) readStatusFromDatabase() error { m.status.Append(m.newMigrationStatus(next.Version, "database", next.Dirty)) from = int64(next.Version) } - return nil + return err } func (m *Migrate) newMigrationStatus(version uint64, driverType string, dirty bool) *MigrationStatus { @@ -371,40 +359,33 @@ func (m *Migrate) newMigrationStatus(version uint64, driverType string, dirty bo } func (m *Migrate) GetStatus() (*Status, error) { - var op herrors.Op = "migrate.Migrate.GetStatus" err := m.calculateStatus() if err != nil { - return nil, herrors.E(op, err) + return nil, err } return m.status, nil } func (m *Migrate) GetSetting(name string) (string, error) { - var op herrors.Op = "migrate.Migrate.GetSetting" val, err := m.databaseDrv.GetSetting(name) if err != nil { - return "", herrors.E(op, err) + return "", err } return val, nil } func (m *Migrate) UpdateSetting(name string, value string) error { - var op herrors.Op = "migrate.Migrate.UpdateSetting" - if err := m.databaseDrv.UpdateSetting(name, value); err != nil { - return herrors.E(op, err) - } - return nil + return m.databaseDrv.UpdateSetting(name, value) } func (m *Migrate) Version() (version uint64, dirty bool, err error) { - var op herrors.Op = "migrate.Migrate.Version" v, d, err := m.databaseDrv.Version() if err != nil { - return 0, false, herrors.E(op, err) + return 0, false, err } if v == database.NilVersion { - return 0, false, herrors.E(op, ErrNilVersion) + return 0, false, ErrNilVersion } return suint64(v), d, nil @@ -415,55 +396,42 @@ func (m *Migrate) GetUnappliedMigrations(version uint64) []uint64 { } func (m *Migrate) ExportSchemaDump(includeSchemas []string, excludeSchemas []string, sourceName string, sourceKind hasura.SourceKind) ([]byte, error) { - var op herrors.Op = "migrate.Migrate.ExportSchemaDump" - b, err := m.databaseDrv.ExportSchemaDump(includeSchemas, excludeSchemas, sourceName, sourceKind) - if err != nil { - return b, herrors.E(op, err) - } - return b, nil + return m.databaseDrv.ExportSchemaDump(includeSchemas, excludeSchemas, sourceName, sourceKind) } func (m *Migrate) RemoveVersions(versions []uint64) error { - var op herrors.Op = "migrate.Migrate.RemoveVersions" mode, err := m.databaseDrv.GetSetting("migration_mode") if err != nil { - return herrors.E(op, err) + return err } if mode != "true" { - return herrors.E(op, ErrNoMigrationMode) + return ErrNoMigrationMode } if err := m.lock(); err != nil { - return herrors.E(op, err) + return err } for _, version := range versions { err = m.databaseDrv.RemoveVersion(int64(version)) if err != nil { - return herrors.E(op, err) + return err } } - if err = m.unlockErr(nil); err != nil { - return herrors.E(op, err) - } - return nil + return m.unlockErr(nil) } func (m *Migrate) Query(data interface{}) error { - var op herrors.Op = "migrate.Migrate.Query" mode, err := m.databaseDrv.GetSetting("migration_mode") if err != nil { - return herrors.E(op, err) + return err } if mode == "true" { - return herrors.E(op, ErrMigrationMode) + return ErrMigrationMode } - if err = m.databaseDrv.Query(data); err != nil { - return herrors.E(op, err) - } - return nil + return m.databaseDrv.Query(data) } // Squash migrations from version v into a new migration. @@ -473,16 +441,16 @@ func (m *Migrate) Query(data interface{}) error { // the squashed metadata for all down steps: dm // the squashed SQL for all down steps: ds func (m *Migrate) Squash(v1 uint64, v2 int64) (vs []int64, us []byte, ds []byte, err error) { - var op herrors.Op = "migrate.Migrate.Squash" // check the migration mode on the database mode, err := m.databaseDrv.GetSetting("migration_mode") if err != nil { - return vs, us, ds, herrors.E(op, err) + return } // if migration_mode is false, set err to ErrNoMigrationMode and return if mode != "true" { - return vs, us, ds, herrors.E(op, ErrNoMigrationMode) + err = ErrNoMigrationMode + return } // concurrently squash all the up migrations @@ -584,107 +552,87 @@ func (m *Migrate) Squash(v1 uint64, v2 int64) (vs []int64, us []byte, ds []byte, // check for errors in the error channel for e := range errChn { - return vs, us, ds, herrors.E(op, e) + err = e + return } - if err != nil { - return vs, us, ds, herrors.E(op, err) - } - return vs, us, ds, nil + return } // Migrate looks at the currently active migration version, // then migrates either up or down to the specified version. func (m *Migrate) Migrate(version uint64, direction string) error { - var op herrors.Op = "migrate.Migrate.Migrate" mode, err := m.databaseDrv.GetSetting("migration_mode") if err != nil { - return herrors.E(op, err) + return err } if mode != "true" { - return herrors.E(op, ErrNoMigrationMode) + return ErrNoMigrationMode } if err := m.lock(); err != nil { - return herrors.E(op, err) + return err } ret := make(chan interface{}, m.PrefetchMigrations) bar := newProgressBar(applyingMigrationsMessage, m.stderr, m.ProgressBarLogs) go m.read(version, direction, ret, bar) if m.DryRun { - if e := m.unlockErr(m.runDryRun(ret)); e != nil { - return herrors.E(op, e) - } - return nil + return m.unlockErr(m.runDryRun(ret)) } else { - if e := m.unlockErr(m.runMigrations(ret, bar)); e != nil { - return herrors.E(op, e) - } - return nil + return m.unlockErr(m.runMigrations(ret, bar)) } } func (m *Migrate) QueryWithVersion(version uint64, data io.ReadCloser, skipExecution bool) error { - var op herrors.Op = "migrate.Migrate.QueryWithVersion" mode, err := m.databaseDrv.GetSetting("migration_mode") if err != nil { - return herrors.E(op, err) + return err } if mode != "true" { - return herrors.E(op, ErrNoMigrationMode) + return ErrNoMigrationMode } if err := m.lock(); err != nil { - return herrors.E(op, err) + return err } if !skipExecution { if err := m.databaseDrv.Run(data, "meta", ""); err != nil { m.databaseDrv.ResetQuery() - if e := m.unlockErr(err); e != nil { - return herrors.E(op, e) - } - return nil + return m.unlockErr(err) } } if version != 0 { if err := m.databaseDrv.SetVersion(int64(version), false); err != nil { m.databaseDrv.ResetQuery() - if e := m.unlockErr(err); e != nil { - return herrors.E(op, e) - } - return nil + return m.unlockErr(err) } } - if err := m.unlockErr(nil); err != nil { - return herrors.E(op, err) - } - return nil + return m.unlockErr(nil) } // Steps looks at the currently active migration version. // It will migrate up if n > 0, and down if n < 0. func (m *Migrate) Steps(n int64) error { - var op herrors.Op = "migrate.Migrate.Steps" mode, err := m.databaseDrv.GetSetting("migration_mode") if err != nil { - return herrors.E(op, err) + return err } if mode != "true" { - return herrors.E(op, ErrNoMigrationMode) + return ErrNoMigrationMode } if n == 0 { - return herrors.E(op, ErrNoChange) + return ErrNoChange } if err := m.lock(); err != nil { - return herrors.E(op, err) + return err } ret := make(chan interface{}, m.PrefetchMigrations) @@ -697,42 +645,35 @@ func (m *Migrate) Steps(n int64) error { } if m.DryRun { - if err := m.unlockErr(m.runDryRun(ret)); err != nil { - return herrors.E(op, err) - } - return nil + return m.unlockErr(m.runDryRun(ret)) } else { - if err := m.unlockErr(m.runMigrations(ret, bar)); err != nil { - return herrors.E(op, err) - } - return nil + return m.unlockErr(m.runMigrations(ret, bar)) } } // Up looks at the currently active migration version // and will migrate all the way up (applying all up migrations). func (m *Migrate) Up() error { - var op herrors.Op = "migrate.Migrate.Up" mode, err := m.databaseDrv.GetSetting("migration_mode") if err != nil { - return herrors.E(op, err) + return err } if mode != "true" { - return herrors.E(op, ErrNoMigrationMode) + return ErrNoMigrationMode } if err := m.lock(); err != nil { - return herrors.E(op, err) + return err } curVersion, dirty, err := m.databaseDrv.Version() if err != nil { - return herrors.E(op, err) + return err } if dirty { - return herrors.E(op, ErrDirty{curVersion}) + return ErrDirty{curVersion} } ret := make(chan interface{}, m.PrefetchMigrations) @@ -740,42 +681,35 @@ func (m *Migrate) Up() error { go m.readUp(-1, ret, bar) if m.DryRun { - if err := m.unlockErr(m.runDryRun(ret)); err != nil { - return herrors.E(op, err) - } - return nil + return m.unlockErr(m.runDryRun(ret)) } else { - if err := m.unlockErr(m.runMigrations(ret, bar)); err != nil { - return herrors.E(op, err) - } - return nil + return m.unlockErr(m.runMigrations(ret, bar)) } } // Down looks at the currently active migration version // and will migrate all the way down (applying all down migrations). func (m *Migrate) Down() error { - var op herrors.Op = "migrate.Migrate.Down" mode, err := m.databaseDrv.GetSetting("migration_mode") if err != nil { - return herrors.E(op, err) + return err } if mode != "true" { - return herrors.E(op, ErrNoMigrationMode) + return ErrNoMigrationMode } if err := m.lock(); err != nil { - return herrors.E(op, err) + return err } curVersion, dirty, err := m.databaseDrv.Version() if err != nil { - return herrors.E(op, err) + return err } if dirty { - return herrors.E(op, ErrDirty{curVersion}) + return ErrDirty{curVersion} } ret := make(chan interface{}, m.PrefetchMigrations) @@ -783,15 +717,9 @@ func (m *Migrate) Down() error { go m.readDown(-1, ret, bar) if m.DryRun { - if err := m.unlockErr(m.runDryRun(ret)); err != nil { - return herrors.E(op, err) - } - return nil + return m.unlockErr(m.runDryRun(ret)) } else { - if err := m.unlockErr(m.runMigrations(ret, bar)); err != nil { - return herrors.E(op, err) - } - return nil + return m.unlockErr(m.runMigrations(ret, bar)) } } @@ -1284,7 +1212,6 @@ func (m *Migrate) readDown(limit int64, ret chan<- interface{}, bar *pb.Progress // to stop execution because it might have received a stop signal on the // GracefulStop channel. func (m *Migrate) runMigrations(ret <-chan interface{}, bar *pb.ProgressBar) error { - var op herrors.Op = "migrate.Migrate.runMigrations" startProgressBar(bar) setWidthProgressBar(bar, m.ProgressBarLogs) defer finishProgressBar(bar) @@ -1297,25 +1224,25 @@ func (m *Migrate) runMigrations(ret <-chan interface{}, bar *pb.ProgressBar) err case error: // Clear Migration query m.databaseDrv.ResetQuery() - return herrors.E(op, r) + return r case *Migration: if r.Body != nil { if !m.SkipExecution { m.Logger.Debugf("applying migration: %s", r.FileName) if err := m.databaseDrv.Run(r.BufferedBody, r.FileType, r.FileName); err != nil { - return herrors.E(op, err) + return err } } incrementProgressBar(bar) version := int64(r.Version) // Insert Version number into the table if err := m.databaseDrv.SetVersion(version, false); err != nil { - return herrors.E(op, err) + return err } if version != r.TargetVersion { // Delete Version number from the table if err := m.databaseDrv.RemoveVersion(version); err != nil { - return herrors.E(op, err) + return err } } } @@ -1325,7 +1252,6 @@ func (m *Migrate) runMigrations(ret <-chan interface{}, bar *pb.ProgressBar) err } func (m *Migrate) runDryRun(ret <-chan interface{}) error { - var op herrors.Op = "migrate.Migrate.runDryRun" migrations := make([]*Migration, 0) var lastInsertVersion int64 for r := range ret { @@ -1335,7 +1261,7 @@ func (m *Migrate) runDryRun(ret <-chan interface{}) error { switch r := r.(type) { case error: - return herrors.E(op, r) + return r case *Migration: if r.Body != nil { version := int64(r.Version) @@ -1430,11 +1356,10 @@ func (m *Migrate) squashMigrations(retUp <-chan interface{}, retDown <-chan inte // versionUpExists checks the source if either the up or down migration for // the specified migration version exists. func (m *Migrate) versionUpExists(version uint64) error { - var op herrors.Op = "migrate.Migrate.versionUpExists" // try up migration first directions := m.sourceDrv.GetDirections(version) if !directions[source.Up] && !directions[source.MetaUp] { - return herrors.E(op, fmt.Errorf("%d up migration not found", version)) + return fmt.Errorf("%d up migration not found", version) } if directions[source.Up] { @@ -1446,10 +1371,7 @@ func (m *Migrate) versionUpExists(version uint64) error { if errors.Is(err, fs.ErrExist) { return nil } else if !errors.Is(err, fs.ErrNotExist) { - if err != nil { - return herrors.E(op, err) - } - return nil + return err } } @@ -1462,24 +1384,20 @@ func (m *Migrate) versionUpExists(version uint64) error { if errors.Is(err, fs.ErrExist) { return nil } else if !errors.Is(err, fs.ErrNotExist) { - if err != nil { - return herrors.E(op, err) - } - return nil + return err } } - return herrors.E(op, os.ErrNotExist) + return os.ErrNotExist } // versionDownExists checks the source if either the up or down migration for // the specified migration version exists. func (m *Migrate) versionDownExists(version uint64) error { - var op herrors.Op = "migrate.Migrate.versionDownExists" // try down migration first directions := m.sourceDrv.GetDirections(version) if !directions[source.Down] && !directions[source.MetaDown] { - return herrors.E(op, fmt.Errorf("%d down migration not found", version)) + return fmt.Errorf("%d down migration not found", version) } if directions[source.Down] { @@ -1491,10 +1409,7 @@ func (m *Migrate) versionDownExists(version uint64) error { if errors.Is(err, fs.ErrExist) { return nil } else if !errors.Is(err, fs.ErrNotExist) { - if err != nil { - return herrors.E(op, err) - } - return nil + return err } } @@ -1507,21 +1422,17 @@ func (m *Migrate) versionDownExists(version uint64) error { if errors.Is(err, fs.ErrExist) { return nil } else if !errors.Is(err, fs.ErrNotExist) { - if err != nil { - return herrors.E(op, err) - } - return nil + return err } } - return herrors.E(op, os.ErrNotExist) + return os.ErrNotExist } // newMigration is a helper func that returns a *Migration for the // specified version and targetVersion (sql). // will return the down migration func (m *Migrate) newMigration(version uint64, targetVersion int64) (*Migration, error) { - var op herrors.Op = "migrate.Migrate.newMigration" var migr *Migration if targetVersion >= int64(version) { @@ -1530,17 +1441,17 @@ func (m *Migrate) newMigration(version uint64, targetVersion int64) (*Migration, // create "empty" migration migr, err = NewMigration(nil, "", version, targetVersion, "sql", "") if err != nil { - return nil, herrors.E(op, err) + return nil, err } } else if err != nil { - return nil, herrors.E(op, err) + return nil, err } else { // create migration from up source migr, err = NewMigration(r, identifier, version, targetVersion, "sql", fileName) if err != nil { - return nil, herrors.E(op, err) + return nil, err } } @@ -1550,17 +1461,17 @@ func (m *Migrate) newMigration(version uint64, targetVersion int64) (*Migration, // create "empty" migration migr, err = NewMigration(nil, "", version, targetVersion, "sql", "") if err != nil { - return nil, herrors.E(op, err) + return nil, err } } else if err != nil { - return nil, herrors.E(op, err) + return nil, err } else { // create migration from down source migr, err = NewMigration(r, identifier, version, targetVersion, "sql", fileName) if err != nil { - return nil, herrors.E(op, err) + return nil, err } } } @@ -1588,12 +1499,11 @@ func (m *Migrate) stop() bool { // lock is a thread safe helper function to lock the database. // It should be called as late as possible when running migrations. func (m *Migrate) lock() error { - var op herrors.Op = "migrate.Migrate.lock" m.isLockedMu.Lock() defer m.isLockedMu.Unlock() if m.isLocked { - return herrors.E(op, ErrLocked) + return ErrLocked } // create done channel, used in the timeout goroutine @@ -1633,69 +1543,57 @@ func (m *Migrate) lock() error { if err == nil { m.isLocked = true } - if err != nil { - return herrors.E(op, err) - } - return nil + return err } // unlock is a thread safe helper function to unlock the database. // It should be called as early as possible when no more migrations are // expected to be executed. func (m *Migrate) unlock() error { - var op herrors.Op = "migrate.Migrate.unlock" m.isLockedMu.Lock() defer m.isLockedMu.Unlock() defer func() { m.isLocked = false }() - if err := m.databaseDrv.UnLock(); err != nil { - return herrors.E(op, err) - } - return nil + return m.databaseDrv.UnLock() } // unlockErr calls unlock and returns a combined error // if a prevErr is not nil. func (m *Migrate) unlockErr(prevErr error) error { - var op herrors.Op = "migrate.Migrate.unlockErr" if err := m.unlock(); err != nil { - return herrors.E(op, NewMultiError(prevErr, err)) + return NewMultiError(prevErr, err) } - if prevErr != nil { - return herrors.E(op, prevErr) - } - return nil + return prevErr } // GotoVersion will apply a version also applying the migration chain // leading to it func (m *Migrate) GotoVersion(gotoVersion int64) error { - var op herrors.Op = "migrate.Migrate.GotoVersion" mode, err := m.databaseDrv.GetSetting("migration_mode") if err != nil { - return herrors.E(op, err) + return err } if mode != "true" { - return herrors.E(op, ErrNoMigrationMode) + return ErrNoMigrationMode } currentVersion, dirty, err := m.Version() currVersion := int64(currentVersion) if err != nil { - if errors.Is(err, ErrNilVersion) { + if err == ErrNilVersion { currVersion = database.NilVersion } else { - return herrors.E(op, fmt.Errorf("cannot determine version: %w", err)) + return errors.Wrap(err, "cannot determine version") } } if dirty { - return herrors.E(op, ErrDirty{currVersion}) + return ErrDirty{currVersion} } if err := m.lock(); err != nil { - return herrors.E(op, err) + return err } ret := make(chan interface{}) @@ -1707,15 +1605,9 @@ func (m *Migrate) GotoVersion(gotoVersion int64) error { } if m.DryRun { - if err := m.unlockErr(m.runDryRun(ret)); err != nil { - return herrors.E(op, err) - } - return nil + return m.unlockErr(m.runDryRun(ret)) } else { - if err := m.unlockErr(m.runMigrations(ret, bar)); err != nil { - return herrors.E(op, err) - } - return nil + return m.unlockErr(m.runMigrations(ret, bar)) } }