cli(migrations): new folder structure and squash (#3072)

### Description
This PR introduces three new features:

- Support for a new migrations folder structure.
- Add `squash` command in preview.
- ~List of migrations on the Console and ability to squash them from console.~

#### New migrations folder structure

Starting with this commit, Hasura CLI supports a new directory structure for migrations folder and defaults to that for all new migrations created. 

Each migration will get a new directory with the name format `timestamp_name` and inside the directory, there will be four files:

```bash
└── migrations
    ├── 1572237730898_squashed
    │   ├── up.sql
    │   ├── up.yaml
    │   ├── down.yaml
    │   └── down.sql
```

Existing files old migration format `timestamp_name.up|down.yaml|sql` will continue to work alongside new migration files.

#### Squash command

Lots of users have expressed their interest in squashing migrations (see #2724 and #2254) and some even built [their own tools](https://github.com/domasx2/hasura-squasher) to do squash. In this PR, we take a systematic approach to squash migrations.

A new command called `migrate squash` is introduced. Note that this command is in **PREVIEW** and the correctness of squashed migration is not guaranteed (especially for down migrations). From our tests, **it works for most use cases**, but we have found some issues with squashing all the down migrations, partly because the console doesn't generate down migrations for all actions.

Hence, until we add an extensive test suite for squashing, we'll keep the command in preview. We recommend you to confirm the correctness yourself by diffing the SQL and Metadata before and after applying the squashed migrations (we're also thinking about embedding some checks into the command itself).

```bash
$ hasura migrate squash --help
(PREVIEW) Squash multiple migrations leading upto the latest one into a single migration file

Usage:
  hasura migrate squash [flags]

Examples:
  # NOTE: This command is in PREVIEW, correctness is not guaranteed and the usage may change.

  # squash all migrations from version 1572238297262 to the latest one:
  hasura migrate squash --from 1572238297262

Flags:
      --from uint             start squashing form this version
      --name string           name for the new squashed migration (default "squashed")
      --delete-source         delete the source files after squashing without any confirmation
```

### Affected components 
<!-- Remove non-affected components from the list -->

- CLI

### Related Issues
<!-- Please make sure you have an issue associated with this Pull Request -->
<!-- And then add `(close #<issue-no>)` to the pull request title -->
<!-- Add the issue number below (e.g. #234) -->
Close #2724, Close #2254, 

### Solution and Design
<!-- How is this issue solved/fixed? What is the design? -->
<!-- It's better if we elaborate -->

For the squash command, a state machine is implemented to track changes to Hasura metadata. After applying each action on the metadata state, a list of incremental changes is created.

### Steps to test and verify
1. Open console via cli and create some migrations.
2. Run `hasura migrate squash --from <version>`

### Limitations, known bugs & workarounds
<!-- Limitations of the PR, known bugs and suggested workarounds -->

<!-- Feel free to delete these comment lines -->
- The `squash` command is in preview
- Support for squashing from the console is WIP
- Support for squashing migrations that are not committed yet is planned.
- Un-tracking or dropping a table will cause inconsistent squashed down migration since console doesn't generate correct down migration.
- If cascade setting is set to `true` on any of the metadata action, generated migration may be wrong
This commit is contained in:
Aravind Shankar 2019-10-31 07:51:15 +05:30 committed by Shahidh K Muhammed
parent d6a649095c
commit 980c65dbe2
17 changed files with 2859 additions and 46 deletions

206
cli/Gopkg.lock generated
View File

@ -17,6 +17,22 @@
revision = "7da180ee92d8bd8bb8c37fc560e673e6557c392f"
version = "v0.4.7"
[[projects]]
digest = "1:31bd9d70492aec37a6bf45a42e96ed0f37fab74aa619d539b5acb9477500eb79"
name = "github.com/ahmetb/go-linq"
packages = ["."]
pruneopts = "UT"
revision = "b1b02a14bd27fab06f03ca1e4eed893f5011c152"
version = "v3.1.0"
[[projects]]
digest = "1:320e7ead93de9fd2b0e59b50fd92a4d50c1f8ab455d96bc2eb083267453a9709"
name = "github.com/asaskevich/govalidator"
packages = ["."]
pruneopts = "UT"
revision = "ccb8e960c48f04d6935e72476ae4a51028f9e22f"
version = "v9"
[[projects]]
branch = "master"
digest = "1:67f5c19d64f788aa79a8e612eefa1cc68dca46c3436c93b84af9c95bd7e1a556"
@ -204,6 +220,38 @@
pruneopts = "UT"
revision = "5a0f697c9ed9d68fef0116532c6e05cfeae00e55"
[[projects]]
digest = "1:c79fb010be38a59d657c48c6ba1d003a8aa651fa56b579d959d74573b7dff8e1"
name = "github.com/gorilla/context"
packages = ["."]
pruneopts = "UT"
revision = "08b5f424b9271eedf6f9f0ce86cb9396ed337a42"
version = "v1.1.1"
[[projects]]
digest = "1:e72d1ebb8d395cf9f346fd9cbc652e5ae222dd85e0ac842dc57f175abed6d195"
name = "github.com/gorilla/securecookie"
packages = ["."]
pruneopts = "UT"
revision = "e59506cc896acb7f7bf732d4fdf5e25f7ccd8983"
version = "v1.1.1"
[[projects]]
digest = "1:172c862eabc72e90f461bcef223c49869628bec6d989386dfb03281ae3222148"
name = "github.com/gorilla/sessions"
packages = ["."]
pruneopts = "UT"
revision = "4355a998706e83fe1d71c31b07af94e34f68d74a"
version = "v1.2.0"
[[projects]]
digest = "1:17275ec8407e731ae22cc511d52391b2ac31982f23b53b1100265c575dc5cc9f"
name = "github.com/gosimple/slug"
packages = ["."]
pruneopts = "UT"
revision = "984b6d1a0ae5d1ecf6d718f4e990883d281ba3b8"
version = "v1.7.0"
[[projects]]
branch = "master"
digest = "1:a361611b8c8c75a1091f00027767f7779b29cb37c456a71b8f2604c88057ab40"
@ -231,6 +279,30 @@
revision = "76626ae9c91c4f2a10f34cad8ce83ea42c93bb75"
version = "v1.0"
[[projects]]
digest = "1:0ba2632215132e946413632241c2f9e67439addbe5f243b86bb81199bd15c8e9"
name = "github.com/jinzhu/gorm"
packages = ["."]
pruneopts = "UT"
revision = "81c17a7e2529c59efc4e74c5b32c1fb71fb12fa2"
version = "v1.9.11"
[[projects]]
digest = "1:01ed62f8f4f574d8aff1d88caee113700a2b44c42351943fa73cc1808f736a50"
name = "github.com/jinzhu/inflection"
packages = ["."]
pruneopts = "UT"
revision = "f5c5f50e6090ae76a29240b61ae2a90dd810112e"
version = "v1.0.0"
[[projects]]
digest = "1:e668ebc4cec3a084c222b4255a1565ee652614f029a7b2f4b1bc9557565ed473"
name = "github.com/jinzhu/now"
packages = ["."]
pruneopts = "UT"
revision = "1a6fdd4591d336a897120c50c69ed3c0e8033468"
version = "v1.0.1"
[[projects]]
branch = "master"
digest = "1:e51f40f0c19b39c1825eadd07d5c0a98a2ad5942b166d9fc4f54750ce9a04810"
@ -250,6 +322,22 @@
pruneopts = "UT"
revision = "ae77be60afb1dcacde03767a8c37337fad28ac14"
[[projects]]
digest = "1:ca955a9cd5b50b0f43d2cc3aeb35c951473eeca41b34eb67507f1dbcc0542394"
name = "github.com/kr/pretty"
packages = ["."]
pruneopts = "UT"
revision = "73f6ac0b30a98e433b289500d779f50c1a6f0712"
version = "v0.1.0"
[[projects]]
digest = "1:15b5cc79aad436d47019f814fde81a10221c740dc8ddf769221a65097fb6c2e9"
name = "github.com/kr/text"
packages = ["."]
pruneopts = "UT"
revision = "e2ffdb16a802fe2bb95e2e35ff34f0e53aeef34f"
version = "v0.1.0"
[[projects]]
branch = "master"
digest = "1:37ce7d7d80531b227023331002c0d42b4b4b291a96798c82a049d03a54ba79e4"
@ -305,6 +393,14 @@
revision = "0360b2af4f38e8d38c7fce2a9f4e702702d73a39"
version = "v0.0.3"
[[projects]]
digest = "1:6ff1026b8d873d074ddec82ad60bb0eb537dc013f456ef26b63d30beba2f99e7"
name = "github.com/microcosm-cc/bluemonday"
packages = ["."]
pruneopts = "UT"
revision = "506f3da9b7c86d737e91f16b7431df8635871552"
version = "v1.0.2"
[[projects]]
branch = "master"
digest = "1:2b32af4d2a529083275afc192d1067d8126b578c7a9613b26600e4df9c735155"
@ -377,6 +473,102 @@
revision = "792786c7400a136282c1664665ae0a8db921c6c2"
version = "v1.0.0"
[[projects]]
digest = "1:88dfda28c9738dd59becb9bbc449120ecef8b1d54e32e6004654bb66e51ef897"
name = "github.com/qor/admin"
packages = ["."]
pruneopts = "UT"
revision = "b2f472167d028a04c736b4a2477d7cce1e578b8f"
version = "v1.1"
[[projects]]
branch = "master"
digest = "1:d2bdad175842f83799e02819fa718858ebde6f8704101a7dfa0a4a8e8742e587"
name = "github.com/qor/assetfs"
packages = ["."]
pruneopts = "UT"
revision = "ff57fdc13a148d39c3e930cb27c7eaf82e376891"
[[projects]]
digest = "1:810e030da1090675a7f04c88e6de987dfc8cf1236d502dd0b5fb63cd6cc58481"
name = "github.com/qor/audited"
packages = ["."]
pruneopts = "UT"
revision = "b52c9c2f0571fc6e56dcdf2b7d3ae36d124602d4"
version = "v1.1"
[[projects]]
branch = "master"
digest = "1:d221e9b7e68759e27735dd1929cc37c916eb814cc5ef5bacf533d47d1ca55ff1"
name = "github.com/qor/middlewares"
packages = ["."]
pruneopts = "UT"
revision = "781378b69454a545d08058d8154aca40d079e7ab"
[[projects]]
digest = "1:7f19097491f6d7e42cf4babafbfbbdb4ed20985a1bbb2c2efa2e98dfa4eee696"
name = "github.com/qor/qor"
packages = [
".",
"resource",
"utils",
]
pruneopts = "UT"
revision = "186b0237364b23ebbe564d0764c5b8523c953575"
version = "v1.1"
[[projects]]
digest = "1:1b90231d81fce4a2c61be9bbae0e6cc35556ab7bf87e574e9dc334a1985125df"
name = "github.com/qor/responder"
packages = ["."]
pruneopts = "UT"
revision = "b6def473574f621fee316696ad120d4fbf470826"
version = "v1.1"
[[projects]]
digest = "1:be8425925ff99bd3fb026add07beb8069fed5c8e199b1cd55a871f832d9f6688"
name = "github.com/qor/roles"
packages = ["."]
pruneopts = "UT"
revision = "d6375609fe3e5da46ad3a574fae244fb633e79c1"
version = "v1.1"
[[projects]]
branch = "master"
digest = "1:b4e301b2efbd1820b3fcd612e86429ed64e1ddf159cd41ffdcd67947c1235180"
name = "github.com/qor/session"
packages = [
".",
"gorilla",
"manager",
]
pruneopts = "UT"
revision = "8206b0adab706a6ef3ee6fabba2584d34429a26a"
[[projects]]
digest = "1:c678ac04f0114299ce7d03817c57470fec0ee23f1f547fcb95a6979e72c7a88f"
name = "github.com/qor/transition"
packages = ["."]
pruneopts = "UT"
revision = "4015a3eee19c49a63b1d22beab1c0c084e72c53b"
version = "v1.1"
[[projects]]
digest = "1:85cb5adb402858878704bc087b8f74fb12d1e56bddbba38e3d94693a14ad93f4"
name = "github.com/qor/validations"
packages = ["."]
pruneopts = "UT"
revision = "f364bca61b46bd48a5e32552a37758864fdf005d"
version = "v1.1"
[[projects]]
branch = "master"
digest = "1:e6a29574542c00bb18adb1bfbe629ff88c468c2af2e2e953d3e58eda07165086"
name = "github.com/rainycape/unidecode"
packages = ["."]
pruneopts = "UT"
revision = "cb7f23ec59bec0d61b19c56cd88cee3d0cc1870c"
[[projects]]
digest = "1:8bc629776d035c003c7814d4369521afe67fdb8efc4b5f66540d29343b98cf23"
name = "github.com/russross/blackfriday"
@ -466,6 +658,14 @@
revision = "f35b8ab0b5a2cef36673838d662e249dd9c94686"
version = "v1.2.2"
[[projects]]
branch = "master"
digest = "1:4b0666e7398b9b64deee136f063d9caf0a58a7de52a1ab6540664615b38054ec"
name = "github.com/theplant/cldr"
packages = ["."]
pruneopts = "UT"
revision = "9f76f7ce4ee8058b01f92739cf0d0cceaeda3cb2"
[[projects]]
digest = "1:c268acaa4a4d94a467980e5e91452eb61c460145765293dc0aed48e5e9919cc6"
name = "github.com/ugorji/go"
@ -483,11 +683,13 @@
[[projects]]
branch = "master"
digest = "1:aa91545b200c0c8e6cf39bb8eedc86d85d3ed0a208662ed964bef0433839fd3c"
digest = "1:bb2e0c5269c54b816dfc046d5c8668bd38a54e8020f688dbabed7e747b00651a"
name = "golang.org/x/net"
packages = [
"context",
"context/ctxhttp",
"html",
"html/atom",
"idna",
"internal/socks",
"proxy",
@ -551,6 +753,7 @@
analyzer-version = 1
input-imports = [
"github.com/Masterminds/semver",
"github.com/ahmetb/go-linq",
"github.com/aryann/difflib",
"github.com/briandowns/spinner",
"github.com/docker/docker/api/types",
@ -575,6 +778,7 @@
"github.com/oliveagle/jsonpath",
"github.com/parnurzeal/gorequest",
"github.com/pkg/errors",
"github.com/qor/transition",
"github.com/sirupsen/logrus",
"github.com/sirupsen/logrus/hooks/test",
"github.com/skratchdot/open-golang/open",

View File

@ -95,3 +95,11 @@
[[constraint]]
branch = "master"
name = "github.com/kardianos/osext"
[[constraint]]
name = "github.com/ahmetb/go-linq"
version = "3.1.0"
[[constraint]]
name = "github.com/qor/transition"
version = "1.1.0"

View File

@ -193,6 +193,11 @@ func (r *cRouter) setRoutes(migrationDir, metadataFile string, logger *logrus.Lo
{
settingsAPIs.Any("", api.SettingsAPI)
}
squashAPIs := migrateAPIs.Group("/squash")
{
squashAPIs.POST("/create", api.SquashCreateAPI)
squashAPIs.POST("/delete", api.SquashDeleteAPI)
}
migrateAPIs.Any("", api.MigrateAPI)
}
// Migrate api endpoints and middleware

View File

@ -30,6 +30,7 @@ func NewMigrateCmd(ec *cli.ExecutionContext) *cobra.Command {
newMigrateApplyCmd(ec),
newMigrateStatusCmd(ec),
newMigrateCreateCmd(ec),
newMigrateSquashCmd(ec),
)
return migrateCmd
}

View File

@ -0,0 +1,143 @@
package commands
import (
"bytes"
"fmt"
"strconv"
"strings"
"text/tabwriter"
"github.com/hasura/graphql-engine/cli"
"github.com/hasura/graphql-engine/cli/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
mig "github.com/hasura/graphql-engine/cli/migrate/cmd"
)
func newMigrateSquashCmd(ec *cli.ExecutionContext) *cobra.Command {
v := viper.New()
opts := &migrateSquashOptions{
EC: ec,
}
migrateSquashCmd := &cobra.Command{
Use: "squash",
Short: "(PREVIEW) Squash multiple migrations into a single one",
Long: "(PREVIEW) Squash multiple migrations leading upto the latest one into a single migration file",
Example: ` # NOTE: This command is in PREVIEW, correctness is not guaranteed and the usage may change.
# squash all migrations from version 123 to the latest one:
hasura migrate squash --from 123`,
SilenceUsage: true,
PreRunE: func(cmd *cobra.Command, args []string) error {
ec.Viper = v
return ec.Validate()
},
RunE: func(cmd *cobra.Command, args []string) error {
opts.newVersion = getTime()
return opts.run()
},
}
f := migrateSquashCmd.Flags()
f.Uint64Var(&opts.from, "from", 0, "start squashing form this version")
f.StringVar(&opts.name, "name", "squashed", "name for the new squashed migration")
f.BoolVar(&opts.deleteSource, "delete-source", false, "delete the source files after squashing without any confirmation")
f.String("endpoint", "", "http(s) endpoint for Hasura GraphQL Engine")
f.String("admin-secret", "", "admin secret for Hasura GraphQL Engine")
f.String("access-key", "", "access key for Hasura GraphQL Engine")
f.MarkDeprecated("access-key", "use --admin-secret instead")
// need to create a new viper because https://github.com/spf13/viper/issues/233
v.BindPFlag("endpoint", f.Lookup("endpoint"))
v.BindPFlag("admin_secret", f.Lookup("admin-secret"))
v.BindPFlag("access_key", f.Lookup("access-key"))
return migrateSquashCmd
}
type migrateSquashOptions struct {
EC *cli.ExecutionContext
from uint64
name string
newVersion int64
deleteSource bool
}
func (o *migrateSquashOptions) run() error {
o.EC.Logger.Warnln("This command is currently experimental and hence in preview, correctness of squashed migration is not guaranteed!")
o.EC.Spin(fmt.Sprintf("Squashing migrations from %d to latest...", o.from))
defer o.EC.Spinner.Stop()
migrateDrv, err := newMigrate(o.EC.MigrationDir, o.EC.ServerConfig.ParsedEndpoint, o.EC.ServerConfig.AdminSecret, o.EC.Logger, o.EC.Version, true)
if err != nil {
return errors.Wrap(err, "unable to initialize migrations driver")
}
versions, err := mig.SquashCmd(migrateDrv, o.from, o.newVersion, o.name, o.EC.MigrationDir)
o.EC.Spinner.Stop()
if err != nil {
return errors.Wrap(err, "unable to squash migrations")
}
// squashed migration is generated
// TODO: capture keyboard interrupt and offer to delete the squashed migration
o.EC.Logger.Infof("Created '%d_%s' after squashing '%d' till '%d'", o.newVersion, o.name, versions[0], versions[len(versions)-1])
if !o.deleteSource {
ok := ask2confirmDeleteMigrations(versions, o.EC.Logger)
if !ok {
return nil
}
}
for _, v := range versions {
delOptions := mig.CreateOptions{
Version: strconv.FormatInt(v, 10),
Directory: o.EC.MigrationDir,
}
err = delOptions.Delete()
if err != nil {
return errors.Wrap(err, "unable to delete source file")
}
}
return nil
}
func ask2confirmDeleteMigrations(versions []int64, log *logrus.Logger) bool {
var s string
log.Infof("The following migrations are squashed into a new one:")
out := new(tabwriter.Writer)
buf := &bytes.Buffer{}
out.Init(buf, 0, 8, 2, ' ', 0)
w := util.NewPrefixWriter(out)
for _, version := range versions {
w.Write(util.LEVEL_0, "%d\n",
version,
)
}
_ = out.Flush()
fmt.Println(buf.String())
log.Infof("Do you want to delete these migration source files? (y/N)")
_, err := fmt.Scan(&s)
if err != nil {
log.Error("unable to take user input, skipping deleting files")
return false
}
s = strings.TrimSpace(s)
s = strings.ToLower(s)
if s == "y" || s == "yes" {
return true
}
return false
}

View File

@ -55,6 +55,19 @@ func MigrateAPI(c *gin.Context) {
// Switch on request method
switch c.Request.Method {
case "GET":
// Rescan file system
err := t.ReScan()
if err != nil {
c.JSON(http.StatusInternalServerError, &Response{Code: "internal_error", Message: err.Error()})
return
}
status, err := t.GetStatus()
if err != nil {
c.JSON(http.StatusInternalServerError, &Response{Code: "internal_error", Message: "Something went wrong"})
return
}
c.JSON(http.StatusOK, status)
case "POST":
var request Request
@ -65,7 +78,6 @@ func MigrateAPI(c *gin.Context) {
}
startTime := time.Now()
// Convert to Millisecond
timestamp := startTime.UnixNano() / int64(time.Millisecond)
createOptions := cmd.New(timestamp, request.Name, sourceURL.Path)

View File

@ -8,7 +8,7 @@ import (
"github.com/hasura/graphql-engine/cli/migrate"
)
type SettingReqeust struct {
type SettingRequest struct {
Name string `json:"name"`
Value string `json:"value"`
}
@ -37,7 +37,7 @@ func SettingsAPI(c *gin.Context) {
}
c.JSON(200, &gin.H{name: setting})
case "PUT":
var request SettingReqeust
var request SettingRequest
// Bind Request body to Request struct
if c.BindJSON(&request) != nil {
c.JSON(500, &Response{Code: "internal_error", Message: "Something went wrong"})

108
cli/migrate/api/squash.go Normal file
View File

@ -0,0 +1,108 @@
package api
import (
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/hasura/graphql-engine/cli/migrate"
"github.com/hasura/graphql-engine/cli/migrate/cmd"
mig "github.com/hasura/graphql-engine/cli/migrate/cmd"
)
type squashCreateRequest struct {
Name string `json:"name"`
From uint64 `json:"from"`
version int64
}
type squashDeleteRequest struct {
Versions []int64 `json:"migrations"`
}
func (s *squashCreateRequest) setDefaults() {
if s.Name == "" {
s.Name = "default_squash"
}
startTime := time.Now()
s.version = startTime.UnixNano() / int64(time.Millisecond)
}
func SquashCreateAPI(c *gin.Context) {
migratePtr, ok := c.Get("migrate")
if !ok {
return
}
sourcePtr, ok := c.Get("filedir")
if !ok {
return
}
t := migratePtr.(*migrate.Migrate)
sourceURL := sourcePtr.(*url.URL)
var request squashCreateRequest
// Bind Request body to Request struct
if c.BindJSON(&request) != nil {
c.JSON(500, &Response{Code: "internal_error", Message: "Something went wrong"})
return
}
request.setDefaults()
// Rescan file system
err := t.ReScan()
if err != nil {
c.JSON(http.StatusInternalServerError, &Response{Code: "internal_error", Message: err.Error()})
return
}
versions, err := cmd.SquashCmd(t, request.From, request.version, request.Name, sourceURL.Path)
if err != nil {
if strings.HasPrefix(err.Error(), DataAPIError) {
c.JSON(http.StatusBadRequest, &Response{Code: "data_api_error", Message: strings.TrimPrefix(err.Error(), DataAPIError)})
return
}
if err == migrate.ErrNoMigrationMode {
c.JSON(http.StatusBadRequest, &Response{Code: "migration_mode_disabled", Message: err.Error()})
return
}
c.JSON(http.StatusInternalServerError, &Response{Code: "internal_error", Message: err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"version": request.version, "squashed_migrations": versions})
}
func SquashDeleteAPI(c *gin.Context) {
sourcePtr, ok := c.Get("filedir")
if !ok {
return
}
sourceURL := sourcePtr.(*url.URL)
var request squashDeleteRequest
// Bind Request body to Request struct
if c.BindJSON(&request) != nil {
c.JSON(500, &Response{Code: "internal_error", Message: "Something went wrong"})
return
}
for _, v := range request.Versions {
delOptions := mig.CreateOptions{
Version: strconv.FormatInt(v, 10),
Directory: sourceURL.Path,
}
err := delOptions.Delete()
if err != nil {
c.JSON(500, &Response{Code: "internal_error", Message: "Something went wrong"})
return
}
}
c.JSON(http.StatusOK, gin.H{"message": "Migrations deleted"})
}

View File

@ -2,16 +2,17 @@ package cmd
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"github.com/ghodss/yaml"
"github.com/hasura/graphql-engine/cli/migrate"
"github.com/pkg/errors"
)
const (
@ -22,7 +23,7 @@ const (
var ext = []string{sqlFile, yamlFile}
type CreateOptions struct {
Version int64
Version string
Directory string
Name string
MetaUp []byte
@ -32,11 +33,12 @@ type CreateOptions struct {
}
func New(version int64, name, directory string) *CreateOptions {
v := strconv.FormatInt(version, 10)
if runtime.GOOS == "windows" {
directory = strings.TrimPrefix(directory, "/")
}
return &CreateOptions{
Version: version,
Version: v,
Directory: directory,
Name: name,
}
@ -112,9 +114,8 @@ func (c *CreateOptions) SetSQLDown(data string) error {
}
func (c *CreateOptions) Create() error {
fileName := fmt.Sprintf("%v_%v.", c.Version, c.Name)
base := filepath.Join(c.Directory, fileName)
err := os.MkdirAll(c.Directory, os.ModePerm)
path := filepath.Join(c.Directory, fmt.Sprintf("%s_%s", c.Version, c.Name))
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return err
}
@ -126,7 +127,7 @@ func (c *CreateOptions) Create() error {
if c.MetaUp != nil {
// Create MetaUp
err = createFile(base+"up.yaml", c.MetaUp)
err = createFile(filepath.Join(path, "up.yaml"), c.MetaUp)
if err != nil {
return err
}
@ -134,7 +135,7 @@ func (c *CreateOptions) Create() error {
if c.MetaDown != nil {
// Create MetaDown
err = createFile(base+"down.yaml", c.MetaDown)
err = createFile(filepath.Join(path, "down.yaml"), c.MetaDown)
if err != nil {
return err
}
@ -142,7 +143,7 @@ func (c *CreateOptions) Create() error {
if c.SQLUp != nil {
// Create SQLUp
err = createFile(base+"up.sql", c.SQLUp)
err = createFile(filepath.Join(path, "up.sql"), c.SQLUp)
if err != nil {
return err
}
@ -150,7 +151,7 @@ func (c *CreateOptions) Create() error {
if c.SQLDown != nil {
// Create SQLDown
err = createFile(base+"down.sql", c.SQLDown)
err = createFile(filepath.Join(path, "down.sql"), c.SQLDown)
if err != nil {
return err
}
@ -159,29 +160,24 @@ func (c *CreateOptions) Create() error {
}
func (c *CreateOptions) Delete() error {
count := 0
fileName := fmt.Sprintf("%v_", c.Version)
// scan directory
files, err := ioutil.ReadDir(c.Directory)
if err != nil {
return err
}
for _, fi := range files {
if !fi.IsDir() {
if strings.HasPrefix(fi.Name(), fileName) {
base := filepath.Join(c.Directory, fi.Name())
err = deleteFile(base)
if err != nil {
return err
}
count = count + 1
if strings.HasPrefix(fi.Name(), fmt.Sprintf("%s_", c.Version)) {
if fi.IsDir() {
path := filepath.Join(c.Directory, fi.Name())
return deleteFile(path)
}
path := filepath.Join(c.Directory, fi.Name())
err := deleteFile(path)
if err != nil {
return err
}
}
}
if count == 0 {
return errors.New("Cannot find any migration file")
}
return nil
}
@ -201,8 +197,7 @@ func createFile(fname string, data []byte) error {
}
func deleteFile(fname string) error {
err := os.RemoveAll(fname)
return err
return os.RemoveAll(fname)
}
func GotoCmd(m *migrate.Migrate, v uint64, direction string) error {
@ -228,3 +223,35 @@ func DownCmd(m *migrate.Migrate, limit int64) error {
func ResetCmd(m *migrate.Migrate) error {
return m.Reset()
}
func SquashCmd(m *migrate.Migrate, from uint64, version int64, name, directory string) (versions []int64, err error) {
versions, upMeta, upSql, downMeta, downSql, err := m.Squash(from)
if err != nil {
return
}
createOptions := New(version, name, directory)
if len(upMeta) != 0 {
byteUp, err := yaml.Marshal(upMeta)
if err != nil {
return versions, errors.Wrap(err, "cannot unmarshall up query")
}
createOptions.MetaUp = byteUp
}
if len(downMeta) != 0 {
byteDown, err := yaml.Marshal(downMeta)
if err != nil {
return versions, errors.Wrap(err, "cannot unmarshall down query")
}
createOptions.MetaDown = byteDown
}
createOptions.SQLUp = upSql
createOptions.SQLDown = downSql
err = createOptions.Create()
if err != nil {
return versions, errors.Wrap(err, "cannot create migration")
}
return
}

View File

@ -104,6 +104,10 @@ type Driver interface {
Read(version uint64) (ok bool)
PushToList(migration io.Reader, fileType string, list *CustomList) error
Squash(list *CustomList, ret chan<- interface{})
SettingsDriver
MetadataDriver

File diff suppressed because it is too large Load Diff

View File

@ -5,6 +5,9 @@ import (
"fmt"
"strings"
"github.com/hasura/graphql-engine/cli/migrate/database"
"github.com/qor/transition"
log "github.com/sirupsen/logrus"
)
@ -23,8 +26,115 @@ func (h *HasuraInterfaceBulk) ResetArgs() {
}
type HasuraInterfaceQuery struct {
Type string `json:"type" yaml:"type"`
Args interface{} `json:"args" yaml:"args"`
Type requestTypes `json:"type" yaml:"type"`
Version metadataVersion `json:"version,omitempty" yaml:"version,omitempty"`
Args interface{} `json:"args" yaml:"args"`
}
type metadataVersion int
const (
v1 metadataVersion = 1
v2 = 2
)
type newHasuraIntefaceQuery struct {
Type requestTypes `json:"type" yaml:"type"`
Version metadataVersion `json:"version,omitempty" yaml:"version,omitempty"`
Args interface{} `json:"args" yaml:"args"`
}
func (h *newHasuraIntefaceQuery) UnmarshalJSON(b []byte) error {
type t newHasuraIntefaceQuery
var q t
if err := json.Unmarshal(b, &q); err != nil {
return err
}
if q.Version == 0 {
q.Version = v1
}
argBody, err := json.Marshal(q.Args)
if err != nil {
return err
}
switch q.Type {
case trackTable, addExistingTableOrView:
switch q.Version {
case v2:
q.Args = &trackTableV2Input{}
default:
q.Args = &trackTableInput{}
}
case setTableCustomFields:
q.Args = &setTableCustomFieldsV2Input{}
case untrackTable:
q.Args = &unTrackTableInput{}
case createObjectRelationship:
q.Args = &createObjectRelationshipInput{}
case createArrayRelationship:
q.Args = &createArrayRelationshipInput{}
case setRelationshipComment:
q.Args = &setRelationshipCommentInput{}
case dropRelationship:
q.Args = &dropRelationshipInput{}
case createInsertPermission:
q.Args = &createInsertPermissionInput{}
case dropInsertPermission:
q.Args = &dropInsertPermissionInput{}
case createSelectPermission:
q.Args = &createSelectPermissionInput{}
case dropSelectPermission:
q.Args = &dropSelectPermissionInput{}
case createUpdatePermission:
q.Args = &createUpdatePermissionInput{}
case dropUpdatePermission:
q.Args = &dropUpdatePermissionInput{}
case createDeletePermission:
q.Args = &createDeletePermissionInput{}
case dropDeletePermission:
q.Args = &dropDeletePermissionInput{}
case trackFunction:
q.Args = &trackFunctionInput{}
case unTrackFunction:
q.Args = &unTrackFunctionInput{}
case createEventTrigger:
q.Args = &createEventTriggerInput{}
case deleteEventTrigger:
q.Args = &deleteEventTriggerInput{}
case addRemoteSchema:
q.Args = &addRemoteSchemaInput{}
case removeRemoteSchema:
q.Args = &removeRemoteSchemaInput{}
case createQueryCollection:
q.Args = &createQueryCollectionInput{}
case dropQueryCollection:
q.Args = &dropQueryCollectionInput{}
case addQueryToCollection:
q.Args = &addQueryToCollectionInput{}
case dropQueryFromCollection:
q.Args = &dropQueryFromCollectionInput{}
case addCollectionToAllowList:
q.Args = &addCollectionToAllowListInput{}
case dropCollectionFromAllowList:
q.Args = &dropCollectionFromAllowListInput{}
case replaceMetadata:
q.Args = &replaceMetadataInput{}
case clearMetadata:
q.Args = &clearMetadataInput{}
case runSQL:
q.Args = &runSQLInput{}
case addComputedField:
q.Args = &addComputedFieldInput{}
case dropComputedField:
q.Args = &dropComputedFieldInput{}
default:
return fmt.Errorf("cannot squash type %s", h.Type)
}
if err := json.Unmarshal(argBody, &q.Args); err != nil {
return err
}
*h = newHasuraIntefaceQuery(q)
return nil
}
type HasuraQuery struct {
@ -136,3 +246,539 @@ type HasuraSQLRes struct {
ResultType string `json:"result_type"`
Result [][]string `json:"result"`
}
type requestTypes string
const (
trackTable requestTypes = "track_table"
addExistingTableOrView = "add_existing_table_or_view"
setTableCustomFields = "set_table_custom_fields"
untrackTable = "untrack_table"
trackFunction = "track_function"
unTrackFunction = "untrack_function"
createObjectRelationship = "create_object_relationship"
createArrayRelationship = "create_array_relationship"
dropRelationship = "drop_relationship"
setRelationshipComment = "set_relationship_comment"
createInsertPermission = "create_insert_permission"
dropInsertPermission = "drop_insert_permission"
createSelectPermission = "create_select_permission"
dropSelectPermission = "drop_select_permission"
createUpdatePermission = "create_update_permission"
dropUpdatePermission = "drop_update_permission"
createDeletePermission = "create_delete_permission"
dropDeletePermission = "drop_delete_permission"
setPermissionComment = "set_permission_comment"
createEventTrigger = "create_event_trigger"
deleteEventTrigger = "delete_event_trigger"
addRemoteSchema = "add_remote_schema"
removeRemoteSchema = "remove_remote_schema"
createQueryCollection = "create_query_collection"
dropQueryCollection = "drop_query_collection"
addQueryToCollection = "add_query_to_collection"
dropQueryFromCollection = "drop_query_from_collection"
addCollectionToAllowList = "add_collection_to_allowlist"
dropCollectionFromAllowList = "drop_collection_from_allowlist"
replaceMetadata = "replace_metadata"
clearMetadata = "clear_metadata"
runSQL = "run_sql"
bulkQuery = "bulk"
addComputedField = "add_computed_field"
dropComputedField = "drop_computed_field"
)
type tableMap struct {
name, schema string
}
type relationshipMap struct {
tableName, schemaName, name string
}
type permissionMap struct {
tableName, schemaName, permType, Role string
}
type computedFieldMap struct {
tableName, schemaName, name string
}
type queryInCollectionMap struct {
collectionName, queryName string
}
type tableSchema struct {
Name string `json:"name" yaml:"name"`
Schema string `json:"schema" yaml:"schema"`
}
func (t *tableSchema) UnmarshalJSON(b []byte) error {
var table string
if err := json.Unmarshal(b, &table); err != nil {
var ts struct {
Name string `json:"name"`
Schema string `json:"schema"`
}
if err := json.Unmarshal(b, &ts); err != nil {
return err
}
t.Name = ts.Name
t.Schema = ts.Schema
return nil
}
t.Name = table
t.Schema = "public"
return nil
}
type trackTableInput struct {
tableSchema
IsEnum bool `json:"is_enum" yaml:"is_enum"`
}
func (t *trackTableInput) UnmarshalJSON(b []byte) error {
type tmpT trackTableInput
var ts tmpT
if err := json.Unmarshal(b, &ts); err != nil {
return err
}
if ts.Schema == "" {
ts.Schema = "public"
}
*t = trackTableInput(ts)
return nil
}
type tableConfiguration struct {
CustomRootFields map[string]string `json:"custom_root_fields" yaml:"custom_root_fields"`
CustomColumnNames map[string]string `json:"custom_column_names" yaml:"custom_column_names"`
}
type trackTableV2Input struct {
Table tableSchema `json:"table" yaml:"table"`
Configuration tableConfiguration `json:"configuration" yaml:"configuration"`
}
type setTableCustomFieldsV2Input struct {
Table tableSchema `json:"table" yaml:"table"`
tableConfiguration
}
type unTrackTableInput struct {
tableSchema
}
func (t *unTrackTableInput) UnmarshalJSON(b []byte) error {
type tmpT unTrackTableInput
var ts tmpT
if err := json.Unmarshal(b, &ts); err != nil {
return err
}
if ts.Schema == "" {
ts.Schema = "public"
}
*t = unTrackTableInput(ts)
return nil
}
type trackFunctionInput struct {
tableSchema
}
type unTrackFunctionInput struct {
Schema string `json:"schema" yaml:"schema"`
Name string `json:"name" yaml:"name"`
}
type createObjectRelationshipInput struct {
Name string `json:"name" yaml:"name"`
Table tableSchema `json:"table" yaml:"table"`
Using interface{} `json:"using" yaml:"using"`
Comment *string `json:"comment,omitempty" yaml:"comment,omitempty"`
}
type createArrayRelationshipInput struct {
Name string `json:"name" yaml:"name"`
Table tableSchema `json:"table" yaml:"table"`
Using interface{} `json:"using" yaml:"using"`
Comment *string `json:"comment,omitempty" yaml:"comment,omitempty"`
}
type setRelationshipCommentInput struct {
Name string `json:"name" yaml:"name"`
Table tableSchema `json:"table" yaml:"table"`
Comment *string `json:"comment" yaml:"comment"`
}
type dropRelationshipInput struct {
RelationShip string `json:"relationship" yaml:"relationship"`
Table tableSchema `json:"table" yaml:"table"`
}
type createInsertPermissionInput struct {
Table tableSchema `json:"table" yaml:"table"`
Role string `json:"role" yaml:"role"`
Permission interface{} `json:"permission" yaml:"permission"`
Comment *string `json:"comment,omitempty" yaml:"comment,omitempty"`
}
type dropInsertPermissionInput struct {
Table tableSchema `json:"table" yaml:"table"`
Role string `json:"role" yaml:"role"`
}
type createSelectPermissionInput struct {
Table tableSchema `json:"table" yaml:"table"`
Role string `json:"role" yaml:"role"`
Permission interface{} `json:"permission" yaml:"permission"`
Comment *string `json:"comment,omitempty" yaml:"comment,omitempty"`
}
type dropSelectPermissionInput struct {
Table tableSchema `json:"table" yaml:"table"`
Role string `json:"role" yaml:"role"`
}
type createUpdatePermissionInput struct {
Table tableSchema `json:"table" yaml:"table"`
Role string `json:"role" yaml:"role"`
Permission interface{} `json:"permission" yaml:"permission"`
Comment *string `json:"comment,omitempty" yaml:"comment,omitempty"`
}
type dropUpdatePermissionInput struct {
Table tableSchema `json:"table" yaml:"table"`
Role string `json:"role" yaml:"role"`
}
type createDeletePermissionInput struct {
Table tableSchema `json:"table" yaml:"table"`
Role string `json:"role" yaml:"role"`
Permission interface{} `json:"permission" yaml:"permission"`
Comment *string `json:"comment,omitempty" yaml:"comment,omitempty"`
}
type dropDeletePermissionInput struct {
Table tableSchema `json:"table" yaml:"table"`
Role string `json:"role" yaml:"role"`
}
type setPermissionCommentInput struct {
Table tableSchema `json:"table" yaml:"table"`
Role string `json:"role" yaml:"role"`
Type string `json:"type" yaml:"type"`
Comment *string `json:"comment" yaml:"comment"`
}
type createEventTriggerInput struct {
Name string `json:"name" yaml:"name"`
Table tableSchema `json:"table" yaml:"table"`
Webhook string `json:"webhook,omitempty" yaml:"webhook,omitempty"`
WebhookFromEnv string `json:"webhook_from_env,omitempty" yaml:"webhook_from_env,omitempty"`
Definition *createEventTriggerOperationInput `json:"definition,omitempty" yaml:"definition,omitempty"`
Headers interface{} `json:"headers" yaml:"headers"`
Replace bool `json:"replace" yaml:"replace"`
createEventTriggerOperationInput
}
type createEventTriggerOperationInput struct {
Insert interface{} `json:"insert,omitempty" yaml:"insert,omitempty"`
Update interface{} `json:"update,omitempty" yaml:"update,omitempty"`
Delete interface{} `json:"delete,omitempty" yaml:"delete,omitempty"`
}
func (c *createEventTriggerInput) MarshalJSON() ([]byte, error) {
if c.Definition != nil {
c.Insert = c.Definition.Insert
c.Update = c.Definition.Update
c.Delete = c.Definition.Delete
c.Definition = nil
}
return json.Marshal(&struct {
Name string `json:"name" yaml:"name"`
Table tableSchema `json:"table" yaml:"table"`
Webhook string `json:"webhook,omitempty" yaml:"webhook,omitempty"`
WebhookFromEnv string `json:"webhook_from_env,omitempty" yaml:"webhook_from_env,omitempty"`
Headers interface{} `json:"headers" yaml:"headers"`
Replace bool `json:"replace" yaml:"replace"`
Insert interface{} `json:"insert,omitempty" yaml:"insert,omitempty"`
Update interface{} `json:"update,omitempty" yaml:"update,omitempty"`
Delete interface{} `json:"delete,omitempty" yaml:"delete,omitempty"`
}{
Name: c.Name,
Table: c.Table,
Webhook: c.Webhook,
WebhookFromEnv: c.WebhookFromEnv,
Headers: c.Headers,
Replace: c.Replace,
Insert: c.Insert,
Update: c.Update,
Delete: c.Delete,
})
}
type deleteEventTriggerInput struct {
Name string `json:"name" yaml:"name"`
}
type addRemoteSchemaInput struct {
Name string `json:"name" yaml:"name"`
Definition interface{} `json:"definition" yaml:"definition"`
Comment *string `json:"comment,omitempty" yaml:"comment,omitempty"`
}
type removeRemoteSchemaInput struct {
Name string `json:"name" yaml:"name"`
}
type collectionQuery struct {
Name string `json:"name" yaml:"name"`
Query string `json:"query" yaml:"query"`
}
type createQueryCollectionInput struct {
Name string `json:"name" yaml:"name"`
Comment *string `json:"comment,omitempty" yaml:"comment,omitempty"`
Definition struct {
Queries []collectionQuery `json:"queries" yaml:"queries"`
} `json:"definition" yaml:"definition"`
}
type dropQueryCollectionInput struct {
Collection string `json:"name" yaml:"name"`
Cascade bool `json:"cascade" yaml:"cascade"`
}
type addQueryToCollectionInput struct {
CollectionName string `json:"collection_name" yaml:"collection_name"`
QueryName string `json:"query_name" yaml:"query_name"`
Query string `json:"query" yaml:"query"`
}
type dropQueryFromCollectionInput struct {
CollectionName string `json:"collection_name" yaml:"collection_name"`
QueryName string `json:"query_name" yaml:"query_name"`
}
type addCollectionToAllowListInput struct {
Collection string `json:"collection" yaml:"collection"`
}
type dropCollectionFromAllowListInput struct {
Collection string `json:"collection" yaml:"collection"`
}
type addComputedFieldInput struct {
Table tableSchema `json:"table" yaml:"table"`
Name string `json:"name" yaml:"name"`
Definition interface{} `json:"definition" yaml:"definition"`
}
type dropComputedFieldInput struct {
Table tableSchema `json:"table" yaml:"table"`
Name string `json:"name" yaml:"name"`
Cascade bool `json:"cascade" yaml:"cascade"`
Comment string `json:"comment" yaml:"comment"`
}
type clearMetadataInput struct {
}
type replaceMetadataInput struct {
Tables []struct {
Table tableSchema `json:"table" yaml:"table"`
ArrayRelationships []*createArrayRelationshipInput `json:"array_relationships" yaml:"array_relationships"`
ObjectRelationships []*createObjectRelationshipInput `json:"object_relationships" yaml:"object_relationships"`
InsertPermissions []*createInsertPermissionInput `json:"insert_permissions" yaml:"insert_permissions"`
SelectPermissions []*createSelectPermissionInput `json:"select_permissions" yaml:"select_permissions"`
UpdatePermissions []*createUpdatePermissionInput `json:"update_permissions" yaml:"update_permissions"`
DeletePermissions []*createDeletePermissionInput `json:"delete_permissions" yaml:"delete_permissions"`
EventTriggers []*createEventTriggerInput `json:"event_triggers" yaml:"event_triggers"`
ComputedFields []*addComputedFieldInput `json:"computed_fields" yaml:"computed_fields"`
Configuration *tableConfiguration `json:"configuration" yaml:"configuration"`
} `json:"tables" yaml:"tables"`
Functions []*trackFunctionInput `json:"functions" yaml:"functions"`
QueryCollections []*createQueryCollectionInput `json:"query_collections" yaml:"query_collections"`
AllowList []*addCollectionToAllowListInput `json:"allowlist" yaml:"allowlist"`
RemoteSchemas []*addRemoteSchemaInput `json:"remote_schemas" yaml:"remote_schemas"`
}
func (rmi *replaceMetadataInput) convertToMetadataActions(l *database.CustomList) {
// track tables
for _, table := range rmi.Tables {
if table.Configuration == nil {
t := &trackTableInput{
tableSchema: tableSchema{
Name: table.Table.Name,
Schema: table.Table.Schema,
},
}
l.PushBack(t)
} else {
t := &trackTableV2Input{
Table: tableSchema{
Name: table.Table.Name,
Schema: table.Table.Schema,
},
Configuration: *table.Configuration,
}
l.PushBack(t)
}
}
for _, table := range rmi.Tables {
for _, objRel := range table.ObjectRelationships {
objRel.Table = tableSchema{
table.Table.Name,
table.Table.Schema,
}
l.PushBack(objRel)
}
}
for _, table := range rmi.Tables {
for _, arrayRel := range table.ArrayRelationships {
arrayRel.Table = tableSchema{
table.Table.Name,
table.Table.Schema,
}
l.PushBack(arrayRel)
}
}
for _, table := range rmi.Tables {
for _, insertPerm := range table.InsertPermissions {
insertPerm.Table = tableSchema{
table.Table.Name,
table.Table.Schema,
}
l.PushBack(insertPerm)
}
}
for _, table := range rmi.Tables {
for _, selectPerm := range table.SelectPermissions {
selectPerm.Table = tableSchema{
table.Table.Name,
table.Table.Schema,
}
l.PushBack(selectPerm)
}
}
for _, table := range rmi.Tables {
for _, updatePerm := range table.UpdatePermissions {
updatePerm.Table = tableSchema{
table.Table.Name,
table.Table.Schema,
}
l.PushBack(updatePerm)
}
}
for _, table := range rmi.Tables {
for _, deletePerm := range table.DeletePermissions {
deletePerm.Table = tableSchema{
table.Table.Name,
table.Table.Schema,
}
l.PushBack(deletePerm)
}
}
for _, table := range rmi.Tables {
for _, et := range table.EventTriggers {
et.Table = tableSchema{
table.Table.Name,
table.Table.Schema,
}
l.PushBack(et)
}
}
for _, table := range rmi.Tables {
for _, cf := range table.ComputedFields {
cf.Table = tableSchema{
table.Table.Name,
table.Table.Schema,
}
l.PushBack(cf)
}
}
// track functions
for _, function := range rmi.Functions {
l.PushBack(function)
}
// track query collections
for _, qc := range rmi.QueryCollections {
l.PushBack(qc)
}
// track allow list
for _, al := range rmi.AllowList {
l.PushBack(al)
}
// track remote schemas
for _, rs := range rmi.RemoteSchemas {
l.PushBack(rs)
}
}
type runSQLInput struct {
SQL string `json:"sql" yaml:"sql"`
}
type tableConfig struct {
name, schema string
transition.Transition
}
type relationshipConfig struct {
tableName, schemaName, name string
transition.Transition
}
type permissionConfig struct {
tableName, schemaName, permType, role string
transition.Transition
}
type computedFieldConfig struct {
tableName, schemaName, name string
transition.Transition
}
type functionConfig struct {
name, schema string
transition.Transition
}
type eventTriggerConfig struct {
name string
transition.Transition
}
type remoteSchemaConfig struct {
name string
transition.Transition
}
type queryCollectionConfig struct {
name string
allowList bool
transition.Transition
}
type queryInCollectionConfig struct {
collectionName string
queryName string
transition.Transition
}
type allowListConfig struct {
collection string
transition.Transition
}

View File

@ -0,0 +1,33 @@
package database
import (
"container/list"
"github.com/ahmetb/go-linq"
)
type CustomList struct {
*list.List
}
func (c *CustomList) Iterate() linq.Iterator {
length := c.Len()
var prevElem *list.Element
i := 0
return func() (item interface{}, ok bool) {
if length == 0 {
return
}
if i == 0 {
prevElem = c.Front()
i++
} else {
prevElem = prevElem.Next()
if prevElem == nil {
return
}
}
return prevElem, true
}
}

View File

@ -6,6 +6,8 @@
package migrate
import (
"bytes"
"container/list"
"fmt"
"os"
"sync"
@ -338,6 +340,127 @@ func (m *Migrate) Query(data []interface{}) error {
return m.databaseDrv.Query(data)
}
// Squash migrations from version v into a new migration.
// Returns a list of migrations that are squashed: vs
// the squashed metadata for all UP steps: um
// the squashed SQL for all UP steps: us
// the squashed metadata for all down steps: dm
// the squashed SQL for all down steps: ds
func (m *Migrate) Squash(v uint64) (vs []int64, um []interface{}, us []byte, dm []interface{}, ds []byte, err error) {
// check the migration mode on the database
mode, err := m.databaseDrv.GetSetting("migration_mode")
if err != nil {
return
}
// if migration_mode is false, set err to ErrNoMigrationMode and return
if mode != "true" {
err = ErrNoMigrationMode
return
}
// concurrently squash all the up migrations
retUp := make(chan interface{}, m.PrefetchMigrations)
go m.squashUp(v, retUp)
// concurrently squash all down migrations
retDown := make(chan interface{}, m.PrefetchMigrations)
go m.squashDown(v, retDown)
// combine squashed up and down migrations into a single one when they're ready
dataUp := make(chan interface{}, m.PrefetchMigrations)
dataDown := make(chan interface{}, m.PrefetchMigrations)
retVersions := make(chan int64, m.PrefetchMigrations)
go m.squashMigrations(retUp, retDown, dataUp, dataDown, retVersions)
// make a chan for errors
errChn := make(chan error)
// create a waitgroup to wait for all goroutines to finish execution
var wg sync.WaitGroup
// add three tasks to waitgroup since we used 3 goroutines above
wg.Add(3)
// read from dataUp chan when all up migrations are squashed and compiled
go func() {
// defer to mark one task in the waitgroup as complete
defer wg.Done()
buf := &bytes.Buffer{}
for r := range dataUp {
// check the type of value returned through the chan
switch data := r.(type) {
case error:
// it's an error, set error and return
// note: this return is returning the goroutine, not the current function
err = r.(error)
return
case []byte:
// it's SQL, concat all of them
buf.WriteString("\n")
buf.Write(data)
case interface{}:
// it's metadata, append into the array
um = append(um, data)
}
}
// set us as the bytes written into buf
us = buf.Bytes()
}()
// read from dataDown when it is ready:
go func() {
// defer to mark another task in the waitgroup as complete
defer wg.Done()
buf := &bytes.Buffer{}
for r := range dataDown {
// check the type of value returned through the chan
switch data := r.(type) {
case error:
// it's an error, set error and return
// note: this return is returning the goroutine, not the current function
err = r.(error)
return
case []byte:
// it's SQL, concat all of them
buf.WriteString("\n")
buf.Write(data)
case interface{}:
// it's metadata, append into the array
dm = append(dm, data)
}
}
// set ds as the bytes written into buf
ds = buf.Bytes()
}()
// read retVersions - versions that are squashed
go func() {
// defer to mark another task in the waitgroup as complete
defer wg.Done()
for r := range retVersions {
// append each version into the versions array
vs = append(vs, r)
}
}()
// returns from the above goroutines pass the control here.
// wait until all tasks (3) in the workgroup are completed
wg.Wait()
// check for errors in the error channel
select {
// we got an error, set err and return
case err = <-errChn:
return
default:
// set nothing and return, all is well
return
}
}
// Migrate looks at the currently active migration version,
// then migrates either up or down to the specified version.
func (m *Migrate) Migrate(version uint64, direction string) error {
@ -456,8 +579,152 @@ func (m *Migrate) Down() error {
// Reset resets public schema and hasuradb metadata
func (m *Migrate) Reset() (err error) {
err = m.databaseDrv.Reset()
return
return m.databaseDrv.Reset()
}
func (m *Migrate) squashUp(version uint64, ret chan<- interface{}) {
defer close(ret)
currentVersion := version
count := int64(0)
limit := int64(-1)
if m.stop() {
return
}
for limit == -1 {
if currentVersion == version {
if err := m.versionUpExists(version); err != nil {
ret <- err
return
}
migr, err := m.newMigration(version, int64(version))
if err != nil {
ret <- err
return
}
ret <- migr
go migr.Buffer()
migr, err = m.metanewMigration(version, int64(version))
if err != nil {
ret <- err
return
}
ret <- migr
go migr.Buffer()
count++
}
// apply next migration
next, err := m.sourceDrv.Next(currentVersion)
if os.IsNotExist(err) {
// no limit, but no migrations applied?
if count == 0 {
ret <- ErrNoChange
return
}
if limit == -1 {
return
}
}
if err != nil {
ret <- err
return
}
// Check if next files exists (yaml or sql)
if err = m.versionUpExists(next); err != nil {
ret <- err
return
}
migr, err := m.newMigration(next, int64(next))
if err != nil {
ret <- err
return
}
ret <- migr
go migr.Buffer()
migr, err = m.metanewMigration(next, int64(next))
if err != nil {
ret <- err
return
}
ret <- migr
go migr.Buffer()
currentVersion = next
count++
}
}
func (m *Migrate) squashDown(version uint64, ret chan<- interface{}) {
defer close(ret)
from, err := m.sourceDrv.GetLocalVersion()
if err != nil {
ret <- err
return
}
for {
if m.stop() {
return
}
err = m.versionDownExists(from)
if err != nil {
ret <- err
return
}
prev, err := m.sourceDrv.Prev(from)
if err != nil {
migr, err := m.metanewMigration(from, -1)
if err != nil {
ret <- err
return
}
ret <- migr
go migr.Buffer()
migr, err = m.newMigration(from, -1)
if err != nil {
ret <- err
return
}
ret <- migr
go migr.Buffer()
}
if from == version {
return
}
migr, err := m.metanewMigration(from, int64(prev))
if err != nil {
ret <- err
return
}
ret <- migr
go migr.Buffer()
migr, err = m.newMigration(from, int64(prev))
if err != nil {
ret <- err
return
}
ret <- migr
go migr.Buffer()
from = prev
}
}
// read reads either up or down migrations from source `from` to `to`.
@ -479,7 +746,7 @@ func (m *Migrate) read(version uint64, direction string, ret chan<- interface{})
return
}
// Check if next version exiss (yaml or sql)
// Check if next version exists (yaml or sql)
if err := m.versionUpExists(version); err != nil {
ret <- err
return
@ -833,6 +1100,71 @@ func (m *Migrate) runMigrations(ret <-chan interface{}) error {
return nil
}
func (m *Migrate) squashMigrations(retUp <-chan interface{}, retDown <-chan interface{}, dataUp chan<- interface{}, dataDown chan<- interface{}, versions chan<- int64) error {
var latestVersion int64
go func() {
defer close(dataUp)
defer close(versions)
squashList := database.CustomList{
list.New(),
}
defer m.databaseDrv.Squash(&squashList, dataUp)
for r := range retUp {
if m.stop() {
return
}
switch r.(type) {
case error:
dataUp <- r.(error)
case *Migration:
migr := r.(*Migration)
if migr.Body != nil {
if err := m.databaseDrv.PushToList(migr.BufferedBody, migr.FileType, &squashList); err != nil {
dataUp <- err
}
}
version := int64(migr.Version)
if version == migr.TargetVersion && version != latestVersion {
versions <- version
latestVersion = version
}
}
}
}()
go func() {
defer close(dataDown)
squashList := database.CustomList{
list.New(),
}
defer m.databaseDrv.Squash(&squashList, dataDown)
for r := range retDown {
if m.stop() {
return
}
switch r.(type) {
case error:
dataDown <- r.(error)
case *Migration:
migr := r.(*Migration)
if migr.Body != nil {
if err := m.databaseDrv.PushToList(migr.BufferedBody, migr.FileType, &squashList); err != nil {
dataDown <- err
}
}
}
}
}()
return nil
}
// versionUpExists checks the source if either the up or down migration for
// the specified migration version exists.
func (m *Migrate) versionUpExists(version uint64) error {

View File

@ -80,17 +80,50 @@ func (f *File) Close() error {
func (f *File) Scan() error {
f.migrations = source.NewMigrations()
files, err := ioutil.ReadDir(f.path)
folders, err := ioutil.ReadDir(f.path)
if err != nil {
return err
}
for _, fi := range files {
if !fi.IsDir() {
m, err := source.DefaultParse(fi.Name(), f.path)
for _, fo := range folders {
if fo.IsDir() {
// v2 migrate
dirName := fo.Name()
dirPath := filepath.Join(f.path, dirName)
files, err := ioutil.ReadDir(dirPath)
if err != nil {
return err
}
for _, fi := range files {
if fi.IsDir() {
continue
}
fileName := fmt.Sprintf("%s.%s", dirName, fi.Name())
m, err := source.DefaultParse(fileName)
if err != nil {
continue // ignore files that we can't parse
}
m.Raw = filepath.Join(dirName, fi.Name())
ok, err := source.IsEmptyFile(m, f.path)
if err != nil {
return err
}
if !ok {
continue
}
err = f.migrations.Append(m)
if err != nil {
return err
}
}
} else {
// v1 migrate
m, err := source.DefaultParse(fo.Name())
if err != nil {
continue // ignore files that we can't parse
}
m.Raw = fo.Name()
ok, err := source.IsEmptyFile(m, f.path)
if err != nil {
return err

View File

@ -26,7 +26,7 @@ var (
var Regex = regexp.MustCompile(`^([0-9]+)_(.*)\.(` + string(Down) + `|` + string(Up) + `)\.(.*)$`)
// Parse returns Migration for matching Regex pattern.
func Parse(raw string, directory string) (*Migration, error) {
func Parse(raw string) (*Migration, error) {
var direction Direction
m := Regex.FindStringSubmatch(raw)
if len(m) == 5 {
@ -58,7 +58,6 @@ func Parse(raw string, directory string) (*Migration, error) {
Version: versionUint64,
Identifier: m[2],
Direction: direction,
Raw: raw,
}, nil
}
return nil, ErrParse

View File

@ -6,18 +6,18 @@ import (
type MigrationStatus struct {
// Version is the version of this migration.
Version uint64
Version uint64 `json:"-"`
// Check if the migration is applied on the cluster
IsApplied bool
IsApplied bool `json:"database_status"`
// Check if the migration is present on the local.
IsPresent bool
IsPresent bool `json:"source_status"`
}
type Status struct {
Index uint64Slice
Migrations map[uint64]*MigrationStatus
Index uint64Slice `json:"migrations"`
Migrations map[uint64]*MigrationStatus `json:"status"`
}
func NewStatus() *Status {