mirror of
https://github.com/neilotoole/sq.git
synced 2024-12-18 13:41:49 +03:00
c7bba4dfe4
* go1.21: changes to support slog as part of stdlib * Removed accidentially checked-in line of code * Fixed minor linting issues; reenable typecheck * go1.21: switched to stdlib slices pkg
545 lines
14 KiB
Go
545 lines
14 KiB
Go
package source
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"log/slog"
|
|
"mime"
|
|
"net/url"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/neilotoole/sq/libsq/core/lg/lga"
|
|
|
|
"github.com/neilotoole/sq/libsq/core/lg/lgm"
|
|
|
|
"github.com/neilotoole/sq/libsq/core/lg"
|
|
|
|
"github.com/djherbis/fscache"
|
|
"github.com/h2non/filetype"
|
|
"github.com/h2non/filetype/matchers"
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
"github.com/neilotoole/sq/libsq/core/cleanup"
|
|
"github.com/neilotoole/sq/libsq/core/errz"
|
|
"github.com/neilotoole/sq/libsq/source/fetcher"
|
|
)
|
|
|
|
// Files is the centralized API for interacting with files.
|
|
//
|
|
// Why does Files exist? There's a need for functionality to
|
|
// transparently get a Reader for remote or local files, and most importantly,
|
|
// an ability for multiple goroutines to read/sample a file while
|
|
// it's being read (mainly to "sample" the file type, e.g. to determine
|
|
// if it's an XLSX file etc.). Currently we use fscache under the hood
|
|
// for this, but our implementation is not satisfactory: in particular,
|
|
// the implementation currently requires that we read the entire source
|
|
// file into fscache before it's available to be read (which is awful
|
|
// if we're reading long-running pipe from stdin). This entire thing
|
|
// needs to be revisited. Maybe Files even becomes a fs.FS.
|
|
type Files struct {
|
|
log *slog.Logger
|
|
mu sync.Mutex
|
|
clnup *cleanup.Cleanup
|
|
fcache *fscache.FSCache
|
|
detectFns []DriverDetectFunc
|
|
}
|
|
|
|
// NewFiles returns a new Files instance.
|
|
func NewFiles(ctx context.Context) (*Files, error) {
|
|
fs := &Files{clnup: cleanup.New(), log: lg.FromContext(ctx)}
|
|
|
|
tmpdir, err := os.MkdirTemp("", "sq_files_fscache_*")
|
|
if err != nil {
|
|
return nil, errz.Err(err)
|
|
}
|
|
|
|
fcache, err := fscache.New(tmpdir, os.ModePerm, time.Hour)
|
|
if err != nil {
|
|
return nil, errz.Err(err)
|
|
}
|
|
|
|
fs.clnup.AddE(fcache.Clean)
|
|
fs.fcache = fcache
|
|
return fs, nil
|
|
}
|
|
|
|
// AddDriverDetectors adds driver type detectors.
|
|
func (fs *Files) AddDriverDetectors(detectFns ...DriverDetectFunc) {
|
|
fs.detectFns = append(fs.detectFns, detectFns...)
|
|
}
|
|
|
|
// Size returns the file size of src.Location. This exists
|
|
// as a convenience function and something of a replacement
|
|
// for using os.Stat to get the file size.
|
|
func (fs *Files) Size(src *Source) (size int64, err error) {
|
|
r, err := fs.Open(src)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
defer lg.WarnIfCloseError(fs.log, lgm.CloseFileReader, r)
|
|
|
|
size, err = io.Copy(io.Discard, r)
|
|
if err != nil {
|
|
return 0, errz.Err(err)
|
|
}
|
|
|
|
return size, nil
|
|
}
|
|
|
|
// AddStdin copies f to fs's cache: the stdin data in f
|
|
// is later accessible via fs.Open(src) where src.Handle
|
|
// is StdinHandle; f's type can be detected via TypeStdin.
|
|
// Note that f is closed by this method.
|
|
//
|
|
// REVISIT: it's possible we'll ditch AddStdin and TypeStdin
|
|
// in some future version; this mechanism is a stopgap.
|
|
func (fs *Files) AddStdin(f *os.File) error {
|
|
fs.mu.Lock()
|
|
defer fs.mu.Unlock()
|
|
|
|
// We don't need r, but we're responsible for closing it.
|
|
r, err := fs.addFile(f, StdinHandle) // f is closed by addFile
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return r.Close()
|
|
}
|
|
|
|
// TypeStdin detects the type of stdin as previously added
|
|
// by AddStdin. An error is returned if AddStdin was not
|
|
// first invoked. If the type cannot be detected, TypeNone and
|
|
// nil are returned.
|
|
func (fs *Files) TypeStdin(ctx context.Context) (DriverType, error) {
|
|
if !fs.fcache.Exists(StdinHandle) {
|
|
return TypeNone, errz.New("must invoke AddStdin before invoking TypeStdin")
|
|
}
|
|
|
|
typ, ok, err := fs.detectType(ctx, StdinHandle)
|
|
if err != nil {
|
|
return TypeNone, err
|
|
}
|
|
|
|
if !ok {
|
|
return TypeNone, nil
|
|
}
|
|
|
|
return typ, nil
|
|
}
|
|
|
|
// add file copies f to fs's cache, returning a reader which the
|
|
// caller is responsible for closing. f is closed by this method.
|
|
func (fs *Files) addFile(f *os.File, key string) (fscache.ReadAtCloser, error) {
|
|
fs.log.Debug("Adding file", lga.Key, key, lga.Path, f.Name())
|
|
r, w, err := fs.fcache.Get(key)
|
|
if err != nil {
|
|
return nil, errz.Err(err)
|
|
}
|
|
|
|
if w == nil {
|
|
lg.WarnIfCloseError(fs.log, lgm.CloseFileReader, r)
|
|
return nil, errz.Errorf("failed to add to fscache (possibly previously added): %s", key)
|
|
}
|
|
|
|
// TODO: Problematically, we copy the entire contents of f into fscache.
|
|
// If f is a large file (e.g. piped over stdin), this means that
|
|
// everything is held up until f is fully copied. Hopefully we can
|
|
// do something with fscache so that the readers returned from
|
|
// fscache can lazily read from f.
|
|
_, err = io.Copy(w, f)
|
|
if err != nil {
|
|
lg.WarnIfCloseError(fs.log, lgm.CloseFileReader, r)
|
|
return nil, errz.Err(err)
|
|
}
|
|
|
|
err = errz.Combine(w.Close(), f.Close())
|
|
if err != nil {
|
|
lg.WarnIfCloseError(fs.log, lgm.CloseFileReader, r)
|
|
return nil, err
|
|
}
|
|
|
|
return r, nil
|
|
}
|
|
|
|
// Open returns a new io.ReadCloser for src.Location.
|
|
// If src.Handle is StdinHandle, AddStdin must first have
|
|
// been invoked. The caller must close the reader.
|
|
func (fs *Files) Open(src *Source) (io.ReadCloser, error) {
|
|
fs.mu.Lock()
|
|
defer fs.mu.Unlock()
|
|
|
|
return fs.newReader(src.Location)
|
|
}
|
|
|
|
// OpenFunc returns a func that invokes fs.Open for src.Location.
|
|
func (fs *Files) OpenFunc(src *Source) func() (io.ReadCloser, error) {
|
|
return func() (io.ReadCloser, error) {
|
|
return fs.Open(src)
|
|
}
|
|
}
|
|
|
|
// ReadAll is a convenience method to read the bytes of a source.
|
|
func (fs *Files) ReadAll(src *Source) ([]byte, error) {
|
|
r, err := fs.newReader(src.Location)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var data []byte
|
|
data, err = io.ReadAll(r)
|
|
closeErr := r.Close()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if closeErr != nil {
|
|
return nil, closeErr
|
|
}
|
|
|
|
return data, nil
|
|
}
|
|
|
|
func (fs *Files) newReader(loc string) (io.ReadCloser, error) {
|
|
if loc == StdinHandle {
|
|
r, w, err := fs.fcache.Get(StdinHandle)
|
|
if err != nil {
|
|
return nil, errz.Err(err)
|
|
}
|
|
if w != nil {
|
|
return nil, errz.New("@stdin not cached: has AddStdin been invoked yet?")
|
|
}
|
|
|
|
return r, nil
|
|
}
|
|
|
|
if !fs.fcache.Exists(loc) {
|
|
// cache miss
|
|
f, err := fs.openLocation(loc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Note that addFile closes f
|
|
r, err := fs.addFile(f, loc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return r, nil
|
|
}
|
|
|
|
r, _, err := fs.fcache.Get(loc)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return r, nil
|
|
}
|
|
|
|
// openLocation returns a file for loc. It is the caller's
|
|
// responsibility to close the returned file.
|
|
func (fs *Files) openLocation(loc string) (*os.File, error) {
|
|
var fpath string
|
|
var ok bool
|
|
var err error
|
|
|
|
fpath, ok = isFpath(loc)
|
|
if !ok {
|
|
// It's not a local file path, maybe it's remote (http)
|
|
var u *url.URL
|
|
u, ok = httpURL(loc)
|
|
if !ok {
|
|
// We're out of luck, it's not a valid file location
|
|
return nil, errz.Errorf("invalid src location: %s", loc)
|
|
}
|
|
|
|
// It's a remote file
|
|
fpath, err = fs.fetch(u.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// we have a legitimate fpath
|
|
return fs.openFile(fpath)
|
|
}
|
|
|
|
// openFile opens the file at fpath. It is the caller's
|
|
// responsibility to close the returned file.
|
|
func (fs *Files) openFile(fpath string) (*os.File, error) {
|
|
f, err := os.OpenFile(fpath, os.O_RDWR, 0o666)
|
|
if err != nil {
|
|
return nil, errz.Err(err)
|
|
}
|
|
|
|
return f, nil
|
|
}
|
|
|
|
// fetch ensures that loc exists locally as a file. This may
|
|
// entail downloading the file via HTTPS etc.
|
|
func (fs *Files) fetch(loc string) (fpath string, err error) {
|
|
// This impl is a vestigial abomination from an early
|
|
// experiment.
|
|
|
|
var ok bool
|
|
if fpath, ok = isFpath(loc); ok {
|
|
// loc is already a local file path
|
|
return fpath, nil
|
|
}
|
|
|
|
var u *url.URL
|
|
if u, ok = httpURL(loc); !ok {
|
|
return "", errz.Errorf("not a valid file location: %s", loc)
|
|
}
|
|
|
|
var dlFile *os.File
|
|
dlFile, err = os.CreateTemp("", "")
|
|
if err != nil {
|
|
return "", errz.Err(err)
|
|
}
|
|
|
|
fetchr := &fetcher.Fetcher{}
|
|
// TOOD: ultimately should be passing a real context here
|
|
err = fetchr.Fetch(context.Background(), u.String(), dlFile)
|
|
if err != nil {
|
|
return "", errz.Err(err)
|
|
}
|
|
|
|
// dlFile is kept open until fs is closed.
|
|
fs.clnup.AddC(dlFile)
|
|
|
|
return dlFile.Name(), nil
|
|
}
|
|
|
|
// Close closes any open resources.
|
|
func (fs *Files) Close() error {
|
|
fs.log.Debug("Files.Close invoked: executing clean funcs", lga.Count, fs.clnup.Len())
|
|
|
|
return fs.clnup.Run()
|
|
}
|
|
|
|
// CleanupE adds fn to the cleanup sequence invoked by fs.Close.
|
|
func (fs *Files) CleanupE(fn func() error) {
|
|
fs.clnup.AddE(fn)
|
|
}
|
|
|
|
// DriverType returns the driver type of loc.
|
|
func (fs *Files) DriverType(ctx context.Context, loc string) (DriverType, error) {
|
|
ploc, err := parseLoc(loc)
|
|
if err != nil {
|
|
return TypeNone, err
|
|
}
|
|
|
|
if ploc.typ != TypeNone {
|
|
return ploc.typ, nil
|
|
}
|
|
|
|
if ploc.ext != "" {
|
|
mtype := mime.TypeByExtension(ploc.ext)
|
|
if mtype == "" {
|
|
fs.log.Debug(
|
|
"unknown mime type",
|
|
lga.Type, mtype,
|
|
lga.Loc, loc,
|
|
)
|
|
} else {
|
|
if typ, ok := typeFromMediaType(mtype); ok {
|
|
return typ, nil
|
|
}
|
|
fs.log.Debug(
|
|
"unknown driver type for media type",
|
|
lga.Type, mtype,
|
|
lga.Loc, loc,
|
|
)
|
|
}
|
|
}
|
|
|
|
// Fall back to the byte detectors
|
|
typ, ok, err := fs.detectType(ctx, loc)
|
|
if err != nil {
|
|
return TypeNone, err
|
|
}
|
|
|
|
if !ok {
|
|
return TypeNone, errz.Errorf("unable to determine driver type: %s", loc)
|
|
}
|
|
|
|
return typ, nil
|
|
}
|
|
|
|
func (fs *Files) detectType(ctx context.Context, loc string) (typ DriverType, ok bool, err error) {
|
|
if len(fs.detectFns) == 0 {
|
|
return TypeNone, false, nil
|
|
}
|
|
|
|
type result struct {
|
|
typ DriverType
|
|
score float32
|
|
}
|
|
|
|
resultCh := make(chan result, len(fs.detectFns))
|
|
openFn := func() (io.ReadCloser, error) {
|
|
fs.mu.Lock()
|
|
defer fs.mu.Unlock()
|
|
|
|
return fs.newReader(loc)
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return TypeNone, false, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
g, gCtx := errgroup.WithContext(ctx)
|
|
gCtx = lg.NewContext(gCtx, fs.log)
|
|
|
|
for _, detectFn := range fs.detectFns {
|
|
detectFn := detectFn
|
|
|
|
g.Go(func() error {
|
|
select {
|
|
case <-gCtx.Done():
|
|
return gCtx.Err()
|
|
default:
|
|
}
|
|
|
|
gTyp, gScore, gErr := detectFn(gCtx, openFn)
|
|
if gErr != nil {
|
|
return gErr
|
|
}
|
|
|
|
if gScore > 0 {
|
|
resultCh <- result{typ: gTyp, score: gScore}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
err = g.Wait()
|
|
if err != nil {
|
|
fs.log.Error(err.Error())
|
|
return TypeNone, false, errz.Err(err)
|
|
}
|
|
close(resultCh)
|
|
|
|
var highestScore float32
|
|
for res := range resultCh {
|
|
if res.score > highestScore {
|
|
highestScore = res.score
|
|
typ = res.typ
|
|
}
|
|
}
|
|
|
|
const detectScoreThreshold = 0.5
|
|
if highestScore >= detectScoreThreshold {
|
|
return typ, true, nil
|
|
}
|
|
|
|
return TypeNone, false, nil
|
|
}
|
|
|
|
// FileOpenFunc returns a func that opens a ReadCloser. The caller
|
|
// is responsible for closing the returned ReadCloser.
|
|
type FileOpenFunc func() (io.ReadCloser, error)
|
|
|
|
// DriverDetectFunc interrogates a byte stream to determine
|
|
// the source driver type. A score is returned indicating
|
|
// the confidence that the driver type has been detected.
|
|
// A score <= 0 is failure, a score >= 1 is success; intermediate
|
|
// values indicate some level of confidence.
|
|
// An error is returned only if an IO problem occurred.
|
|
// The implementation gets access to the byte stream by invoking openFn,
|
|
// and is responsible for closing any reader it opens.
|
|
type DriverDetectFunc func(ctx context.Context, openFn FileOpenFunc) (detected DriverType, score float32, err error)
|
|
|
|
var _ DriverDetectFunc = DetectMagicNumber
|
|
|
|
// DetectMagicNumber is a DriverDetectFunc that uses an external
|
|
// pkg (h2non/filetype) to detect the "magic number" from
|
|
// the start of files.
|
|
func DetectMagicNumber(ctx context.Context, openFn FileOpenFunc,
|
|
) (detected DriverType, score float32, err error) {
|
|
log := lg.FromContext(ctx)
|
|
var r io.ReadCloser
|
|
r, err = openFn()
|
|
if err != nil {
|
|
return TypeNone, 0, errz.Err(err)
|
|
}
|
|
defer lg.WarnIfCloseError(log, lgm.CloseFileReader, r)
|
|
|
|
// We only have to pass the file header = first 261 bytes
|
|
head := make([]byte, 261)
|
|
_, err = r.Read(head)
|
|
if err != nil {
|
|
return TypeNone, 0, errz.Wrapf(err, "failed to read header")
|
|
}
|
|
|
|
ftype, err := filetype.Match(head)
|
|
if err != nil {
|
|
if err != nil {
|
|
return TypeNone, 0, errz.Err(err)
|
|
}
|
|
}
|
|
|
|
switch ftype {
|
|
default:
|
|
return TypeNone, 0, nil
|
|
case matchers.TypeXlsx:
|
|
// This doesn't seem to work, because .xlsx files are
|
|
// zipped, so the type returns as a zip. Perhaps there's
|
|
// something we can do about it, such as first extracting
|
|
// the zip, and then reading the inner magic number, but
|
|
// the xlsx.DetectXLSX func should catch the type anyway.
|
|
return typeXLSX, 1.0, nil
|
|
case matchers.TypeXls:
|
|
// TODO: our xlsx driver doesn't yet support XLS
|
|
return typeXLSX, 1.0, errz.Errorf("Microsoft XLS (%s) not currently supported", ftype)
|
|
case matchers.TypeSqlite:
|
|
return typeSL3, 1.0, nil
|
|
}
|
|
}
|
|
|
|
// httpURL tests if s is a well-structured HTTP or HTTPS url, and
|
|
// if so, returns the url and true.
|
|
func httpURL(s string) (u *url.URL, ok bool) {
|
|
var err error
|
|
u, err = url.Parse(s)
|
|
if err != nil || u.Host == "" || !(u.Scheme == "http" || u.Scheme == "https") {
|
|
return nil, false
|
|
}
|
|
|
|
return u, true
|
|
}
|
|
|
|
// TempDirFile creates a new temporary file in a new temp dir,
|
|
// opens the file for reading and writing, and then closes it.
|
|
// It's probably unnecessary to go through the ceremony of
|
|
// opening and closing the file, but maybe it's better to fail early.
|
|
// It is the caller's responsibility to remove the file and/or dir
|
|
// if desired.
|
|
func TempDirFile(filename string) (dir, file string, err error) {
|
|
dir, err = os.MkdirTemp("", "sq_")
|
|
if err != nil {
|
|
return "", "", errz.Err(err)
|
|
}
|
|
|
|
file = filepath.Join(dir, filename)
|
|
var f *os.File
|
|
if f, err = os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o600); err != nil {
|
|
// Silently delete the temp dir
|
|
_ = os.RemoveAll(dir)
|
|
return "", "", errz.Err(err)
|
|
}
|
|
|
|
if err = f.Close(); err != nil {
|
|
// Silently delete the temp dir
|
|
_ = os.RemoveAll(dir)
|
|
return "", "", errz.Wrap(err, "close temp file")
|
|
}
|
|
|
|
return dir, file, nil
|
|
}
|