sq/cli/source.go

249 lines
6.4 KiB
Go
Raw Normal View History

2020-08-06 20:58:47 +03:00
package cli
import (
"context"
"strings"
"github.com/spf13/cobra"
"github.com/neilotoole/sq/cli/flag"
"github.com/neilotoole/sq/cli/run"
"github.com/neilotoole/sq/libsq/ast"
"github.com/neilotoole/sq/libsq/core/errz"
"github.com/neilotoole/sq/libsq/core/lg"
"github.com/neilotoole/sq/libsq/core/lg/lga"
"github.com/neilotoole/sq/libsq/core/options"
2020-08-06 20:58:47 +03:00
"github.com/neilotoole/sq/libsq/driver"
"github.com/neilotoole/sq/libsq/source"
)
// determineSources figures out what the active source is
// from any combination of stdin, flags or cfg. It will
// mutate ru.Config.Collection as necessary. If requireActive
// is true, an error is returned if there's no active source.
func determineSources(ctx context.Context, ru *run.Run, requireActive bool) error {
cmd, coll := ru.Cmd, ru.Config.Collection
activeSrc, err := activeSrcAndSchemaFromFlagsOrConfig(ru)
2020-08-06 20:58:47 +03:00
if err != nil {
return err
}
// Note: ^ activeSrc could still be nil
// check if there's input on stdin
stdinSrc, err := checkStdinSource(ctx, ru)
2020-08-06 20:58:47 +03:00
if err != nil {
return err
}
if stdinSrc != nil {
// We have a valid source on stdin.
// Add the stdin source to coll.
err = coll.Add(stdinSrc)
2020-08-06 20:58:47 +03:00
if err != nil {
return err
}
if !cmdFlagChanged(cmd, flag.ActiveSrc) {
2020-08-06 20:58:47 +03:00
// If the user has not explicitly set an active
// source via flag, then we set the stdin pipe data
// source as the active source.
// We do this because the @stdin src is commonly the
// only data source the user cares about in a pipe
// situation.
_, err = coll.SetActive(stdinSrc.Handle, false)
2020-08-06 20:58:47 +03:00
if err != nil {
return err
}
activeSrc = stdinSrc
}
}
if activeSrc == nil && requireActive {
2020-08-06 20:58:47 +03:00
return errz.New(msgNoActiveSrc)
}
return nil
}
// activeSrcAndSchemaFromFlagsOrConfig gets the active source, either
2020-08-06 20:58:47 +03:00
// from flagActiveSrc or from srcs.Active. An error is returned
// if the flag src is not found: if the flag src is found,
// it is set as the active src on coll. If the flag was not
// set and there is no active src in coll, (nil, nil) is
2020-08-06 20:58:47 +03:00
// returned.
//
// This source also checks flag.ActiveSchema, and changes the schema
// of the source if the flag is set.
func activeSrcAndSchemaFromFlagsOrConfig(ru *run.Run) (*source.Source, error) {
cmd, coll := ru.Cmd, ru.Config.Collection
2020-08-06 20:58:47 +03:00
var activeSrc *source.Source
if cmdFlagChanged(cmd, flag.ActiveSrc) {
2020-08-06 20:58:47 +03:00
// The user explicitly wants to set an active source
// just for this query.
handle, _ := cmd.Flags().GetString(flag.ActiveSrc)
s, err := coll.Get(handle)
2020-08-06 20:58:47 +03:00
if err != nil {
return nil, errz.Wrapf(err, "flag --%s", flag.ActiveSrc)
2020-08-06 20:58:47 +03:00
}
activeSrc, err = coll.SetActive(s.Handle, false)
2020-08-06 20:58:47 +03:00
if err != nil {
return nil, err
}
} else {
activeSrc = coll.Active()
2020-08-06 20:58:47 +03:00
}
if err := processFlagActiveSchema(cmd, activeSrc); err != nil {
return nil, err
}
2020-08-06 20:58:47 +03:00
return activeSrc, nil
}
// processFlagActiveSchema processes the --src.schema flag, setting
// appropriate Source.Catalog and Source.Schema values on activeSrc.
// If flag.ActiveSchema is not set, this is no-op. If activeSrc is nil,
// an error is returned.
func processFlagActiveSchema(cmd *cobra.Command, activeSrc *source.Source) error {
ru := run.FromContext(cmd.Context())
if !cmdFlagChanged(cmd, flag.ActiveSchema) {
// Nothing to do here
return nil
}
if activeSrc == nil {
return errz.Errorf("active catalog/schema specified via --%s, but active source is nil",
flag.ActiveSchema)
}
val, _ := cmd.Flags().GetString(flag.ActiveSchema)
if val = strings.TrimSpace(val); val == "" {
return errz.Errorf("active catalog/schema specified via --%s, but schema is empty",
flag.ActiveSchema)
}
catalog, schema, err := ast.ParseCatalogSchema(val)
if err != nil {
return errz.Wrapf(err, "invalid active schema specified via --%s",
flag.ActiveSchema)
}
drvr, err := ru.DriverRegistry.SQLDriverFor(activeSrc.Type)
if err != nil {
return err
}
if catalog != "" {
if !drvr.Dialect().Catalog {
return errz.Errorf("driver {%s} does not support catalog", activeSrc.Type)
}
activeSrc.Catalog = catalog
}
if schema != "" {
activeSrc.Schema = schema
}
return nil
}
2020-08-06 20:58:47 +03:00
// checkStdinSource checks if there's stdin data (on pipe/redirect).
// If there is, that pipe is inspected, and if it has recognizable
// input, a new source instance with handle @stdin is constructed
// and returned. If the pipe has no data (size is zero),
// then (nil,nil) is returned.
func checkStdinSource(ctx context.Context, ru *run.Run) (*source.Source, error) {
cmd := ru.Cmd
2020-08-06 20:58:47 +03:00
f := ru.Stdin
2020-08-06 20:58:47 +03:00
info, err := f.Stat()
if err != nil {
return nil, errz.Wrap(err, "failed to get stat on stdin")
}
if info.Size() <= 0 {
// Doesn't make sense to have zero-data pipe? just ignore.
return nil, nil //nolint:nilnil
2020-08-06 20:58:47 +03:00
}
// If we got this far, we have pipe input
typ := source.TypeNone
if cmd.Flags().Changed(flag.IngestDriver) {
val, _ := cmd.Flags().GetString(flag.IngestDriver)
typ = source.DriverType(val)
if ru.DriverRegistry.ProviderFor(typ) == nil {
2020-08-06 20:58:47 +03:00
return nil, errz.Errorf("unknown driver type: %s", typ)
}
}
err = ru.Files.AddStdin(f)
2020-08-06 20:58:47 +03:00
if err != nil {
return nil, err
}
if typ == source.TypeNone {
typ, err = ru.Files.TypeStdin(ctx)
2020-08-06 20:58:47 +03:00
if err != nil {
return nil, err
}
if typ == source.TypeNone {
return nil, errz.New("unable to detect type of stdin: use flag --driver")
}
}
return newSource(
ctx,
ru.DriverRegistry,
typ,
source.StdinHandle,
source.StdinHandle,
options.Options{},
)
2020-08-06 20:58:47 +03:00
}
// newSource creates a new Source instance where the
// driver type is known. Opts may be nil.
func newSource(ctx context.Context, dp driver.Provider, typ source.DriverType, handle, loc string,
opts options.Options,
) (*source.Source, error) {
log := lg.FromContext(ctx)
2020-08-06 20:58:47 +03:00
if opts == nil {
log.Debug("Create new data source",
lga.Handle, handle,
lga.Driver, typ,
lga.Loc, source.RedactLocation(loc),
)
2020-08-06 20:58:47 +03:00
} else {
log.Debug("Create new data source with opts",
lga.Handle, handle,
lga.Driver, typ,
lga.Loc, source.RedactLocation(loc),
// lga.Opts, opts.Encode(), // FIXME: encode opts for debugging
)
2020-08-06 20:58:47 +03:00
}
err := source.ValidHandle(handle)
2020-08-06 20:58:47 +03:00
if err != nil {
return nil, err
}
drvr, err := dp.DriverFor(typ)
if err != nil {
return nil, err
}
src := &source.Source{Handle: handle, Location: loc, Type: typ, Options: opts}
2020-08-06 20:58:47 +03:00
log.Debug("Validating provisional new data source", lga.Src, src)
2020-08-06 20:58:47 +03:00
canonicalSrc, err := drvr.ValidateSource(src)
if err != nil {
return nil, err
}
return canonicalSrc, nil
}