graphql-engine/cli/migrate/migration.go
Mohd Bilal c55cca3061 cli: refactor migrate/ to use internal/errors
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/7006
GitOrigin-RevId: bfa2be15cef0460b32576e8297cfa177ed59031a
2022-11-23 17:15:19 +00:00

152 lines
4.3 KiB
Go

package migrate
import (
"bufio"
"io"
"time"
"github.com/hasura/graphql-engine/cli/v2/internal/errors"
)
// DefaultBufferSize sets the in memory buffer size (in Bytes) for every
// pre-read migration (see DefaultPrefetchMigrations).
var DefaultBufferSize = uint64(100000)
// Migration holds information about a migration.
// It is initially created from data coming from the source and then
// used when run against the database.
type Migration struct {
// Identifier can be any string to help identifying
// the migration in the source.
Identifier string
// Version is the version of this migration.
Version uint64
// TargetVersion is the migration version after this migration
// has been applied to the database.
// Can be -1, implying that this is a NilVersion.
TargetVersion int64
// File Type
FileType string
// File name
FileName string
// Body holds an io.ReadCloser to the source.
Body io.ReadCloser
// BufferedBody holds an buffered io.Reader to the underlying Body.
BufferedBody io.Reader
// BufferSize defaults to DefaultBufferSize
BufferSize uint64
// bufferWriter holds an io.WriteCloser and pipes to BufferBody.
// It's an *Closer for flow control.
bufferWriter io.WriteCloser
// Scheduled is the time when the migration was scheduled/ queued.
Scheduled time.Time
// StartedBuffering is the time when buffering of the migration source started.
StartedBuffering time.Time
// FinishedBuffering is the time when buffering of the migration source finished.
FinishedBuffering time.Time
// FinishedReading is the time when the migration source is fully read.
FinishedReading time.Time
// BytesRead holds the number of Bytes read from the migration source.
BytesRead int64
}
// NewMigration returns a new Migration and sets the body, identifier,
// version and targetVersion. Body can be nil, which turns this migration
// into a "NilMigration". If no identifier is provided, it will default to "<empty>".
// targetVersion can be -1, implying it is a NilVersion.
//
// What is a NilMigration?
// Usually each migration version coming from source is expected to have an
// Up and Down migration. This is not a hard requirement though, leading to
// a situation where only the Up or Down migration is present. So let's say
// the user wants to migrate up to a version that doesn't have the actual Up
// migration, in that case we still want to apply the version, but with an empty
// body. We are calling that a NilMigration, a migration with an empty body.
//
// What is a NilVersion?
// NilVersion is a const(-1). When running down migrations and we are at the
// last down migration, there is no next down migration, the targetVersion should
// be nil. Nil in this case is represented by -1 (because type int).
func NewMigration(body io.ReadCloser, identifier string, version uint64, targetVersion int64, fileType string, fileName string) (*Migration, error) {
tnow := time.Now()
m := &Migration{
Identifier: identifier,
Version: version,
TargetVersion: targetVersion,
Scheduled: tnow,
FileType: fileType,
FileName: fileName,
}
if body == nil {
if len(identifier) == 0 {
m.Identifier = "<empty>"
}
m.StartedBuffering = tnow
m.FinishedBuffering = tnow
m.FinishedReading = tnow
return m, nil
}
br, bw := io.Pipe()
m.Body = body // want to simulate low latency? newSlowReader(body)
m.BufferSize = DefaultBufferSize
m.BufferedBody = br
m.bufferWriter = bw
return m, nil
}
// Buffer buffers Body up to BufferSize.
// Calling this function blocks. Call with goroutine.
func (m *Migration) Buffer() error {
var op errors.Op = "migrate.Migration.Buffer"
if m.Body == nil {
return nil
}
m.StartedBuffering = time.Now()
b := bufio.NewReaderSize(m.Body, int(m.BufferSize))
// start reading from body, peek won't move the read pointer though
// poor man's solution?
if _, err := b.Peek(int(m.BufferSize)); err != nil && err != io.EOF {
return errors.E(op, err)
}
m.FinishedBuffering = time.Now()
// write to bufferWriter, this will block until
// something starts reading from m.Buffer
n, err := b.WriteTo(m.bufferWriter)
if err != nil {
return errors.E(op, err)
}
m.FinishedReading = time.Now()
m.BytesRead = n
// close bufferWriter so Buffer knows that there is no
// more data coming
m.bufferWriter.Close()
// it's safe to close the Body too
m.Body.Close()
return nil
}