cli: support squashing migrations with remote joins (close #4908) (#4924)

Aravind 2020-06-16 12:10:20 +05:30 committed by GitHub
parent 13e4edbdf8
commit d7ca702542
4 changed files with 259 additions and 33 deletions

View File

@ -17,6 +17,102 @@ import (
type CustomQuery linq.Query
func (q CustomQuery) MergeRemoteRelationships(squashList *database.CustomList) error {
remoteRelationshipTransition := transition.New(&remoteRelationshipConfig{})
remoteRelationshipTransition.Initial("new")
remoteRelationshipTransition.State("created")
remoteRelationshipTransition.State("updated")
remoteRelationshipTransition.State("deleted")
remoteRelationshipTransition.Event(createRemoteRelationship).To("created").From("new", "deleted")
remoteRelationshipTransition.Event(updateRemoteRelationship).To("updated").From("new", "created", "updated", "deleted")
remoteRelationshipTransition.Event(deleteRemoteRelationship).To("deleted").From("new", "created", "updated")
next := q.Iterate()
for item, ok := next(); ok; item, ok = next() {
g := item.(linq.Group)
if g.Key == nil {
// skip the nil key: it is the default group key for elements that are not remote relationship actions
continue
}
key, ok := g.Key.(remoteRelationshipMap)
if !ok {
continue
}
cfg := remoteRelationshipConfig{
tableName: key.tableName,
schemaName: key.schemaName,
name: key.name,
}
prevElems := make([]*list.Element, 0)
for _, val := range g.Group {
// possible input sequences:
// 1. create, update, update, ...
// 2. create, update, ..., delete
// 3. update, update, update, ...
// 4. update, update, ..., delete
// 5. update, update, ..., delete, create
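// these sequences squash, respectively, to:
// 1. a single create carrying the latest definition
// 2. nothing
// 3. a single update (the latest one)
// 4. a single delete
// 5. a delete followed by a create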
element := val.(*list.Element)
switch obj := element.Value.(type) {
case *createRemoteRelationshipInput:
err := remoteRelationshipTransition.Trigger(createRemoteRelationship, &cfg, nil)
if err != nil {
return errors.Wrapf(err, "error squashing: %v", obj.Name)
}
prevElems = append(prevElems, element)
case *updateRemoteRelationshipInput:
if len(prevElems) != 0 {
if _, ok := prevElems[0].Value.(*createRemoteRelationshipInput); ok {
squashList.Remove(prevElems[0])
prevElems = prevElems[:0]
err := remoteRelationshipTransition.Trigger(deleteRemoteRelationship, &cfg, nil)
if err != nil {
return errors.Wrapf(err, "error squashing: %v", obj.Name)
}
element.Value = obj.createRemoteRelationshipInput
prevElems = append(prevElems, element)
err = remoteRelationshipTransition.Trigger(createRemoteRelationship, &cfg, nil)
if err != nil {
return errors.Wrapf(err, "error squashing: %v", obj.Name)
}
continue
}
for _, e := range prevElems {
squashList.Remove(e)
}
prevElems = prevElems[:0]
err := remoteRelationshipTransition.Trigger(deleteRemoteRelationship, &cfg, nil)
if err != nil {
return errors.Wrapf(err, "error squashing: %v", obj.Name)
}
}
prevElems = append(prevElems, element)
err := remoteRelationshipTransition.Trigger(updateRemoteRelationship, &cfg, nil)
if err != nil {
return errors.Wrapf(err, "error squashing: %v", obj.Name)
}
case *deleteRemoteRelationshipInput:
if cfg.GetState() == "created" {
prevElems = append(prevElems, element)
}
err := remoteRelationshipTransition.Trigger(deleteRemoteRelationship, &cfg, nil)
if err != nil {
return err
}
for _, e := range prevElems {
squashList.Remove(e)
}
prevElems = prevElems[:0]
}
}
}
return nil
}
func (q CustomQuery) MergeEventTriggers(squashList *database.CustomList) error {
eventTriggerTransition := transition.New(&eventTriggerConfig{})
eventTriggerTransition.Initial("new")
@ -95,7 +191,6 @@ func (q CustomQuery) MergeRelationships(squashList *database.CustomList) error {
relationshipTransition.Event("drop_relationship").To("dropped").From("new", "created")
next := q.Iterate()
for item, ok := next(); ok; item, ok = next() {
g := item.(linq.Group)
if g.Key == nil {
@ -440,6 +535,21 @@ func (q CustomQuery) MergeTables(squashList *database.CustomList) error {
return fmt.Errorf("cannot set table %s on schema %s has a enum when it is untracked", tblCfg.name, tblCfg.schema)
}
prevElems = append(prevElems, element)
case *createRemoteRelationshipInput:
if tblCfg.GetState() == "untracked" {
return fmt.Errorf("cannot create remote relationship on %s when table %s on schema %s is untracked", args.Name, tblCfg.name, tblCfg.schema)
}
prevElems = append(prevElems, element)
case *deleteRemoteRelationshipInput:
if tblCfg.GetState() == "untracked" {
return fmt.Errorf("cannot delete remote relationship on %s when table %s on schema %s is untracked", args.Name, tblCfg.name, tblCfg.schema)
}
prevElems = append(prevElems, element)
case *updateRemoteRelationshipInput:
if tblCfg.GetState() == "untracked" {
return fmt.Errorf("cannot update remote relationship on %s when table %s on schema %s is untracked", args.Name, tblCfg.name, tblCfg.schema)
}
prevElems = append(prevElems, element)
}
}
}
@ -616,13 +726,13 @@ func (q CustomQuery) MergeAllowLists(squashList *database.CustomList) error {
for _, val := range g.Group {
element := val.(*list.Element)
switch element.Value.(type) {
case *addRemoteSchemaInput:
case *addCollectionToAllowListInput:
err := allowListTransition.Trigger("add_collection_to_allowlist", &alCfg, nil)
if err != nil {
return err
}
prevElems = append(prevElems, element)
case *removeRemoteSchemaInput:
case *dropCollectionFromAllowListInput:
if alCfg.GetState() == "added" {
prevElems = append(prevElems, element)
}
@ -720,32 +830,9 @@ func (q CustomQuery) MergeQueryCollections(squashList *database.CustomList) erro
return nil
}
type customList struct {
*list.List
}
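// Iterate implements the go-linq Iterable contract for the wrapped
// container/list: the returned closure yields successive *list.Element
// values and reports ok == false once the list is exhausted, so the list
// can be consumed with linq.FromIterable.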
func (c *customList) Iterate() linq.Iterator {
length := c.Len()
var prevElem *list.Element
i := 0
return func() (item interface{}, ok bool) {
if length == 0 {
return
}
if i == 0 {
prevElem = c.Front()
i++
} else {
prevElem = prevElem.Next()
if prevElem == nil {
return
}
}
return prevElem, true
}
}
// PushToList reads a migration from the source.
// An SQL migration is appended to the linked list as-is;
// a meta migration is appended after some processing.
func (h *HasuraDB) PushToList(migration io.Reader, fileType string, l *database.CustomList) error {
migr, err := ioutil.ReadAll(migration)
if err != nil {
@ -799,17 +886,40 @@ func (h *HasuraDB) PushToList(migration io.Reader, fileType string, l *database.
}
}
l.PushBack(actionType)
case *createRemoteRelationshipInput, *updateRemoteRelationshipInput:
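// update_remote_relationship takes the same args as create_remote_relationship,
// so UnmarshalJSON decodes both into *createRemoteRelationshipInput; the update
// variant is re-wrapped here so the squash state machine can tell them apart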
if v.Type == updateRemoteRelationship {
createRemoteRelationship, ok := v.Args.(*createRemoteRelationshipInput)
if !ok {
continue
}
o := &updateRemoteRelationshipInput{
createRemoteRelationship,
}
l.PushBack(o)
}
if v.Type == createRemoteRelationship {
o, ok := v.Args.(*createRemoteRelationshipInput)
if !ok {
break
}
l.PushBack(o)
}
default:
l.PushBack(actionType)
}
}
default:
return fmt.Errorf("Invalid migration file type")
return fmt.Errorf("invalid migration file type")
}
return nil
}
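// For reference, an illustrative entry from a metadata (yaml) migration that
// PushToList parses into a *createRemoteRelationshipInput; the keys follow the
// struct tags defined for this change, while the table, column, relationship
// and remote schema names are made up:
//
//	- type: create_remote_relationship
//	  args:
//	    name: messages
//	    table:
//	      schema: public
//	      name: profiles
//	    hasura_fields:
//	      - id
//	    remote_schema: messages_service
//	    remote_field:
//	      messages:
//	        arguments:
//	          profile_id: $id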
func (h *HasuraDB) Squash(l *database.CustomList, ret chan<- interface{}) {
// group all event trigger actions by trigger name,
// e.g. with two event triggers named trigger1 and trigger2
// two groups are produced, each containing the elements
// that correspond to that trigger
eventTriggersGroup := CustomQuery(linq.FromIterable(l).GroupByT(
func(element *list.Element) string {
switch args := element.Value.(type) {
@ -828,6 +938,37 @@ func (h *HasuraDB) Squash(l *database.CustomList, ret chan<- interface{}) {
ret <- err
}
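// group all remote relationship actions by (table, schema, relationship name)
// so that each relationship's history can be squashed independently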
remoteRelationShipsGroup := CustomQuery(linq.FromIterable(l).GroupByT(
func(element *list.Element) interface{} {
switch args := element.Value.(type) {
case *createRemoteRelationshipInput:
return remoteRelationshipMap{
tableName: args.Table.Name,
schemaName: args.Table.Schema,
name: args.Name,
}
case *updateRemoteRelationshipInput:
return remoteRelationshipMap{
tableName: args.Table.Name,
schemaName: args.Table.Schema,
name: args.Name,
}
case *deleteRemoteRelationshipInput:
return remoteRelationshipMap{
tableName: args.Table.Name,
schemaName: args.Table.Schema,
name: args.Name,
}
}
return nil
}, func(element *list.Element) *list.Element {
return element
},
))
err = remoteRelationShipsGroup.MergeRemoteRelationships(l)
if err != nil {
ret <- err
}
relationshipsGroup := CustomQuery(linq.FromIterable(l).GroupByT(
func(element *list.Element) interface{} {
switch args := element.Value.(type) {
@ -1280,6 +1421,12 @@ func (h *HasuraDB) Squash(l *database.CustomList, ret chan<- interface{}) {
q.Type = addComputedField
case *dropComputedFieldInput:
q.Type = dropComputedField
case *createRemoteRelationshipInput:
q.Type = createRemoteRelationship
case *updateRemoteRelationshipInput:
q.Type = updateRemoteRelationship
case *deleteRemoteRelationshipInput:
q.Type = deleteRemoteRelationship
case *RunSQLInput:
ret <- []byte(args.SQL)
continue

View File

@ -43,6 +43,25 @@ type newHasuraIntefaceQuery struct {
Args interface{} `json:"args" yaml:"args"`
}
type deleteRemoteRelationshipInput struct {
Name string `json:"name" yaml:"name"`
Table tableSchema `json:"table" yaml:"table"`
}
type remoteRelationshipDefinition struct {
HasuraFields []string `yaml:"hasura_fields" json:"hasura_fields"`
Name string `yaml:"name" json:"name"`
RemoteField map[string]interface{} `yaml:"remote_field" json:"remote_field"`
RemoteSchema string `yaml:"remote_schema" json:"remote_schema"`
}
type createRemoteRelationshipInput struct {
remoteRelationshipDefinition
Table tableSchema `yaml:"table" json:"table"`
}
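// update_remote_relationship accepts the same payload as create_remote_relationship,
// so the update input simply wraps the create input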
type updateRemoteRelationshipInput struct {
*createRemoteRelationshipInput
}
func (h *newHasuraIntefaceQuery) UnmarshalJSON(b []byte) error {
type t newHasuraIntefaceQuery
var q t
@ -128,6 +147,12 @@ func (h *newHasuraIntefaceQuery) UnmarshalJSON(b []byte) error {
q.Args = &addComputedFieldInput{}
case dropComputedField:
q.Args = &dropComputedFieldInput{}
case deleteRemoteRelationship:
q.Args = &deleteRemoteRelationshipInput{}
case createRemoteRelationship:
q.Args = &createRemoteRelationshipInput{}
case updateRemoteRelationship:
q.Args = &createRemoteRelationshipInput{}
default:
return fmt.Errorf("cannot squash type %s", q.Type)
}
@ -285,6 +310,9 @@ const (
bulkQuery = "bulk"
addComputedField = "add_computed_field"
dropComputedField = "drop_computed_field"
createRemoteRelationship = "create_remote_relationship"
updateRemoteRelationship = "update_remote_relationship"
deleteRemoteRelationship = "delete_remote_relationship"
)
type tableMap struct {
@ -307,6 +335,10 @@ type queryInCollectionMap struct {
collectionName, queryName string
}
type remoteRelationshipMap struct {
tableName, schemaName, name string
}
type tableSchema struct {
Name string `json:"name" yaml:"name"`
Schema string `json:"schema" yaml:"schema"`
@ -602,6 +634,9 @@ type dropComputedFieldInput struct {
type clearMetadataInput struct {
}
type remoteRelationships []struct {
Definition remoteRelationshipDefinition `json:"definition" yaml:"definition"`
}
type replaceMetadataInput struct {
Tables []struct {
Table tableSchema `json:"table" yaml:"table"`
@ -613,6 +648,7 @@ type replaceMetadataInput struct {
DeletePermissions []*createDeletePermissionInput `json:"delete_permissions" yaml:"delete_permissions"`
EventTriggers []*createEventTriggerInput `json:"event_triggers" yaml:"event_triggers"`
ComputedFields []*addComputedFieldInput `json:"computed_fields" yaml:"computed_fields"`
RemoteRelationships *remoteRelationships `json:"remote_relationships" yaml:"remote_relationships"`
Configuration *tableConfiguration `json:"configuration" yaml:"configuration"`
} `json:"tables" yaml:"tables"`
Functions []*trackFunctionInput `json:"functions" yaml:"functions"`
@ -723,6 +759,20 @@ func (rmi *replaceMetadataInput) convertToMetadataActions(l *database.CustomList
l.PushBack(cf)
}
}
// convert remote relationships from the metadata into create_remote_relationship actions
for _, table := range rmi.Tables {
if table.RemoteRelationships == nil {
continue
}
for _, remoteRelationship := range *table.RemoteRelationships {
r := &createRemoteRelationshipInput{
remoteRelationshipDefinition: remoteRelationship.Definition,
Table: tableSchema{
Name: table.Table.Name,
Schema: table.Table.Schema,
},
}
l.PushBack(r)
}
}
// track functions
for _, function := range rmi.Functions {
l.PushBack(function)
@ -917,3 +967,8 @@ type allowListConfig struct {
collection string
transition.Transition
}
type remoteRelationshipConfig struct {
tableName, schemaName, name string
transition.Transition
}
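// A minimal, standalone sketch of the per-relationship state machine built in
// MergeRemoteRelationships, assuming the qor transition package
// (github.com/qor/transition) imported in these files as "transition"; the
// relationshipState type and the triggered sequence are illustrative only.

package main

import (
	"fmt"

	"github.com/qor/transition"
)

// relationshipState mirrors remoteRelationshipConfig: the embedded
// transition.Transition tracks the current state.
type relationshipState struct {
	transition.Transition
}

func main() {
	sm := transition.New(&relationshipState{})
	sm.Initial("new")
	sm.State("created")
	sm.State("updated")
	sm.State("deleted")
	sm.Event("create_remote_relationship").To("created").From("new", "deleted")
	sm.Event("update_remote_relationship").To("updated").From("new", "created", "updated", "deleted")
	sm.Event("delete_remote_relationship").To("deleted").From("new", "created", "updated")

	cfg := relationshipState{}

	// create followed by update is a valid history
	fmt.Println(sm.Trigger("create_remote_relationship", &cfg, nil)) // <nil>
	fmt.Println(sm.Trigger("update_remote_relationship", &cfg, nil)) // <nil>
	fmt.Println(cfg.GetState())                                      // updated

	// a second delete in a row is rejected, which the merge functions
	// surface as a squash error
	fmt.Println(sm.Trigger("delete_remote_relationship", &cfg, nil)) // <nil>
	fmt.Println(sm.Trigger("delete_remote_relationship", &cfg, nil)) // non-nil error
}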

View File

@ -421,10 +421,14 @@ func (m *Migrate) Squash(v uint64) (vs []int64, um []interface{}, us []byte, dm
}
// concurrently squash all the up migrations:
// read every up migration from the source and send each one
// to the retUp channel
retUp := make(chan interface{}, m.PrefetchMigrations)
go m.squashUp(v, retUp)
// concurrently squash all the down migrations:
// read every down migration from the source and send each one
// to the retDown channel
retDown := make(chan interface{}, m.PrefetchMigrations)
go m.squashDown(v, retDown)
@ -696,19 +700,32 @@ func (m *Migrate) squashUp(version uint64, ret chan<- interface{}) {
for limit == -1 {
if currentVersion == version {
// on the first iteration of the loop,
// check that an up migration exists for the "--from" version
if err := m.versionUpExists(version); err != nil {
ret <- err
return
}
// newMigration returns a *Migration holding the SQL up migration
// for this version; even if a migration file doesn't exist in the
// source, an empty migration is returned
migr, err := m.newMigration(version, int64(version))
if err != nil {
ret <- err
return
}
ret <- migr
// buffer the body of the migration; the migr instance already sent
// over the channel is then read from that buffer
go migr.Buffer()
// read the meta up migration for this version;
// even if a migration file doesn't exist in the source,
// an empty migration is returned
migr, err = m.metanewMigration(version, int64(version))
if err != nil {
ret <- err
@ -719,7 +736,9 @@ func (m *Migrate) squashUp(version uint64, ret chan<- interface{}) {
count++
}
// move on to the next migration:
// find the next version using the source driver; the first iteration
// operated on the known "--from" version, but from here on the next
// version has to be discovered
next, err := m.sourceDrv.Next(currentVersion)
if os.IsNotExist(err) {
// no limit, but no migrations applied?
@ -727,7 +746,7 @@ func (m *Migrate) squashUp(version uint64, ret chan<- interface{}) {
ret <- ErrNoChange
return
}
// when there are no more migrations, return
if limit == -1 {
return
}
@ -769,6 +788,7 @@ func (m *Migrate) squashUp(version uint64, ret chan<- interface{}) {
func (m *Migrate) squashDown(version uint64, ret chan<- interface{}) {
defer close(ret)
// get the last version from the source driver
from, err := m.sourceDrv.GetLocalVersion()
if err != nil {
ret <- err
@ -1261,6 +1281,7 @@ func (m *Migrate) squashMigrations(retUp <-chan interface{}, retDown <-chan inte
case *Migration:
migr := r.(*Migration)
if migr.Body != nil {
// read migration body and push it to squash list
if err = m.databaseDrv.PushToList(migr.BufferedBody, migr.FileType, &squashList); err != nil {
dataUp <- err
return
@ -1389,6 +1410,7 @@ func (m *Migrate) versionDownExists(version uint64) error {
// newMigration is a helper func that returns a *Migration for the
// specified version and targetVersion (sql).
// will return the down migration
func (m *Migrate) newMigration(version uint64, targetVersion int64) (*Migration, error) {
var migr *Migration

View File

@ -85,7 +85,9 @@ export const saveRemoteRelationship = (
}
// Apply migrations
const migrationName = `table_${table.name}_create_remote_relationship_${state.name}`;
const migrationName = `table_${table.name}_${
isNew ? 'create' : 'update'
}_remote_relationship_${state.name}`;
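// e.g. table_profiles_create_remote_relationship_messages when creating and
// table_profiles_update_remote_relationship_messages when editing an existing
// relationship (table and relationship names are illustrative)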
const requestMsg = `${
isNew ? 'Creating' : 'Updating'