mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-09-17 13:37:26 +03:00
cli: introduce BigQuery source
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/5766 Co-authored-by: Mohd Bilal <24944223+m-Bilal@users.noreply.github.com> GitOrigin-RevId: 1bd4718dbf7b0368e99e5d7c6dd08f362dbf5c2b
This commit is contained in:
parent
334660d906
commit
22d515dcb6
@ -43,6 +43,7 @@ const (
|
||||
SourceKindMSSQL SourceKind = "mssql"
|
||||
SourceKindCitus SourceKind = "citus"
|
||||
SourceKindCockroach SourceKind = "cockroach"
|
||||
SourceKindBigQuery SourceKind = "bigquery"
|
||||
)
|
||||
|
||||
type V2Query interface {
|
||||
@ -50,6 +51,7 @@ type V2Query interface {
|
||||
MSSQLSourceOps
|
||||
CitusSourceOps
|
||||
CockroachSourceOps
|
||||
BigQuerySourceOps
|
||||
Send(requestBody interface{}) (httpcResponse *httpc.Response, body io.Reader, error error)
|
||||
Bulk([]RequestBody) (io.Reader, error)
|
||||
}
|
||||
|
@ -41,6 +41,14 @@ type CitusRunSQLInput PGRunSQLInput
|
||||
|
||||
type CitusRunSQLOutput PGRunSQLOutput
|
||||
|
||||
type BigQuerySourceOps interface {
|
||||
BigQueryRunSQL(input BigQueryRunSQLInput) (response *BigQueryRunSQLOutput, err error)
|
||||
}
|
||||
|
||||
type BigQueryRunSQLInput PGRunSQLInput
|
||||
|
||||
type BigQueryRunSQLOutput PGRunSQLOutput
|
||||
|
||||
type CockroachSourceOps interface {
|
||||
CockroachRunSQL(input CockroachRunSQLInput) (response *CockroachRunSQLOutput, err error)
|
||||
}
|
||||
|
34
cli/internal/hasura/sourceops/bigquery/bigquery.go
Normal file
34
cli/internal/hasura/sourceops/bigquery/bigquery.go
Normal file
@ -0,0 +1,34 @@
|
||||
package bigquery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/hasura/graphql-engine/cli/v2/internal/errors"
|
||||
"github.com/hasura/graphql-engine/cli/v2/internal/httpc"
|
||||
)
|
||||
|
||||
// allow to interact with all hasura opertions on database
|
||||
type SourceOps struct {
|
||||
*httpc.Client
|
||||
// api subpath eg: "v1/query"
|
||||
path string
|
||||
}
|
||||
|
||||
func New(client *httpc.Client, path string) *SourceOps {
|
||||
return &SourceOps{client, path}
|
||||
}
|
||||
|
||||
func (d *SourceOps) send(body interface{}, responseBodyWriter io.Writer) (*httpc.Response, error) {
|
||||
var op errors.Op = "bigquery.SourceOps.send"
|
||||
req, err := d.NewRequest(http.MethodPost, d.path, body)
|
||||
if err != nil {
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
resp, err := d.LockAndDo(context.Background(), req, responseBodyWriter)
|
||||
if err != nil {
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
return resp, nil
|
||||
}
|
36
cli/internal/hasura/sourceops/bigquery/run_sql.go
Normal file
36
cli/internal/hasura/sourceops/bigquery/run_sql.go
Normal file
@ -0,0 +1,36 @@
|
||||
package bigquery
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/hasura/graphql-engine/cli/v2/internal/errors"
|
||||
"github.com/hasura/graphql-engine/cli/v2/internal/hasura"
|
||||
)
|
||||
|
||||
func (d *SourceOps) BigQueryRunSQL(input hasura.BigQueryRunSQLInput) (*hasura.BigQueryRunSQLOutput, error) {
|
||||
var op errors.Op = "bigquery.SourceOps.CockroachRunSQL"
|
||||
body := hasura.RequestBody{
|
||||
Type: "bigquery_run_sql",
|
||||
Args: input,
|
||||
}
|
||||
var b = new(bytes.Buffer)
|
||||
resp, err := d.send(body, b)
|
||||
if err != nil {
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
if b.Len() > 0 {
|
||||
return nil, errors.E(op, errors.KindHasuraAPI, b.String())
|
||||
} else {
|
||||
return nil, errors.E(op, errors.KindHasuraAPI, fmt.Errorf("bigquery_run_sql api request failed %d", resp.StatusCode))
|
||||
}
|
||||
}
|
||||
o := new(hasura.BigQueryRunSQLOutput)
|
||||
if err = json.NewDecoder(b).Decode(o); err != nil {
|
||||
return nil, errors.E(op, err)
|
||||
}
|
||||
return o, nil
|
||||
}
|
70
cli/internal/hasura/sourceops/bigquery/run_sql_test.go
Normal file
70
cli/internal/hasura/sourceops/bigquery/run_sql_test.go
Normal file
@ -0,0 +1,70 @@
|
||||
package bigquery
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/hasura/graphql-engine/cli/v2/internal/hasura"
|
||||
"github.com/hasura/graphql-engine/cli/v2/internal/httpc"
|
||||
"github.com/hasura/graphql-engine/cli/v2/internal/testutil"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestHasuraDatabaseOperations_RunSQL(t *testing.T) {
|
||||
port, source, projectId, dataset, teardown := testutil.StartHasuraWithBigQuerySource(t, testutil.HasuraDockerImage)
|
||||
defer teardown()
|
||||
type fields struct {
|
||||
httpClient *httpc.Client
|
||||
path string
|
||||
}
|
||||
type args struct {
|
||||
input hasura.BigQueryRunSQLInput
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
want *hasura.BigQueryRunSQLOutput
|
||||
wantErr bool
|
||||
assertErr require.ErrorAssertionFunc
|
||||
}{
|
||||
{
|
||||
"can send a run_sql request",
|
||||
fields{
|
||||
httpClient: testutil.NewHttpcClient(t, port, nil),
|
||||
path: "v2/query",
|
||||
},
|
||||
args{
|
||||
input: hasura.BigQueryRunSQLInput{
|
||||
SQL: fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s.%s.test_1` (first_name STRING,last_name STRING);", projectId, dataset),
|
||||
Source: source,
|
||||
},
|
||||
},
|
||||
&hasura.BigQueryRunSQLOutput{
|
||||
ResultType: hasura.TuplesOK,
|
||||
Result: [][]string{{}},
|
||||
},
|
||||
false,
|
||||
require.NoError,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
test := func() {
|
||||
h := &SourceOps{
|
||||
Client: tt.fields.httpClient,
|
||||
path: tt.fields.path,
|
||||
}
|
||||
got, err := h.BigQueryRunSQL(tt.args.input)
|
||||
tt.assertErr(t, err)
|
||||
if tt.wantErr {
|
||||
return
|
||||
}
|
||||
assert.Equal(t, tt.want, got)
|
||||
}
|
||||
test()
|
||||
})
|
||||
}
|
||||
}
|
@ -6,6 +6,7 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/hasura/graphql-engine/cli/v2/internal/hasura/sourceops/bigquery"
|
||||
"github.com/hasura/graphql-engine/cli/v2/internal/hasura/sourceops/citus"
|
||||
"github.com/hasura/graphql-engine/cli/v2/internal/hasura/sourceops/cockroach"
|
||||
"github.com/hasura/graphql-engine/cli/v2/internal/hasura/sourceops/mssql"
|
||||
@ -21,6 +22,7 @@ type Client struct {
|
||||
hasura.PGSourceOps
|
||||
hasura.MSSQLSourceOps
|
||||
hasura.CitusSourceOps
|
||||
hasura.BigQuerySourceOps
|
||||
hasura.CockroachSourceOps
|
||||
path string
|
||||
}
|
||||
@ -32,6 +34,7 @@ func New(c *httpc.Client, path string) *Client {
|
||||
MSSQLSourceOps: mssql.New(c, path),
|
||||
CitusSourceOps: citus.New(c, path),
|
||||
CockroachSourceOps: cockroach.New(c, path),
|
||||
BigQuerySourceOps: bigquery.New(c, path),
|
||||
path: path,
|
||||
}
|
||||
return client
|
||||
|
@ -575,6 +575,19 @@ func StartHasuraWithCockroachSource(t TestingT, image string) (hasuraPort, sourc
|
||||
return hasuraPort, sourceName, teardown
|
||||
}
|
||||
|
||||
func StartHasuraWithBigQuerySource(t TestingT, image string) (hasuraPort, sourceName, projectId, dataset string, teardown func()) {
|
||||
hasuraPort, hasuraTeardown := StartHasuraWithMetadataDatabase(t, image)
|
||||
sourceName = randomdata.SillyName()
|
||||
serviceAccount := os.Getenv("HASURA_BIGQUERY_SERVICE_KEY")
|
||||
globalSelectLimit := 10
|
||||
projectId = os.Getenv("HASURA_BIGQUERY_PROJECT_ID")
|
||||
dataset = os.Getenv("HASURA_BIGQUERY_DATASET")
|
||||
hasuraEndpoint := fmt.Sprintf("%s:%s", BaseURL, hasuraPort)
|
||||
AddBigQuerySourceToHasura(t, globalSelectLimit, sourceName, serviceAccount, projectId, dataset, hasuraEndpoint)
|
||||
|
||||
return hasuraPort, sourceName, projectId, dataset, hasuraTeardown
|
||||
}
|
||||
|
||||
func StartCockroachContainer(t TestingT) (connectionString string, teardown func()) {
|
||||
user := "root"
|
||||
database := "defaultdb"
|
||||
@ -627,14 +640,9 @@ func StartCockroachContainer(t TestingT) (connectionString string, teardown func
|
||||
}
|
||||
|
||||
func AddCockroachSourceToHasura(t TestingT, hasuraEndpoint, connectionString, sourceName string) {
|
||||
addSourceToHasura(t, hasuraEndpoint, connectionString, sourceName, "cockroach_add_source")
|
||||
}
|
||||
|
||||
func addSourceToHasura(t TestingT, hasuraEndpoint, connectionString, sourceName, requestType string) {
|
||||
url := fmt.Sprintf("%s/v1/metadata", hasuraEndpoint)
|
||||
body := fmt.Sprintf(`
|
||||
request := fmt.Sprintf(`
|
||||
{
|
||||
"type": "%s",
|
||||
"type": "cockroach_add_source",
|
||||
"args": {
|
||||
"name": "%s",
|
||||
"configuration": {
|
||||
@ -644,9 +652,39 @@ func addSourceToHasura(t TestingT, hasuraEndpoint, connectionString, sourceName,
|
||||
}
|
||||
}
|
||||
}
|
||||
`, requestType, sourceName, connectionString)
|
||||
` ,sourceName, connectionString)
|
||||
addSourceToHasura(t, hasuraEndpoint, sourceName, request)
|
||||
}
|
||||
|
||||
func AddBigQuerySourceToHasura(t TestingT, globalSelectLimit int, sourceName, serviceAccount, projectId, dataset, hasuraEndpoint string) {
|
||||
request := fmt.Sprintf(`
|
||||
{
|
||||
"type": "bigquery_add_source",
|
||||
"args": {
|
||||
"name": "%s",
|
||||
"configuration": {
|
||||
"service_account": %s,
|
||||
"global_select_limit": %d,
|
||||
"project_id": "%s",
|
||||
"datasets": [
|
||||
"%s"
|
||||
]
|
||||
},
|
||||
"replace_configuration": false,
|
||||
"customization": {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
`, sourceName, serviceAccount, globalSelectLimit, projectId, dataset)
|
||||
|
||||
addSourceToHasura(t, hasuraEndpoint, sourceName, request)
|
||||
}
|
||||
|
||||
func addSourceToHasura(t TestingT, hasuraEndpoint, sourceName, requestBody string) {
|
||||
url := fmt.Sprintf("%s/v1/metadata", hasuraEndpoint)
|
||||
body := requestBody
|
||||
|
||||
fmt.Println(connectionString)
|
||||
fmt.Println(hasuraEndpoint)
|
||||
|
||||
req, err := http.NewRequest("POST", url, strings.NewReader(body))
|
||||
|
@ -158,6 +158,8 @@ func MigrateAPI(c *gin.Context) {
|
||||
sourceKind = hasura.SourceKindCitus
|
||||
case "cockroach":
|
||||
sourceKind = hasura.SourceKindCockroach
|
||||
case "bigquery":
|
||||
sourceKind = hasura.SourceKindBigQuery
|
||||
default:
|
||||
c.JSON(http.StatusInternalServerError, &Response{Code: "request_parse_error", Message: fmt.Sprintf("cannot determine database kind for '%v'", sourceName)})
|
||||
return
|
||||
|
@ -39,6 +39,7 @@ var (
|
||||
"select", "insert", "update", "delete", "count", "run_sql", "bulk",
|
||||
"mssql_select", "mssql_insert", "mssql_update", "mssql_delete", "mssql_count", "mssql_run_sql",
|
||||
"citus_select", "citus_insert", "citus_update", "citus_delete", "citus_count", "citus_run_sql",
|
||||
"bigquery_select", "bigquery_insert", "bigquery_update", "bigquery_delete", "bigquery_count", "bigquery_run_sql",
|
||||
"cockroach_run_sql",
|
||||
}
|
||||
queryTypesMap = func() map[string]bool {
|
||||
@ -78,6 +79,7 @@ type HasuraDB struct {
|
||||
pgSourceOps hasura.PGSourceOps
|
||||
mssqlSourceOps hasura.MSSQLSourceOps
|
||||
citusSourceOps hasura.CitusSourceOps
|
||||
bigquerySourceOps hasura.BigQuerySourceOps
|
||||
cockroachSourceOps hasura.CockroachSourceOps
|
||||
genericQueryRequest hasura.GenericSend
|
||||
hasuraClient *hasura.Client
|
||||
@ -104,6 +106,7 @@ func WithInstance(config *Config, logger *log.Logger, hasuraOpts *database.Hasur
|
||||
pgSourceOps: hasuraOpts.PGSourceOps,
|
||||
mssqlSourceOps: hasuraOpts.MSSQLSourceOps,
|
||||
citusSourceOps: hasuraOpts.CitusSourceOps,
|
||||
bigquerySourceOps: hasuraOpts.BigQuerySourceOps,
|
||||
genericQueryRequest: hasuraOpts.GenericQueryRequest,
|
||||
pgDumpClient: hasuraOpts.PGDumpClient,
|
||||
|
||||
@ -268,6 +271,11 @@ func (h *HasuraDB) Run(migration io.Reader, fileType, fileName string) error {
|
||||
if err != nil {
|
||||
return errors.E(op, err)
|
||||
}
|
||||
case hasura.SourceKindBigQuery:
|
||||
_, err := h.bigquerySourceOps.BigQueryRunSQL(hasura.BigQueryRunSQLInput(sqlInput))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
default:
|
||||
return errors.E(op, fmt.Errorf("unsupported source kind, source name: %v kind: %v", h.hasuraOpts.SourceName, h.hasuraOpts.SourceKind))
|
||||
|
@ -111,6 +111,7 @@ type HasuraOpts struct {
|
||||
PGSourceOps hasura.PGSourceOps
|
||||
MSSQLSourceOps hasura.MSSQLSourceOps
|
||||
CitusSourceOps hasura.CitusSourceOps
|
||||
BigQuerySourceOps hasura.BigQuerySourceOps
|
||||
MetadataOps hasura.CommonMetadataOperations
|
||||
V2MetadataOps hasura.V2CommonMetadataOperations
|
||||
GenericQueryRequest hasura.GenericSend
|
||||
|
@ -157,6 +157,7 @@ func NewMigrate(ec *cli.ExecutionContext, isCmd bool, sourceName string, sourceK
|
||||
opts.hasuraOpts.PGSourceOps = ec.APIClient.V2Query
|
||||
opts.hasuraOpts.MSSQLSourceOps = ec.APIClient.V2Query
|
||||
opts.hasuraOpts.CitusSourceOps = ec.APIClient.V2Query
|
||||
opts.hasuraOpts.BigQuerySourceOps = ec.APIClient.V2Query
|
||||
opts.hasuraOpts.GenericQueryRequest = ec.APIClient.V2Query.Send
|
||||
} else {
|
||||
opts.hasuraOpts.PGSourceOps = ec.APIClient.V1Query
|
||||
@ -218,7 +219,11 @@ func GetFilePath(dir string) *nurl.URL {
|
||||
|
||||
func IsMigrationsSupported(kind hasura.SourceKind) bool {
|
||||
switch kind {
|
||||
case hasura.SourceKindMSSQL, hasura.SourceKindPG, hasura.SourceKindCitus, hasura.SourceKindCockroach:
|
||||
case hasura.SourceKindMSSQL,
|
||||
hasura.SourceKindPG,
|
||||
hasura.SourceKindCitus,
|
||||
hasura.SourceKindCockroach,
|
||||
hasura.SourceKindBigQuery:
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
Loading…
Reference in New Issue
Block a user