mirror of
https://github.com/neilotoole/sq.git
synced 2024-11-24 11:54:37 +03:00
Fixed issues with files and databases not being closed correctly (#73)
* fiddling with scratch database close order * files debugging * files debugging2 * files debugging3 * files debugging 4 * files debugging 5 * didn't close the ReadCloser in csv import * more closing cleanup
This commit is contained in:
parent
8c883d276e
commit
5aebc04356
@ -111,7 +111,7 @@ func execInspect(rc *RunContext, cmd *cobra.Command, args []string) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return errz.Wrapf(err, "failed to inspect %s", src.Handle)
|
return errz.Wrapf(err, "failed to inspect %s", src.Handle)
|
||||||
}
|
}
|
||||||
defer rc.Log.WarnIfCloseError(dbase)
|
//defer rc.Log.WarnIfCloseError(dbase)
|
||||||
|
|
||||||
if table != "" {
|
if table != "" {
|
||||||
var tblMeta *source.TableMetadata
|
var tblMeta *source.TableMetadata
|
||||||
|
@ -49,7 +49,7 @@ func TestCmdInspect(t *testing.T) {
|
|||||||
tc := tc
|
tc := tc
|
||||||
|
|
||||||
t.Run(tc.handle, func(t *testing.T) {
|
t.Run(tc.handle, func(t *testing.T) {
|
||||||
t.Parallel()
|
//t.Parallel()
|
||||||
|
|
||||||
th := testh.New(t)
|
th := testh.New(t)
|
||||||
src := th.Source(tc.handle)
|
src := th.Source(tc.handle)
|
||||||
|
@ -11,7 +11,6 @@ import (
|
|||||||
"github.com/neilotoole/lg"
|
"github.com/neilotoole/lg"
|
||||||
|
|
||||||
"github.com/neilotoole/sq/cli/output/csvw"
|
"github.com/neilotoole/sq/cli/output/csvw"
|
||||||
"github.com/neilotoole/sq/libsq/core/cleanup"
|
|
||||||
"github.com/neilotoole/sq/libsq/core/errz"
|
"github.com/neilotoole/sq/libsq/core/errz"
|
||||||
"github.com/neilotoole/sq/libsq/driver"
|
"github.com/neilotoole/sq/libsq/driver"
|
||||||
"github.com/neilotoole/sq/libsq/source"
|
"github.com/neilotoole/sq/libsq/source"
|
||||||
@ -67,31 +66,37 @@ func (d *driveri) DriverMetadata() driver.Metadata {
|
|||||||
|
|
||||||
// Open implements driver.Driver.
|
// Open implements driver.Driver.
|
||||||
func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database, error) {
|
func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database, error) {
|
||||||
dbase := &database{log: d.log, src: src, clnup: cleanup.New(), files: d.files}
|
dbase := &database{
|
||||||
|
log: d.log,
|
||||||
r, err := d.files.Open(src)
|
src: src,
|
||||||
if err != nil {
|
//clnup: cleanup.New(),
|
||||||
return nil, err
|
files: d.files,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//r, err := d.files.Open(src)
|
||||||
|
//if err != nil {
|
||||||
|
// return nil, err
|
||||||
|
//}
|
||||||
|
|
||||||
|
var err error
|
||||||
dbase.impl, err = d.scratcher.OpenScratch(ctx, src.Handle)
|
dbase.impl, err = d.scratcher.OpenScratch(ctx, src.Handle)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.log.WarnIfCloseError(r)
|
//d.log.WarnIfCloseError(r)
|
||||||
d.log.WarnIfFuncError(dbase.clnup.Run)
|
//d.log.WarnIfFuncError(dbase.clnup.Run)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = importCSV(ctx, d.log, src, d.files.OpenFunc(src), dbase.impl)
|
err = importCSV(ctx, d.log, src, d.files.OpenFunc(src), dbase.impl)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.log.WarnIfCloseError(r)
|
//d.log.WarnIfCloseError(r)
|
||||||
d.log.WarnIfFuncError(dbase.clnup.Run)
|
//d.log.WarnIfFuncError(dbase.clnup.Run)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = r.Close()
|
//err = r.Close()
|
||||||
if err != nil {
|
//if err != nil {
|
||||||
return nil, err
|
// return nil, err
|
||||||
}
|
//}
|
||||||
|
|
||||||
return dbase, nil
|
return dbase, nil
|
||||||
}
|
}
|
||||||
@ -140,10 +145,10 @@ func (d *driveri) Ping(ctx context.Context, src *source.Source) error {
|
|||||||
|
|
||||||
// database implements driver.Database.
|
// database implements driver.Database.
|
||||||
type database struct {
|
type database struct {
|
||||||
log lg.Log
|
log lg.Log
|
||||||
src *source.Source
|
src *source.Source
|
||||||
impl driver.Database
|
impl driver.Database
|
||||||
clnup *cleanup.Cleanup
|
//clnup *cleanup.Cleanup
|
||||||
files *source.Files
|
files *source.Files
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -207,7 +212,9 @@ func (d *database) SourceMetadata(ctx context.Context) (*source.Metadata, error)
|
|||||||
func (d *database) Close() error {
|
func (d *database) Close() error {
|
||||||
d.log.Debugf("Close database: %s", d.src)
|
d.log.Debugf("Close database: %s", d.src)
|
||||||
|
|
||||||
return errz.Combine(d.impl.Close(), d.clnup.Run())
|
return errz.Err(d.impl.Close())
|
||||||
|
|
||||||
|
//return errz.Combine(d.impl.Close(), d.clnup.Run())
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -38,6 +38,8 @@ func importCSV(ctx context.Context, log lg.Log, src *source.Source, openFn sourc
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer log.WarnIfCloseError(r)
|
||||||
|
|
||||||
// We add the CR filter reader to deal with CSV files exported
|
// We add the CR filter reader to deal with CSV files exported
|
||||||
// from Excel which can have the DOS-style \r EOL markers.
|
// from Excel which can have the DOS-style \r EOL markers.
|
||||||
cr := csv.NewReader(&crFilterReader{r: r})
|
cr := csv.NewReader(&crFilterReader{r: r})
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
"github.com/neilotoole/lg"
|
"github.com/neilotoole/lg"
|
||||||
@ -389,6 +390,10 @@ type database struct {
|
|||||||
db *sql.DB
|
db *sql.DB
|
||||||
src *source.Source
|
src *source.Source
|
||||||
drvr *driveri
|
drvr *driveri
|
||||||
|
|
||||||
|
// DEBUG: closeMu and closed exist while debugging close behavior
|
||||||
|
closeMu sync.Mutex
|
||||||
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// DB implements driver.Database.
|
// DB implements driver.Database.
|
||||||
@ -451,13 +456,29 @@ func (d *database) SourceMetadata(ctx context.Context) (*source.Metadata, error)
|
|||||||
|
|
||||||
// Close implements driver.Database.
|
// Close implements driver.Database.
|
||||||
func (d *database) Close() error {
|
func (d *database) Close() error {
|
||||||
d.log.Debugf("Close database: %s", d.src)
|
d.closeMu.Lock()
|
||||||
|
defer d.closeMu.Unlock()
|
||||||
|
|
||||||
return errz.Err(d.db.Close())
|
//if !d.closed {
|
||||||
|
// debug.PrintStack()
|
||||||
|
//}
|
||||||
|
|
||||||
|
if d.closed {
|
||||||
|
//panic( "SQLITE DB already closed")
|
||||||
|
d.log.Warnf("SQLite DB already closed: %v", d.src)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
d.log.Debugf("Closing database: %s", d.src)
|
||||||
|
err := errz.Err(d.db.Close())
|
||||||
|
d.closed = true
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewScratchSource returns a new scratch src. Currently this
|
// NewScratchSource returns a new scratch src. Effectively this
|
||||||
// defaults to a sqlite-backed source.
|
// function creates a new sqlite db file in the temp dir, and
|
||||||
|
// src points at this file. The returned clnup func closes that
|
||||||
|
// db file and deletes it.
|
||||||
func NewScratchSource(log lg.Log, name string) (src *source.Source, clnup func() error, err error) {
|
func NewScratchSource(log lg.Log, name string) (src *source.Source, clnup func() error, err error) {
|
||||||
name = stringz.SanitizeAlphaNumeric(name, '_')
|
name = stringz.SanitizeAlphaNumeric(name, '_')
|
||||||
_, f, cleanFn, err := source.TempDirFile(name + ".sqlite")
|
_, f, cleanFn, err := source.TempDirFile(name + ".sqlite")
|
||||||
|
@ -294,33 +294,42 @@ func (d *Databases) OpenScratch(ctx context.Context, name string) (Database, err
|
|||||||
}
|
}
|
||||||
d.log.Debugf("got scratch src %s: %s", scratchSrc.Handle, scratchSrc.RedactedLocation())
|
d.log.Debugf("got scratch src %s: %s", scratchSrc.Handle, scratchSrc.RedactedLocation())
|
||||||
|
|
||||||
scratchDBClnup := cleanup.New().AddE(cleanFn)
|
//scratchDBClnup := cleanup.New().AddE(cleanFn)
|
||||||
|
|
||||||
drvr, err := d.drvrs.DriverFor(scratchSrc.Type)
|
drvr, err := d.drvrs.DriverFor(scratchSrc.Type)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.log.WarnIfError(scratchDBClnup.Run())
|
d.log.WarnIfFuncError(cleanFn)
|
||||||
|
//d.log.WarnIfError(scratchDBClnup.Run())
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sqlDrvr, ok := drvr.(SQLDriver)
|
sqlDrvr, ok := drvr.(SQLDriver)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
d.log.WarnIfFuncError(cleanFn)
|
||||||
|
|
||||||
|
//d.log.WarnIfError(scratchDBClnup.Run())
|
||||||
return nil, errz.Errorf("driver for scratch source %s is not a SQLDriver but is %T", scratchSrc.Handle, drvr)
|
return nil, errz.Errorf("driver for scratch source %s is not a SQLDriver but is %T", scratchSrc.Handle, drvr)
|
||||||
}
|
}
|
||||||
|
|
||||||
var backingDB Database
|
var backingDB Database
|
||||||
backingDB, err = sqlDrvr.Open(ctx, scratchSrc)
|
backingDB, err = sqlDrvr.Open(ctx, scratchSrc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.log.WarnIfError(scratchDBClnup.Run())
|
d.log.WarnIfFuncError(cleanFn)
|
||||||
|
|
||||||
|
//d.log.WarnIfError(scratchDBClnup.Run())
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
scratchDBClnup.AddE(backingDB.Close)
|
d.clnup.AddE(cleanFn)
|
||||||
|
return backingDB, nil
|
||||||
scratchDB := &scratchDatabase{log: d.log, impl: backingDB, cleanup: scratchDBClnup}
|
//
|
||||||
|
//scratchDBClnup.AddE(backingDB.Close)
|
||||||
// scratchDB.Close will be invoked when d.Close is invoked.
|
//
|
||||||
d.clnup.AddE(scratchDB.Close)
|
//scratchDB := &scratchDatabase{log: d.log, impl: backingDB, cleanup: scratchDBClnup}
|
||||||
return scratchDB, nil
|
//
|
||||||
|
//// scratchDB.Close will be invoked when d.Close is invoked.
|
||||||
|
//d.clnup.AddE(scratchDB.Close)
|
||||||
|
//return scratchDB, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenJoin opens an appropriate database for use as
|
// OpenJoin opens an appropriate database for use as
|
||||||
|
@ -3,6 +3,7 @@ package driver
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/neilotoole/lg"
|
"github.com/neilotoole/lg"
|
||||||
|
|
||||||
@ -22,6 +23,8 @@ type scratchDatabase struct {
|
|||||||
log lg.Log
|
log lg.Log
|
||||||
impl Database
|
impl Database
|
||||||
cleanup *cleanup.Cleanup
|
cleanup *cleanup.Cleanup
|
||||||
|
mu sync.Mutex
|
||||||
|
closed bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// DB implements driver.Database.
|
// DB implements driver.Database.
|
||||||
@ -51,8 +54,20 @@ func (d *scratchDatabase) SourceMetadata(ctx context.Context) (*source.Metadata,
|
|||||||
|
|
||||||
// Close implements driver.Database.
|
// Close implements driver.Database.
|
||||||
func (d *scratchDatabase) Close() error {
|
func (d *scratchDatabase) Close() error {
|
||||||
d.log.Debugf("Close scratch database: %s", d.impl.Source())
|
d.mu.Lock()
|
||||||
|
defer d.mu.Unlock()
|
||||||
|
|
||||||
|
if d.closed {
|
||||||
|
panic("already closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
d.log.Debugf("Closing scratch database [%s]", d.impl.Source())
|
||||||
|
|
||||||
|
//debug.PrintStack()
|
||||||
// No need to explicitly invoke c.impl.Close because it
|
// No need to explicitly invoke c.impl.Close because it
|
||||||
// has already been added to c.cleanup.
|
// has already been added to c.cleanup.
|
||||||
return d.cleanup.Run()
|
err := d.cleanup.Run()
|
||||||
|
d.log.Debugf("Closed scratch database [%s]: err=%v", d.impl.Source(), err)
|
||||||
|
d.closed = true
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
@ -41,14 +41,21 @@ type Files struct {
|
|||||||
func NewFiles(log lg.Log) (*Files, error) {
|
func NewFiles(log lg.Log) (*Files, error) {
|
||||||
fs := &Files{log: log, clnup: cleanup.New()}
|
fs := &Files{log: log, clnup: cleanup.New()}
|
||||||
|
|
||||||
tmpdir, err := ioutil.TempDir("", "")
|
tmpdir, err := ioutil.TempDir("", "sq_files_fscache_*")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errz.Err(err)
|
return nil, errz.Err(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fs.clnup.AddE(func() error {
|
//fs.clnup.AddE(func() error {
|
||||||
return errz.Err(os.RemoveAll(tmpdir))
|
// log.Debugf("Deleting files tmp dir: %s", tmpdir)
|
||||||
})
|
// err := errz.Err(os.RemoveAll(tmpdir))
|
||||||
|
// if err != nil {
|
||||||
|
// log.Errorf("Error deleting files tmp dir: %v", err)
|
||||||
|
// } else {
|
||||||
|
// log.Debugf("Success deleting files tmp dir")
|
||||||
|
// }
|
||||||
|
// return err
|
||||||
|
//})
|
||||||
|
|
||||||
fcache, err := fscache.New(tmpdir, os.ModePerm, time.Hour)
|
fcache, err := fscache.New(tmpdir, os.ModePerm, time.Hour)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -56,7 +63,13 @@ func NewFiles(log lg.Log) (*Files, error) {
|
|||||||
return nil, errz.Err(err)
|
return nil, errz.Err(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fs.clnup.AddE(fcache.Clean)
|
fs.clnup.AddE(func() error {
|
||||||
|
log.Debugf("About to clean fscache from dir: %s", tmpdir)
|
||||||
|
err := fcache.Clean()
|
||||||
|
log.WarnIfError(err)
|
||||||
|
|
||||||
|
return err
|
||||||
|
})
|
||||||
fs.fcache = fcache
|
fs.fcache = fcache
|
||||||
return fs, nil
|
return fs, nil
|
||||||
}
|
}
|
||||||
@ -129,6 +142,7 @@ func (fs *Files) TypeStdin(ctx context.Context) (Type, error) {
|
|||||||
// add file copies f to fs's cache, returning a reader which the
|
// add file copies f to fs's cache, returning a reader which the
|
||||||
// caller is responsible for closing. f is closed by this method.
|
// caller is responsible for closing. f is closed by this method.
|
||||||
func (fs *Files) addFile(f *os.File, key string) (fscache.ReadAtCloser, error) {
|
func (fs *Files) addFile(f *os.File, key string) (fscache.ReadAtCloser, error) {
|
||||||
|
fs.log.Debugf("Adding file with key %q: %s", key, f.Name())
|
||||||
r, w, err := fs.fcache.Get(key)
|
r, w, err := fs.fcache.Get(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errz.Err(err)
|
return nil, errz.Err(err)
|
||||||
@ -305,6 +319,8 @@ func (fs *Files) fetch(loc string) (fpath string, err error) {
|
|||||||
|
|
||||||
// Close closes any open resources.
|
// Close closes any open resources.
|
||||||
func (fs *Files) Close() error {
|
func (fs *Files) Close() error {
|
||||||
|
fs.log.Debugf("Files.Close invoked: has %d clean funcs", fs.clnup.Len())
|
||||||
|
|
||||||
return fs.clnup.Run()
|
return fs.clnup.Run()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,3 +72,12 @@ func TestSakila_CSV(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSQLiteCloseError(t *testing.T) {
|
||||||
|
th := testh.New(t)
|
||||||
|
src := th.Source(sakila.CSVActor)
|
||||||
|
// Note table "data" instead of "actor", because CSV is monotable
|
||||||
|
sink, err := th.QuerySQL(src, "SELECT * FROM data")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, sakila.TblActorCount, len(sink.Recs))
|
||||||
|
}
|
||||||
|
@ -103,7 +103,14 @@ func (h *Helper) init() {
|
|||||||
var err error
|
var err error
|
||||||
h.files, err = source.NewFiles(log)
|
h.files, err = source.NewFiles(log)
|
||||||
require.NoError(h.T, err)
|
require.NoError(h.T, err)
|
||||||
h.Cleanup.AddC(h.files)
|
|
||||||
|
h.Cleanup.Add(func() {
|
||||||
|
h.T.Logf("Executing outer Files cleanup")
|
||||||
|
err := h.files.Close()
|
||||||
|
assert.NoError(h.T, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
//h.Cleanup.AddC(h.files)
|
||||||
h.files.AddTypeDetectors(source.DetectMagicNumber)
|
h.files.AddTypeDetectors(source.DetectMagicNumber)
|
||||||
|
|
||||||
h.databases = driver.NewDatabases(log, h.registry, sqlite3.NewScratchSource)
|
h.databases = driver.NewDatabases(log, h.registry, sqlite3.NewScratchSource)
|
||||||
|
Loading…
Reference in New Issue
Block a user