sq/cli/cmd_sql.go

167 lines
4.4 KiB
Go
Raw Normal View History

2020-08-06 20:58:47 +03:00
package cli
import (
"context"
"fmt"
"strings"
"github.com/spf13/cobra"
"github.com/neilotoole/sq/cli/flag"
"github.com/neilotoole/sq/cli/output"
"github.com/neilotoole/sq/cli/run"
2020-08-06 20:58:47 +03:00
"github.com/neilotoole/sq/libsq"
"github.com/neilotoole/sq/libsq/core/errz"
"github.com/neilotoole/sq/libsq/core/lg"
"github.com/neilotoole/sq/libsq/core/lg/lga"
"github.com/neilotoole/sq/libsq/core/lg/lgm"
"github.com/neilotoole/sq/libsq/core/stringz"
"github.com/neilotoole/sq/libsq/driver"
2020-08-06 20:58:47 +03:00
"github.com/neilotoole/sq/libsq/source"
)
func newSQLCmd() *cobra.Command {
2020-08-06 20:58:47 +03:00
cmd := &cobra.Command{
Use: "sql QUERY|STMT",
Short: "Execute DB-native SQL query or statement",
Long: `Execute a SQL query or statement against the active source using the
source's SQL dialect. Use flag --src=@HANDLE to specify an alternative
source.`,
RunE: execSQL,
2020-08-06 20:58:47 +03:00
Example: ` # Select from active source
$ sq sql 'SELECT * FROM actor'
2020-08-06 20:58:47 +03:00
# Select from a specified source
$ sq sql --src=@sakila_pg12 'SELECT * FROM actor'
2020-08-06 20:58:47 +03:00
# Drop table @sakila_pg12.actor
$ sq sql --src=@sakila_pg12 'DROP TABLE actor'
2020-08-06 20:58:47 +03:00
# Select from active source and write results to @sakila_ms17.actor
$ sq sql 'SELECT * FROM actor' --insert=@sakila_ms17.actor`,
2020-08-06 20:58:47 +03:00
}
addQueryCmdFlags(cmd)
return cmd
2020-08-06 20:58:47 +03:00
}
func execSQL(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
ru := run.FromContext(ctx)
2020-08-06 20:58:47 +03:00
switch len(args) {
default:
return errz.New("a single query string is required")
case 0:
return errz.New("no SQL query string")
2020-08-06 20:58:47 +03:00
case 1:
if strings.TrimSpace(args[0]) == "" {
return errz.New("empty SQL query string")
}
}
err := determineSources(ctx, ru, true)
2020-08-06 20:58:47 +03:00
if err != nil {
return err
}
coll := ru.Config.Collection
2020-08-06 20:58:47 +03:00
// activeSrc is guaranteed to be non-nil after
// determineSources successfully returns.
activeSrc := coll.Active()
2020-08-06 20:58:47 +03:00
if err = applySourceOptions(cmd, activeSrc); err != nil {
return err
}
if !cmdFlagChanged(cmd, flag.Insert) {
2020-08-06 20:58:47 +03:00
// The user didn't specify the --insert=@src.tbl flag,
// so we just want to print the records.
return execSQLPrint(ctx, ru, activeSrc)
2020-08-06 20:58:47 +03:00
}
// Instead of printing the records, they will be
// written to another database
insertTo, _ := cmd.Flags().GetString(flag.Insert)
2020-08-06 20:58:47 +03:00
if insertTo == "" {
return errz.Errorf("invalid --%s value: empty", flag.Insert)
2020-08-06 20:58:47 +03:00
}
destHandle, destTbl, err := source.ParseTableHandle(insertTo)
if err != nil {
return errz.Wrapf(err, "invalid --%s value", flag.Insert)
2020-08-06 20:58:47 +03:00
}
destSrc, err := coll.Get(destHandle)
2020-08-06 20:58:47 +03:00
if err != nil {
return err
}
return execSQLInsert(ctx, ru, activeSrc, destSrc, destTbl)
2020-08-06 20:58:47 +03:00
}
// execSQLPrint executes the SQL and prints resulting records
// to the configured writer.
func execSQLPrint(ctx context.Context, ru *run.Run, fromSrc *source.Source) error {
args := ru.Args
pool, err := ru.Pools.Open(ctx, fromSrc)
2020-08-06 20:58:47 +03:00
if err != nil {
return err
}
recw := output.NewRecordWriterAdapter(ctx, ru.Writers.Record)
err = libsq.QuerySQL(ctx, pool, nil, recw, args[0])
2020-08-06 20:58:47 +03:00
if err != nil {
return err
}
_, err = recw.Wait() // Wait for the writer to finish processing
return err
}
// execSQLInsert executes the SQL and inserts resulting records
// into destTbl in destSrc.
func execSQLInsert(ctx context.Context, ru *run.Run,
fromSrc, destSrc *source.Source, destTbl string,
) error {
args := ru.Args
pools := ru.Pools
ctx, cancelFn := context.WithCancel(ctx)
2020-08-06 20:58:47 +03:00
defer cancelFn()
fromPool, err := pools.Open(ctx, fromSrc)
2020-08-06 20:58:47 +03:00
if err != nil {
return err
}
destPool, err := pools.Open(ctx, destSrc)
2020-08-06 20:58:47 +03:00
if err != nil {
return err
}
// Note: We don't need to worry about closing fromPool and
// destPool because they are closed by pools.Close, which
// is invoked by ru.Close, and ru is closed further up the
2020-08-06 20:58:47 +03:00
// stack.
inserter := libsq.NewDBWriter(
destPool,
destTbl,
driver.OptTuningRecChanSize.Get(destSrc.Options),
libsq.DBWriterCreateTableIfNotExistsHook(destTbl),
)
err = libsq.QuerySQL(ctx, fromPool, nil, inserter, args[0])
2020-08-06 20:58:47 +03:00
if err != nil {
return errz.Wrapf(err, "insert to {%s} failed", source.Target(destSrc, destTbl))
2020-08-06 20:58:47 +03:00
}
affected, err := inserter.Wait() // Wait for the writer to finish processing
if err != nil {
return errz.Wrapf(err, "insert %s.%s failed", destSrc.Handle, destTbl)
}
lg.FromContext(ctx).Debug(lgm.RowsAffected, lga.Count, affected)
2020-08-06 20:58:47 +03:00
// TODO: Should really use a Printer here
_, _ = fmt.Fprintf(ru.Out, stringz.Plu("Inserted %d row(s) into %s\n",
int(affected)), affected, source.Target(destSrc, destTbl))
2020-08-06 20:58:47 +03:00
return nil
}