Refactor/rename database to pool (#328)

* Renamed `driver.Database` to `driver.Pool` (and related things)

* workflow: Update tparse version

* workflow: Update golangci-lint version
This commit is contained in:
Neil O'Toole 2023-11-18 19:21:14 -07:00 committed by GitHub
parent 511e69f59b
commit 82727b3890
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
46 changed files with 626 additions and 628 deletions

View File

@ -11,8 +11,8 @@ on:
env: env:
GO_VERSION: 1.21.0 GO_VERSION: 1.21.0
GORELEASER_VERSION: 1.20.0 GORELEASER_VERSION: 1.20.0
GOLANGCI_LINT_VERSION: v1.54.1 GOLANGCI_LINT_VERSION: v1.55.2
TPARSE_VERSION: v0.11.1 TPARSE_VERSION: v0.13.2
BUILD_TAGS: 'sqlite_vtable sqlite_stat4 sqlite_fts5 sqlite_introspect sqlite_json sqlite_math_functions' BUILD_TAGS: 'sqlite_vtable sqlite_stat4 sqlite_fts5 sqlite_introspect sqlite_json sqlite_math_functions'
jobs: jobs:
@ -129,6 +129,7 @@ jobs:
- name: golangci-lint - name: golangci-lint
uses: golangci/golangci-lint-action@v3 uses: golangci/golangci-lint-action@v3
with: with:
skip-cache: true
version: ${{ env.GOLANGCI_LINT_VERSION }} version: ${{ env.GOLANGCI_LINT_VERSION }}

View File

@ -166,14 +166,14 @@ func execInspect(cmd *cobra.Command, args []string) error {
return err return err
} }
dbase, err := ru.Databases.Open(ctx, src) pool, err := ru.Pools.Open(ctx, src)
if err != nil { if err != nil {
return errz.Wrapf(err, "failed to inspect %s", src.Handle) return errz.Wrapf(err, "failed to inspect %s", src.Handle)
} }
if table != "" { if table != "" {
var tblMeta *source.TableMetadata var tblMeta *source.TableMetadata
tblMeta, err = dbase.TableMetadata(ctx, table) tblMeta, err = pool.TableMetadata(ctx, table)
if err != nil { if err != nil {
return err return err
} }
@ -183,11 +183,11 @@ func execInspect(cmd *cobra.Command, args []string) error {
if cmdFlagIsSetTrue(cmd, flag.InspectDBProps) { if cmdFlagIsSetTrue(cmd, flag.InspectDBProps) {
var db *sql.DB var db *sql.DB
if db, err = dbase.DB(ctx); err != nil { if db, err = pool.DB(ctx); err != nil {
return err return err
} }
var props map[string]any var props map[string]any
sqlDrvr := dbase.SQLDriver() sqlDrvr := pool.SQLDriver()
if props, err = sqlDrvr.DBProperties(ctx, db); err != nil { if props, err = sqlDrvr.DBProperties(ctx, db); err != nil {
return err return err
} }
@ -197,7 +197,7 @@ func execInspect(cmd *cobra.Command, args []string) error {
overviewOnly := cmdFlagIsSetTrue(cmd, flag.InspectOverview) overviewOnly := cmdFlagIsSetTrue(cmd, flag.InspectOverview)
srcMeta, err := dbase.SourceMetadata(ctx, overviewOnly) srcMeta, err := pool.SourceMetadata(ctx, overviewOnly)
if err != nil { if err != nil {
return errz.Wrapf(err, "failed to read %s source metadata", src.Handle) return errz.Wrapf(err, "failed to read %s source metadata", src.Handle)
} }

View File

@ -141,7 +141,7 @@ func execSLQInsert(ctx context.Context, ru *run.Run, mArgs map[string]string,
ctx, cancelFn := context.WithCancel(ctx) ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn() defer cancelFn()
destDB, err := ru.Databases.Open(ctx, destSrc) destPool, err := ru.Pools.Open(ctx, destSrc)
if err != nil { if err != nil {
return err return err
} }
@ -152,7 +152,7 @@ func execSLQInsert(ctx context.Context, ru *run.Run, mArgs map[string]string,
// stack. // stack.
inserter := libsq.NewDBWriter( inserter := libsq.NewDBWriter(
destDB, destPool,
destTbl, destTbl,
driver.OptTuningRecChanSize.Get(destSrc.Options), driver.OptTuningRecChanSize.Get(destSrc.Options),
libsq.DBWriterCreateTableIfNotExistsHook(destTbl), libsq.DBWriterCreateTableIfNotExistsHook(destTbl),
@ -209,7 +209,7 @@ func execSLQPrint(ctx context.Context, ru *run.Run, mArgs map[string]string) err
// //
// $ cat something.xlsx | sq @stdin.sheet1 // $ cat something.xlsx | sq @stdin.sheet1
func preprocessUserSLQ(ctx context.Context, ru *run.Run, args []string) (string, error) { func preprocessUserSLQ(ctx context.Context, ru *run.Run, args []string) (string, error) {
log, reg, dbases, coll := lg.FromContext(ctx), ru.DriverRegistry, ru.Databases, ru.Config.Collection log, reg, pools, coll := lg.FromContext(ctx), ru.DriverRegistry, ru.Pools, ru.Config.Collection
activeSrc := coll.Active() activeSrc := coll.Active()
if len(args) == 0 { if len(args) == 0 {
@ -240,13 +240,13 @@ func preprocessUserSLQ(ctx context.Context, ru *run.Run, args []string) (string,
// This isn't a monotable src, so we can't // This isn't a monotable src, so we can't
// just select @stdin.data. Instead we'll select // just select @stdin.data. Instead we'll select
// the first table name, as found in the source meta. // the first table name, as found in the source meta.
dbase, err := dbases.Open(ctx, activeSrc) pool, err := pools.Open(ctx, activeSrc)
if err != nil { if err != nil {
return "", err return "", err
} }
defer lg.WarnIfCloseError(log, lgm.CloseDB, dbase) defer lg.WarnIfCloseError(log, lgm.CloseDB, pool)
srcMeta, err := dbase.SourceMetadata(ctx, false) srcMeta, err := pool.SourceMetadata(ctx, false)
if err != nil { if err != nil {
return "", err return "", err
} }

View File

@ -118,13 +118,13 @@ func execSQL(cmd *cobra.Command, args []string) error {
// to the configured writer. // to the configured writer.
func execSQLPrint(ctx context.Context, ru *run.Run, fromSrc *source.Source) error { func execSQLPrint(ctx context.Context, ru *run.Run, fromSrc *source.Source) error {
args := ru.Args args := ru.Args
dbase, err := ru.Databases.Open(ctx, fromSrc) pool, err := ru.Pools.Open(ctx, fromSrc)
if err != nil { if err != nil {
return err return err
} }
recw := output.NewRecordWriterAdapter(ctx, ru.Writers.Record) recw := output.NewRecordWriterAdapter(ctx, ru.Writers.Record)
err = libsq.QuerySQL(ctx, dbase, nil, recw, args[0]) err = libsq.QuerySQL(ctx, pool, nil, recw, args[0])
if err != nil { if err != nil {
return err return err
} }
@ -138,27 +138,27 @@ func execSQLInsert(ctx context.Context, ru *run.Run,
fromSrc, destSrc *source.Source, destTbl string, fromSrc, destSrc *source.Source, destTbl string,
) error { ) error {
args := ru.Args args := ru.Args
dbases := ru.Databases pools := ru.Pools
ctx, cancelFn := context.WithCancel(ctx) ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn() defer cancelFn()
fromDB, err := dbases.Open(ctx, fromSrc) fromDB, err := pools.Open(ctx, fromSrc)
if err != nil { if err != nil {
return err return err
} }
destDB, err := dbases.Open(ctx, destSrc) destPool, err := pools.Open(ctx, destSrc)
if err != nil { if err != nil {
return err return err
} }
// Note: We don't need to worry about closing fromDB and // Note: We don't need to worry about closing fromDB and
// destDB because they are closed by dbases.Close, which // destPool because they are closed by pools.Close, which
// is invoked by ru.Close, and ru is closed further up the // is invoked by ru.Close, and ru is closed further up the
// stack. // stack.
inserter := libsq.NewDBWriter( inserter := libsq.NewDBWriter(
destDB, destPool,
destTbl, destTbl,
driver.OptTuningRecChanSize.Get(destSrc.Options), driver.OptTuningRecChanSize.Get(destSrc.Options),
libsq.DBWriterCreateTableIfNotExistsHook(destTbl), libsq.DBWriterCreateTableIfNotExistsHook(destTbl),

View File

@ -124,13 +124,13 @@ func execTblCopy(cmd *cobra.Command, args []string) error {
return err return err
} }
var dbase driver.Database var pool driver.Pool
dbase, err = ru.Databases.Open(ctx, tblHandles[0].src) pool, err = ru.Pools.Open(ctx, tblHandles[0].src)
if err != nil { if err != nil {
return err return err
} }
db, err := dbase.DB(ctx) db, err := pool.DB(ctx)
if err != nil { if err != nil {
return err return err
} }
@ -257,13 +257,13 @@ func execTblDrop(cmd *cobra.Command, args []string) (err error) {
return errz.Errorf("driver type {%s} (%s) doesn't support dropping tables", tblH.src.Type, tblH.src.Handle) return errz.Errorf("driver type {%s} (%s) doesn't support dropping tables", tblH.src.Type, tblH.src.Handle)
} }
var dbase driver.Database var pool driver.Pool
if dbase, err = ru.Databases.Open(ctx, tblH.src); err != nil { if pool, err = ru.Pools.Open(ctx, tblH.src); err != nil {
return err return err
} }
var db *sql.DB var db *sql.DB
if db, err = dbase.DB(ctx); err != nil { if db, err = pool.DB(ctx); err != nil {
return err return err
} }

View File

@ -149,7 +149,7 @@ func completeSLQ(cmd *cobra.Command, args []string, toComplete string) ([]string
// completeDriverType is a completionFunc that suggests drivers. // completeDriverType is a completionFunc that suggests drivers.
func completeDriverType(cmd *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) { func completeDriverType(cmd *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) {
ru := getRun(cmd) ru := getRun(cmd)
if ru.Databases == nil { if ru.Pools == nil {
if err := preRun(cmd, ru); err != nil { if err := preRun(cmd, ru); err != nil {
lg.Unexpected(logFrom(cmd), err) lg.Unexpected(logFrom(cmd), err)
return nil, cobra.ShellCompDirectiveError return nil, cobra.ShellCompDirectiveError
@ -392,13 +392,13 @@ func (c activeSchemaCompleter) complete(cmd *cobra.Command, args []string, toCom
ctx, cancelFn := context.WithTimeout(cmd.Context(), OptShellCompletionTimeout.Get(ru.Config.Options)) ctx, cancelFn := context.WithTimeout(cmd.Context(), OptShellCompletionTimeout.Get(ru.Config.Options))
defer cancelFn() defer cancelFn()
dbase, err := ru.Databases.Open(ctx, src) pool, err := ru.Pools.Open(ctx, src)
if err != nil { if err != nil {
lg.Unexpected(log, err) lg.Unexpected(log, err)
return nil, cobra.ShellCompDirectiveError return nil, cobra.ShellCompDirectiveError
} }
db, err := dbase.DB(ctx) db, err := pool.DB(ctx)
if err != nil { if err != nil {
lg.Unexpected(log, err) lg.Unexpected(log, err)
return nil, cobra.ShellCompDirectiveError return nil, cobra.ShellCompDirectiveError
@ -768,14 +768,14 @@ func getTableNamesForHandle(ctx context.Context, ru *run.Run, handle string) ([]
return nil, err return nil, err
} }
dbase, err := ru.Databases.Open(ctx, src) pool, err := ru.Pools.Open(ctx, src)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO: We shouldn't have to load the full metadata just to get // TODO: We shouldn't have to load the full metadata just to get
// the table names. driver.SQLDriver should have a method ListTables. // the table names. driver.SQLDriver should have a method ListTables.
md, err := dbase.SourceMetadata(ctx, false) md, err := pool.SourceMetadata(ctx, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -195,11 +195,11 @@ func fetchSourceMeta(ctx context.Context, ru *run.Run, handle string) (*source.S
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
dbase, err := ru.Databases.Open(ctx, src) pool, err := ru.Pools.Open(ctx, src)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
md, err := dbase.SourceMetadata(ctx, false) md, err := pool.SourceMetadata(ctx, false)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -113,11 +113,11 @@ func buildTableStructureDiff(cfg *Config, showRowCounts bool, td1, td2 *tableDat
func fetchTableMeta(ctx context.Context, ru *run.Run, src *source.Source, table string) ( func fetchTableMeta(ctx context.Context, ru *run.Run, src *source.Source, table string) (
*source.TableMetadata, error, *source.TableMetadata, error,
) { ) {
dbase, err := ru.Databases.Open(ctx, src) pool, err := ru.Pools.Open(ctx, src)
if err != nil { if err != nil {
return nil, err return nil, err
} }
md, err := dbase.TableMetadata(ctx, table) md, err := pool.TableMetadata(ctx, table)
if err != nil { if err != nil {
if errz.IsErrNotExist(err) { if errz.IsErrNotExist(err) {
return nil, nil //nolint:nilnil return nil, nil //nolint:nilnil

View File

@ -51,11 +51,11 @@ func TestRecordWriterAdapter(t *testing.T) {
th := testh.New(t) th := testh.New(t)
src := th.Source(tc.handle) src := th.Source(tc.handle)
dbase := th.Open(src) pool := th.Open(src)
sink := &testh.RecordSink{} sink := &testh.RecordSink{}
recw := output.NewRecordWriterAdapter(th.Context, sink) recw := output.NewRecordWriterAdapter(th.Context, sink)
err := libsq.QuerySQL(th.Context, dbase, nil, recw, tc.sqlQuery) err := libsq.QuerySQL(th.Context, pool, nil, recw, tc.sqlQuery)
require.NoError(t, err) require.NoError(t, err)
written, err := recw.Wait() written, err := recw.Wait()
require.NoError(t, err) require.NoError(t, err)

View File

@ -157,19 +157,19 @@ func FinishRunInit(ctx context.Context, ru *run.Run) error {
ru.DriverRegistry = driver.NewRegistry(log) ru.DriverRegistry = driver.NewRegistry(log)
dr := ru.DriverRegistry dr := ru.DriverRegistry
ru.Databases = driver.NewDatabases(log, dr, scratchSrcFunc) ru.Pools = driver.NewPools(log, dr, scratchSrcFunc)
ru.Cleanup.AddC(ru.Databases) ru.Cleanup.AddC(ru.Pools)
dr.AddProvider(sqlite3.Type, &sqlite3.Provider{Log: log}) dr.AddProvider(sqlite3.Type, &sqlite3.Provider{Log: log})
dr.AddProvider(postgres.Type, &postgres.Provider{Log: log}) dr.AddProvider(postgres.Type, &postgres.Provider{Log: log})
dr.AddProvider(sqlserver.Type, &sqlserver.Provider{Log: log}) dr.AddProvider(sqlserver.Type, &sqlserver.Provider{Log: log})
dr.AddProvider(mysql.Type, &mysql.Provider{Log: log}) dr.AddProvider(mysql.Type, &mysql.Provider{Log: log})
csvp := &csv.Provider{Log: log, Scratcher: ru.Databases, Files: ru.Files} csvp := &csv.Provider{Log: log, Scratcher: ru.Pools, Files: ru.Files}
dr.AddProvider(csv.TypeCSV, csvp) dr.AddProvider(csv.TypeCSV, csvp)
dr.AddProvider(csv.TypeTSV, csvp) dr.AddProvider(csv.TypeTSV, csvp)
ru.Files.AddDriverDetectors(csv.DetectCSV, csv.DetectTSV) ru.Files.AddDriverDetectors(csv.DetectCSV, csv.DetectTSV)
jsonp := &json.Provider{Log: log, Scratcher: ru.Databases, Files: ru.Files} jsonp := &json.Provider{Log: log, Scratcher: ru.Pools, Files: ru.Files}
dr.AddProvider(json.TypeJSON, jsonp) dr.AddProvider(json.TypeJSON, jsonp)
dr.AddProvider(json.TypeJSONA, jsonp) dr.AddProvider(json.TypeJSONA, jsonp)
dr.AddProvider(json.TypeJSONL, jsonp) dr.AddProvider(json.TypeJSONL, jsonp)
@ -180,7 +180,7 @@ func FinishRunInit(ctx context.Context, ru *run.Run) error {
json.DetectJSONL(sampleSize), json.DetectJSONL(sampleSize),
) )
dr.AddProvider(xlsx.Type, &xlsx.Provider{Log: log, Scratcher: ru.Databases, Files: ru.Files}) dr.AddProvider(xlsx.Type, &xlsx.Provider{Log: log, Scratcher: ru.Pools, Files: ru.Files})
ru.Files.AddDriverDetectors(xlsx.DetectXLSX) ru.Files.AddDriverDetectors(xlsx.DetectXLSX)
// One day we may have more supported user driver genres. // One day we may have more supported user driver genres.
userDriverImporters := map[string]userdriver.ImportFunc{ userDriverImporters := map[string]userdriver.ImportFunc{
@ -210,7 +210,7 @@ func FinishRunInit(ctx context.Context, ru *run.Run) error {
Log: log, Log: log,
DriverDef: userDriverDef, DriverDef: userDriverDef,
ImportFn: importFn, ImportFn: importFn,
Scratcher: ru.Databases, Scratcher: ru.Pools,
Files: ru.Files, Files: ru.Files,
} }

View File

@ -75,8 +75,8 @@ type Run struct {
// Files manages file access. // Files manages file access.
Files *source.Files Files *source.Files
// Databases mediates access to databases. // Pools mediates access to db pools.
Databases *driver.Databases Pools *driver.Pools
// Writers holds the various writer types that // Writers holds the various writer types that
// the CLI uses to print output. // the CLI uses to print output.
@ -102,9 +102,9 @@ func (ru *Run) Close() error {
func NewQueryContext(ru *Run, args map[string]string) *libsq.QueryContext { func NewQueryContext(ru *Run, args map[string]string) *libsq.QueryContext {
return &libsq.QueryContext{ return &libsq.QueryContext{
Collection: ru.Config.Collection, Collection: ru.Config.Collection,
DBOpener: ru.Databases, PoolOpener: ru.Pools,
JoinDBOpener: ru.Databases, JoinPoolOpener: ru.Pools,
ScratchDBOpener: ru.Databases, ScratchPoolOpener: ru.Pools,
Args: args, Args: args,
} }
} }

View File

@ -28,7 +28,7 @@ const (
// Provider implements driver.Provider. // Provider implements driver.Provider.
type Provider struct { type Provider struct {
Log *slog.Logger Log *slog.Logger
Scratcher driver.ScratchDatabaseOpener Scratcher driver.ScratchPoolOpener
Files *source.Files Files *source.Files
} }
@ -48,7 +48,7 @@ func (d *Provider) DriverFor(typ source.DriverType) (driver.Driver, error) {
type driveri struct { type driveri struct {
log *slog.Logger log *slog.Logger
typ source.DriverType typ source.DriverType
scratcher driver.ScratchDatabaseOpener scratcher driver.ScratchPoolOpener
files *source.Files files *source.Files
} }
@ -65,27 +65,27 @@ func (d *driveri) DriverMetadata() driver.Metadata {
return md return md
} }
// Open implements driver.DatabaseOpener. // Open implements driver.PoolOpener.
func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database, error) { func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Pool, error) {
lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src)
dbase := &database{ pool := &pool{
log: d.log, log: d.log,
src: src, src: src,
files: d.files, files: d.files,
} }
var err error var err error
dbase.impl, err = d.scratcher.OpenScratch(ctx, src.Handle) pool.impl, err = d.scratcher.OpenScratch(ctx, src.Handle)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err = ingestCSV(ctx, src, d.files.OpenFunc(src), dbase.impl); err != nil { if err = ingestCSV(ctx, src, d.files.OpenFunc(src), pool.impl); err != nil {
return nil, err return nil, err
} }
return dbase, nil return pool, nil
} }
// Truncate implements driver.Driver. // Truncate implements driver.Driver.
@ -113,37 +113,37 @@ func (d *driveri) Ping(_ context.Context, src *source.Source) error {
return nil return nil
} }
// database implements driver.Database. // pool implements driver.Pool.
type database struct { type pool struct {
log *slog.Logger log *slog.Logger
src *source.Source src *source.Source
impl driver.Database impl driver.Pool
files *source.Files files *source.Files
} }
// DB implements driver.Database. // DB implements driver.Pool.
func (d *database) DB(ctx context.Context) (*sql.DB, error) { func (p *pool) DB(ctx context.Context) (*sql.DB, error) {
return d.impl.DB(ctx) return p.impl.DB(ctx)
} }
// SQLDriver implements driver.Database. // SQLDriver implements driver.Pool.
func (d *database) SQLDriver() driver.SQLDriver { func (p *pool) SQLDriver() driver.SQLDriver {
return d.impl.SQLDriver() return p.impl.SQLDriver()
} }
// Source implements driver.Database. // Source implements driver.Pool.
func (d *database) Source() *source.Source { func (p *pool) Source() *source.Source {
return d.src return p.src
} }
// TableMetadata implements driver.Database. // TableMetadata implements driver.Pool.
func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { func (p *pool) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) {
if tblName != source.MonotableName { if tblName != source.MonotableName {
return nil, errz.Errorf("table name should be %s for CSV/TSV etc., but got: %s", return nil, errz.Errorf("table name should be %s for CSV/TSV etc., but got: %s",
source.MonotableName, tblName) source.MonotableName, tblName)
} }
srcMeta, err := d.SourceMetadata(ctx, false) srcMeta, err := p.SourceMetadata(ctx, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -152,23 +152,23 @@ func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.T
return srcMeta.Tables[0], nil return srcMeta.Tables[0], nil
} }
// SourceMetadata implements driver.Database. // SourceMetadata implements driver.Pool.
func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { func (p *pool) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) {
md, err := d.impl.SourceMetadata(ctx, noSchema) md, err := p.impl.SourceMetadata(ctx, noSchema)
if err != nil { if err != nil {
return nil, err return nil, err
} }
md.Handle = d.src.Handle md.Handle = p.src.Handle
md.Location = d.src.Location md.Location = p.src.Location
md.Driver = d.src.Type md.Driver = p.src.Type
md.Name, err = source.LocationFileName(d.src) md.Name, err = source.LocationFileName(p.src)
if err != nil { if err != nil {
return nil, err return nil, err
} }
md.Size, err = d.files.Size(d.src) md.Size, err = p.files.Size(p.src)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -177,9 +177,9 @@ func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.M
return md, nil return md, nil
} }
// Close implements driver.Database. // Close implements driver.Pool.
func (d *database) Close() error { func (p *pool) Close() error {
d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle) p.log.Debug(lgm.CloseDB, lga.Handle, p.src.Handle)
return errz.Err(d.impl.Close()) return errz.Err(p.impl.Close())
} }

View File

@ -54,7 +54,7 @@ Possible values are: comma, space, pipe, tab, colon, semi, period.`,
) )
// ingestCSV loads the src CSV data into scratchDB. // ingestCSV loads the src CSV data into scratchDB.
func ingestCSV(ctx context.Context, src *source.Source, openFn source.FileOpenFunc, scratchDB driver.Database) error { func ingestCSV(ctx context.Context, src *source.Source, openFn source.FileOpenFunc, scratchPool driver.Pool) error {
log := lg.FromContext(ctx) log := lg.FromContext(ctx)
var err error var err error
@ -109,17 +109,17 @@ func ingestCSV(ctx context.Context, src *source.Source, openFn source.FileOpenFu
// And now we need to create the dest table in scratchDB // And now we need to create the dest table in scratchDB
tblDef := createTblDef(source.MonotableName, header, kinds) tblDef := createTblDef(source.MonotableName, header, kinds)
db, err := scratchDB.DB(ctx) db, err := scratchPool.DB(ctx)
if err != nil { if err != nil {
return err return err
} }
err = scratchDB.SQLDriver().CreateTable(ctx, db, tblDef) err = scratchPool.SQLDriver().CreateTable(ctx, db, tblDef)
if err != nil { if err != nil {
return errz.Wrap(err, "csv: failed to create dest scratch table") return errz.Wrap(err, "csv: failed to create dest scratch table")
} }
recMeta, err := getIngestRecMeta(ctx, scratchDB, tblDef) recMeta, err := getIngestRecMeta(ctx, scratchPool, tblDef)
if err != nil { if err != nil {
return err return err
} }
@ -129,9 +129,9 @@ func ingestCSV(ctx context.Context, src *source.Source, openFn source.FileOpenFu
} }
insertWriter := libsq.NewDBWriter( insertWriter := libsq.NewDBWriter(
scratchDB, scratchPool,
tblDef.Name, tblDef.Name,
driver.OptTuningRecChanSize.Get(scratchDB.Source().Options), driver.OptTuningRecChanSize.Get(scratchPool.Source().Options),
) )
err = execInsert(ctx, insertWriter, recMeta, mungers, recs, cr) err = execInsert(ctx, insertWriter, recMeta, mungers, recs, cr)
if err != nil { if err != nil {
@ -145,7 +145,7 @@ func ingestCSV(ctx context.Context, src *source.Source, openFn source.FileOpenFu
log.Debug("Inserted rows", log.Debug("Inserted rows",
lga.Count, inserted, lga.Count, inserted,
lga.Target, source.Target(scratchDB.Source(), tblDef.Name), lga.Target, source.Target(scratchPool.Source(), tblDef.Name),
) )
return nil return nil
} }

View File

@ -122,13 +122,13 @@ func createTblDef(tblName string, colNames []string, kinds []kind.Kind) *sqlmode
} }
// getIngestRecMeta returns record.Meta to use with RecordWriter.Open. // getIngestRecMeta returns record.Meta to use with RecordWriter.Open.
func getIngestRecMeta(ctx context.Context, scratchDB driver.Database, tblDef *sqlmodel.TableDef) (record.Meta, error) { func getIngestRecMeta(ctx context.Context, scratchPool driver.Pool, tblDef *sqlmodel.TableDef) (record.Meta, error) {
db, err := scratchDB.DB(ctx) db, err := scratchPool.DB(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
drvr := scratchDB.SQLDriver() drvr := scratchPool.SQLDriver()
colTypes, err := drvr.TableColumnTypes(ctx, db, tblDef.Name, tblDef.ColNames()) colTypes, err := drvr.TableColumnTypes(ctx, db, tblDef.Name, tblDef.ColNames())
if err != nil { if err != nil {

View File

@ -30,11 +30,11 @@ import (
// importJob describes a single import job, where the JSON // importJob describes a single import job, where the JSON
// at fromSrc is read via openFn and the resulting records // at fromSrc is read via openFn and the resulting records
// are written to destDB. // are written to destPool.
type importJob struct { type importJob struct {
fromSrc *source.Source fromSrc *source.Source
openFn source.FileOpenFunc openFn source.FileOpenFunc
destDB driver.Database destPool driver.Pool
// sampleSize is the maximum number of values to // sampleSize is the maximum number of values to
// sample to determine the kind of an element. // sample to determine the kind of an element.
@ -57,18 +57,18 @@ var (
) )
// getRecMeta returns record.Meta to use with RecordWriter.Open. // getRecMeta returns record.Meta to use with RecordWriter.Open.
func getRecMeta(ctx context.Context, scratchDB driver.Database, tblDef *sqlmodel.TableDef) (record.Meta, error) { func getRecMeta(ctx context.Context, scratchPool driver.Pool, tblDef *sqlmodel.TableDef) (record.Meta, error) {
db, err := scratchDB.DB(ctx) db, err := scratchPool.DB(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
colTypes, err := scratchDB.SQLDriver().TableColumnTypes(ctx, db, tblDef.Name, tblDef.ColNames()) colTypes, err := scratchPool.SQLDriver().TableColumnTypes(ctx, db, tblDef.Name, tblDef.ColNames())
if err != nil { if err != nil {
return nil, err return nil, err
} }
destMeta, _, err := scratchDB.SQLDriver().RecordMeta(ctx, colTypes) destMeta, _, err := scratchPool.SQLDriver().RecordMeta(ctx, colTypes)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -143,9 +143,9 @@ func importJSON(ctx context.Context, job importJob) error {
} }
defer lg.WarnIfCloseError(log, lgm.CloseFileReader, r) defer lg.WarnIfCloseError(log, lgm.CloseFileReader, r)
drvr := job.destDB.SQLDriver() drvr := job.destPool.SQLDriver()
db, err := job.destDB.DB(ctx) db, err := job.destPool.DB(ctx)
if err != nil { if err != nil {
return err return err
} }

View File

@ -122,19 +122,19 @@ func importJSONA(ctx context.Context, job importJob) error {
colNames[i] = stringz.GenerateAlphaColName(i, true) colNames[i] = stringz.GenerateAlphaColName(i, true)
} }
// And now we need to create the dest table in destDB // And now we need to create the dest table in destPool
tblDef := sqlmodel.NewTableDef(source.MonotableName, colNames, colKinds) tblDef := sqlmodel.NewTableDef(source.MonotableName, colNames, colKinds)
db, err := job.destDB.DB(ctx) db, err := job.destPool.DB(ctx)
if err != nil { if err != nil {
return err return err
} }
err = job.destDB.SQLDriver().CreateTable(ctx, db, tblDef) err = job.destPool.SQLDriver().CreateTable(ctx, db, tblDef)
if err != nil { if err != nil {
return errz.Wrapf(err, "import %s: failed to create dest scratch table", TypeJSONA) return errz.Wrapf(err, "import %s: failed to create dest scratch table", TypeJSONA)
} }
recMeta, err := getRecMeta(ctx, job.destDB, tblDef) recMeta, err := getRecMeta(ctx, job.destPool, tblDef)
if err != nil { if err != nil {
return err return err
} }
@ -146,9 +146,9 @@ func importJSONA(ctx context.Context, job importJob) error {
defer lg.WarnIfCloseError(log, lgm.CloseFileReader, r) defer lg.WarnIfCloseError(log, lgm.CloseFileReader, r)
insertWriter := libsq.NewDBWriter( insertWriter := libsq.NewDBWriter(
job.destDB, job.destPool,
tblDef.Name, tblDef.Name,
driver.OptTuningRecChanSize.Get(job.destDB.Source().Options), driver.OptTuningRecChanSize.Get(job.destPool.Source().Options),
) )
var cancelFn context.CancelFunc var cancelFn context.CancelFunc
@ -174,7 +174,7 @@ func importJSONA(ctx context.Context, job importJob) error {
log.Debug("Inserted rows", log.Debug("Inserted rows",
lga.Count, inserted, lga.Count, inserted,
lga.Target, source.Target(job.destDB.Source(), tblDef.Name), lga.Target, source.Target(job.destPool.Source(), tblDef.Name),
) )
return nil return nil
} }

View File

@ -94,8 +94,8 @@ func importJSONL(ctx context.Context, job importJob) error { //nolint:gocognit
} }
defer lg.WarnIfCloseError(log, lgm.CloseFileReader, r) defer lg.WarnIfCloseError(log, lgm.CloseFileReader, r)
drvr := job.destDB.SQLDriver() drvr := job.destPool.SQLDriver()
db, err := job.destDB.DB(ctx) db, err := job.destPool.DB(ctx)
if err != nil { if err != nil {
return err return err
} }

View File

@ -85,8 +85,8 @@ func TestImportJSONL_Flat(t *testing.T) {
} }
} }
th, src, _, dbase, _ := testh.NewWith(t, testsrc.EmptyDB) th, src, _, pool, _ := testh.NewWith(t, testsrc.EmptyDB)
job := json.NewImportJob(src, openFn, dbase, 0, true) job := json.NewImportJob(src, openFn, pool, 0, true)
err := json.ImportJSONL(th.Context, job) err := json.ImportJSONL(th.Context, job)
if tc.wantErr { if tc.wantErr {
@ -110,8 +110,8 @@ func TestImportJSON_Flat(t *testing.T) {
return os.Open("testdata/actor.json") return os.Open("testdata/actor.json")
} }
th, src, _, dbase, _ := testh.NewWith(t, testsrc.EmptyDB) th, src, _, pool, _ := testh.NewWith(t, testsrc.EmptyDB)
job := json.NewImportJob(src, openFn, dbase, 0, true) job := json.NewImportJob(src, openFn, pool, 0, true)
err := json.ImportJSON(th.Context, job) err := json.ImportJSON(th.Context, job)
require.NoError(t, err) require.NoError(t, err)

View File

@ -26,7 +26,7 @@ var (
// newImportJob is a constructor for the unexported importJob type. // newImportJob is a constructor for the unexported importJob type.
// If sampleSize <= 0, a default value is used. // If sampleSize <= 0, a default value is used.
func newImportJob(fromSrc *source.Source, openFn source.FileOpenFunc, destDB driver.Database, sampleSize int, func newImportJob(fromSrc *source.Source, openFn source.FileOpenFunc, destPool driver.Pool, sampleSize int,
flatten bool, flatten bool,
) importJob { ) importJob {
if sampleSize <= 0 { if sampleSize <= 0 {
@ -36,7 +36,7 @@ func newImportJob(fromSrc *source.Source, openFn source.FileOpenFunc, destDB dri
return importJob{ return importJob{
fromSrc: fromSrc, fromSrc: fromSrc,
openFn: openFn, openFn: openFn,
destDB: destDB, destPool: destPool,
sampleSize: sampleSize, sampleSize: sampleSize,
flatten: flatten, flatten: flatten,
} }

View File

@ -36,7 +36,7 @@ const (
// Provider implements driver.Provider. // Provider implements driver.Provider.
type Provider struct { type Provider struct {
Log *slog.Logger Log *slog.Logger
Scratcher driver.ScratchDatabaseOpener Scratcher driver.ScratchPoolOpener
Files *source.Files Files *source.Files
} }
@ -69,7 +69,7 @@ type driveri struct {
log *slog.Logger log *slog.Logger
typ source.DriverType typ source.DriverType
importFn importFunc importFn importFunc
scratcher driver.ScratchDatabaseOpener scratcher driver.ScratchPoolOpener
files *source.Files files *source.Files
} }
@ -92,28 +92,28 @@ func (d *driveri) DriverMetadata() driver.Metadata {
return md return md
} }
// Open implements driver.DatabaseOpener. // Open implements driver.PoolOpener.
func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database, error) { func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Pool, error) {
lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src)
dbase := &database{log: d.log, src: src, clnup: cleanup.New(), files: d.files} p := &pool{log: d.log, src: src, clnup: cleanup.New(), files: d.files}
r, err := d.files.Open(src) r, err := d.files.Open(src)
if err != nil { if err != nil {
return nil, err return nil, err
} }
dbase.impl, err = d.scratcher.OpenScratch(ctx, src.Handle) p.impl, err = d.scratcher.OpenScratch(ctx, src.Handle)
if err != nil { if err != nil {
lg.WarnIfCloseError(d.log, lgm.CloseFileReader, r) lg.WarnIfCloseError(d.log, lgm.CloseFileReader, r)
lg.WarnIfFuncError(d.log, lgm.CloseDB, dbase.clnup.Run) lg.WarnIfFuncError(d.log, lgm.CloseDB, p.clnup.Run)
return nil, err return nil, err
} }
job := importJob{ job := importJob{
fromSrc: src, fromSrc: src,
openFn: d.files.OpenFunc(src), openFn: d.files.OpenFunc(src),
destDB: dbase.impl, destPool: p.impl,
sampleSize: driver.OptIngestSampleSize.Get(src.Options), sampleSize: driver.OptIngestSampleSize.Get(src.Options),
flatten: true, flatten: true,
} }
@ -121,7 +121,7 @@ func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database
err = d.importFn(ctx, job) err = d.importFn(ctx, job)
if err != nil { if err != nil {
lg.WarnIfCloseError(d.log, lgm.CloseFileReader, r) lg.WarnIfCloseError(d.log, lgm.CloseFileReader, r)
lg.WarnIfFuncError(d.log, lgm.CloseDB, dbase.clnup.Run) lg.WarnIfFuncError(d.log, lgm.CloseDB, p.clnup.Run)
return nil, err return nil, err
} }
@ -130,7 +130,7 @@ func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database
return nil, err return nil, err
} }
return dbase, nil return p, nil
} }
// Truncate implements driver.Driver. // Truncate implements driver.Driver.
@ -160,38 +160,38 @@ func (d *driveri) Ping(_ context.Context, src *source.Source) error {
return nil return nil
} }
// database implements driver.Database. // pool implements driver.Pool.
type database struct { type pool struct {
log *slog.Logger log *slog.Logger
src *source.Source src *source.Source
impl driver.Database impl driver.Pool
clnup *cleanup.Cleanup clnup *cleanup.Cleanup
files *source.Files files *source.Files
} }
// DB implements driver.Database. // DB implements driver.Pool.
func (d *database) DB(ctx context.Context) (*sql.DB, error) { func (p *pool) DB(ctx context.Context) (*sql.DB, error) {
return d.impl.DB(ctx) return p.impl.DB(ctx)
} }
// SQLDriver implements driver.Database. // SQLDriver implements driver.Pool.
func (d *database) SQLDriver() driver.SQLDriver { func (p *pool) SQLDriver() driver.SQLDriver {
return d.impl.SQLDriver() return p.impl.SQLDriver()
} }
// Source implements driver.Database. // Source implements driver.Pool.
func (d *database) Source() *source.Source { func (p *pool) Source() *source.Source {
return d.src return p.src
} }
// TableMetadata implements driver.Database. // TableMetadata implements driver.Pool.
func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { func (p *pool) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) {
if tblName != source.MonotableName { if tblName != source.MonotableName {
return nil, errz.Errorf("table name should be %s for CSV/TSV etc., but got: %s", return nil, errz.Errorf("table name should be %s for CSV/TSV etc., but got: %s",
source.MonotableName, tblName) source.MonotableName, tblName)
} }
srcMeta, err := d.SourceMetadata(ctx, false) srcMeta, err := p.SourceMetadata(ctx, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -200,23 +200,23 @@ func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.T
return srcMeta.Tables[0], nil return srcMeta.Tables[0], nil
} }
// SourceMetadata implements driver.Database. // SourceMetadata implements driver.Pool.
func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { func (p *pool) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) {
md, err := d.impl.SourceMetadata(ctx, noSchema) md, err := p.impl.SourceMetadata(ctx, noSchema)
if err != nil { if err != nil {
return nil, err return nil, err
} }
md.Handle = d.src.Handle md.Handle = p.src.Handle
md.Location = d.src.Location md.Location = p.src.Location
md.Driver = d.src.Type md.Driver = p.src.Type
md.Name, err = source.LocationFileName(d.src) md.Name, err = source.LocationFileName(p.src)
if err != nil { if err != nil {
return nil, err return nil, err
} }
md.Size, err = d.files.Size(d.src) md.Size, err = p.files.Size(p.src)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -225,9 +225,9 @@ func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.M
return md, nil return md, nil
} }
// Close implements driver.Database. // Close implements driver.Pool.
func (d *database) Close() error { func (p *pool) Close() error {
d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle) p.log.Debug(lgm.CloseDB, lga.Handle, p.src.Handle)
return errz.Combine(d.impl.Close(), d.clnup.Run()) return errz.Combine(p.impl.Close(), p.clnup.Run())
} }

View File

@ -165,7 +165,7 @@ func getNewRecordFunc(rowMeta record.Meta) driver.NewRecordFunc {
} }
// getTableMetadata gets the metadata for a single table. It is the // getTableMetadata gets the metadata for a single table. It is the
// implementation of driver.Database.TableMetadata. // implementation of driver.Pool.TableMetadata.
func getTableMetadata(ctx context.Context, db sqlz.DB, tblName string) (*source.TableMetadata, error) { func getTableMetadata(ctx context.Context, db sqlz.DB, tblName string) (*source.TableMetadata, error) {
query := `SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, TABLE_COMMENT, (DATA_LENGTH + INDEX_LENGTH) AS table_size, query := `SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, TABLE_COMMENT, (DATA_LENGTH + INDEX_LENGTH) AS table_size,
(SELECT COUNT(*) FROM ` + "`" + tblName + "`" + `) AS row_count (SELECT COUNT(*) FROM ` + "`" + tblName + "`" + `) AS row_count
@ -243,7 +243,7 @@ ORDER BY cols.ordinal_position ASC`
return cols, errw(rows.Err()) return cols, errw(rows.Err())
} }
// getSourceMetadata is the implementation of driver.Database.SourceMetadata. // getSourceMetadata is the implementation of driver.Pool.SourceMetadata.
// //
// Multiple queries are required to build the SourceMetadata, and this // Multiple queries are required to build the SourceMetadata, and this
// impl makes use of errgroup to make concurrent queries. In the initial // impl makes use of errgroup to make concurrent queries. In the initial

View File

@ -79,8 +79,8 @@ func TestDatabase_SourceMetadata_MySQL(t *testing.T) {
t.Run(handle, func(t *testing.T) { t.Run(handle, func(t *testing.T) {
t.Parallel() t.Parallel()
th, _, _, dbase, _ := testh.NewWith(t, handle) th, _, _, pool, _ := testh.NewWith(t, handle)
md, err := dbase.SourceMetadata(th.Context, false) md, err := pool.SourceMetadata(th.Context, false)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "sakila", md.Name) require.Equal(t, "sakila", md.Name)
require.Equal(t, handle, md.Handle) require.Equal(t, handle, md.Handle)
@ -102,8 +102,8 @@ func TestDatabase_TableMetadata(t *testing.T) {
t.Run(handle, func(t *testing.T) { t.Run(handle, func(t *testing.T) {
t.Parallel() t.Parallel()
th, _, _, dbase, _ := testh.NewWith(t, handle) th, _, _, pool, _ := testh.NewWith(t, handle)
md, err := dbase.TableMetadata(th.Context, sakila.TblActor) md, err := pool.TableMetadata(th.Context, sakila.TblActor)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, sakila.TblActor, md.Name) require.Equal(t, sakila.TblActor, md.Name)
}) })

View File

@ -410,8 +410,8 @@ func (d *driveri) getTableRecordMeta(ctx context.Context, db sqlz.DB, tblName st
return destCols, nil return destCols, nil
} }
// Open implements driver.DatabaseOpener. // Open implements driver.PoolOpener.
func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database, error) { func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Pool, error) {
lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src)
db, err := d.doOpen(ctx, src) db, err := d.doOpen(ctx, src)
@ -423,7 +423,7 @@ func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database
return nil, err return nil, err
} }
return &database{log: d.log, db: db, src: src, drvr: d}, nil return &pool{log: d.log, db: db, src: src, drvr: d}, nil
} }
func (d *driveri) doOpen(ctx context.Context, src *source.Source) (*sql.DB, error) { func (d *driveri) doOpen(ctx context.Context, src *source.Source) (*sql.DB, error) {
@ -519,43 +519,43 @@ func (d *driveri) Truncate(ctx context.Context, src *source.Source, tbl string,
return beforeCount, errw(tx.Commit()) return beforeCount, errw(tx.Commit())
} }
// database implements driver.Database. // pool implements driver.Pool.
type database struct { type pool struct {
log *slog.Logger log *slog.Logger
db *sql.DB db *sql.DB
src *source.Source src *source.Source
drvr *driveri drvr *driveri
} }
// DB implements driver.Database. // DB implements driver.Pool.
func (d *database) DB(context.Context) (*sql.DB, error) { func (p *pool) DB(context.Context) (*sql.DB, error) {
return d.db, nil return p.db, nil
} }
// SQLDriver implements driver.Database. // SQLDriver implements driver.Pool.
func (d *database) SQLDriver() driver.SQLDriver { func (p *pool) SQLDriver() driver.SQLDriver {
return d.drvr return p.drvr
} }
// Source implements driver.Database. // Source implements driver.Pool.
func (d *database) Source() *source.Source { func (p *pool) Source() *source.Source {
return d.src return p.src
} }
// TableMetadata implements driver.Database. // TableMetadata implements driver.Pool.
func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { func (p *pool) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) {
return getTableMetadata(ctx, d.db, tblName) return getTableMetadata(ctx, p.db, tblName)
} }
// SourceMetadata implements driver.Database. // SourceMetadata implements driver.Pool.
func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { func (p *pool) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) {
return getSourceMetadata(ctx, d.src, d.db, noSchema) return getSourceMetadata(ctx, p.src, p.db, noSchema)
} }
// Close implements driver.Database. // Close implements driver.Pool.
func (d *database) Close() error { func (p *pool) Close() error {
d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle) p.log.Debug(lgm.CloseDB, lga.Handle, p.src.Handle)
return errw(d.db.Close()) return errw(p.db.Close())
} }
// dsnFromLocation builds the mysql driver DSN from src.Location. // dsnFromLocation builds the mysql driver DSN from src.Location.

View File

@ -144,8 +144,8 @@ func (d *driveri) Renderer() *render.Renderer {
return r return r
} }
// Open implements driver.DatabaseOpener. // Open implements driver.PoolOpener.
func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database, error) { func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Pool, error) {
lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src)
db, err := d.doOpen(ctx, src) db, err := d.doOpen(ctx, src)
@ -157,7 +157,7 @@ func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database
return nil, err return nil, err
} }
return &database{log: d.log, db: db, src: src, drvr: d}, nil return &pool{log: d.log, db: db, src: src, drvr: d}, nil
} }
func (d *driveri) doOpen(ctx context.Context, src *source.Source) (*sql.DB, error) { func (d *driveri) doOpen(ctx context.Context, src *source.Source) (*sql.DB, error) {
@ -709,32 +709,32 @@ func (d *driveri) RecordMeta(ctx context.Context, colTypes []*sql.ColumnType) (
return recMeta, mungeFn, nil return recMeta, mungeFn, nil
} }
// database is the postgres implementation of driver.Database. // pool is the postgres implementation of driver.Pool.
type database struct { type pool struct {
log *slog.Logger log *slog.Logger
drvr *driveri drvr *driveri
db *sql.DB db *sql.DB
src *source.Source src *source.Source
} }
// DB implements driver.Database. // DB implements driver.Pool.
func (d *database) DB(context.Context) (*sql.DB, error) { func (p *pool) DB(context.Context) (*sql.DB, error) {
return d.db, nil return p.db, nil
} }
// SQLDriver implements driver.Database. // SQLDriver implements driver.Pool.
func (d *database) SQLDriver() driver.SQLDriver { func (p *pool) SQLDriver() driver.SQLDriver {
return d.drvr return p.drvr
} }
// Source implements driver.Database. // Source implements driver.Pool.
func (d *database) Source() *source.Source { func (p *pool) Source() *source.Source {
return d.src return p.src
} }
// TableMetadata implements driver.Database. // TableMetadata implements driver.Pool.
func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { func (p *pool) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) {
db, err := d.DB(ctx) db, err := p.DB(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -742,20 +742,20 @@ func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.T
return getTableMetadata(ctx, db, tblName) return getTableMetadata(ctx, db, tblName)
} }
// SourceMetadata implements driver.Database. // SourceMetadata implements driver.Pool.
func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { func (p *pool) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) {
db, err := d.DB(ctx) db, err := p.DB(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return getSourceMetadata(ctx, d.src, db, noSchema) return getSourceMetadata(ctx, p.src, db, noSchema)
} }
// Close implements driver.Database. // Close implements driver.Pool.
func (d *database) Close() error { func (p *pool) Close() error {
d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle) p.log.Debug(lgm.CloseDB, lga.Handle, p.src.Handle)
err := d.db.Close() err := p.db.Close()
if err != nil { if err != nil {
return errw(err) return errw(err)
} }

View File

@ -222,12 +222,12 @@ func TestAlternateSchema(t *testing.T) {
src2 := src.Clone() src2 := src.Clone()
src2.Handle += "2" src2.Handle += "2"
src2.Location += "?search_path=" + schemaName src2.Location += "?search_path=" + schemaName
dbase2 := th.Open(src2) pool2 := th.Open(src2)
md2, err := dbase2.SourceMetadata(ctx, false) md2, err := pool2.SourceMetadata(ctx, false)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, schemaName, md2.Schema) require.Equal(t, schemaName, md2.Schema)
tblMeta2, err := dbase2.TableMetadata(ctx, tblName) tblMeta2, err := pool2.TableMetadata(ctx, tblName)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, int64(wantRowCount), tblMeta2.RowCount) require.Equal(t, int64(wantRowCount), tblMeta2.RowCount)
} }
@ -279,10 +279,10 @@ func BenchmarkDatabase_SourceMetadata(b *testing.B) {
b.Run(handle, func(b *testing.B) { b.Run(handle, func(b *testing.B) {
th := testh.New(b) th := testh.New(b)
th.Log = lg.Discard() th.Log = lg.Discard()
dbase := th.Open(th.Source(handle)) pool := th.Open(th.Source(handle))
b.ResetTimer() b.ResetTimer()
md, err := dbase.SourceMetadata(th.Context, false) md, err := pool.SourceMetadata(th.Context, false)
require.NoError(b, err) require.NoError(b, err)
require.Equal(b, "sakila", md.Name) require.Equal(b, "sakila", md.Name)
}) })

View File

@ -204,7 +204,7 @@ func TestRecordMetadata(t *testing.T) {
t.Run(tc.tbl, func(t *testing.T) { t.Run(tc.tbl, func(t *testing.T) {
t.Parallel() t.Parallel()
th, _, drvr, dbase, db := testh.NewWith(t, sakila.SL3) th, _, drvr, pool, db := testh.NewWith(t, sakila.SL3)
query := fmt.Sprintf("SELECT %s FROM %s", strings.Join(tc.colNames, ", "), tc.tbl) query := fmt.Sprintf("SELECT %s FROM %s", strings.Join(tc.colNames, ", "), tc.tbl)
rows, err := db.QueryContext(th.Context, query) //nolint:rowserrcheck rows, err := db.QueryContext(th.Context, query) //nolint:rowserrcheck
@ -235,7 +235,7 @@ func TestRecordMetadata(t *testing.T) {
} }
// Now check our table metadata // Now check our table metadata
gotTblMeta, err := dbase.TableMetadata(th.Context, tc.tbl) gotTblMeta, err := pool.TableMetadata(th.Context, tc.tbl)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, tc.tbl, gotTblMeta.Name) require.Equal(t, tc.tbl, gotTblMeta.Name)
require.Equal(t, tc.rowCount, gotTblMeta.RowCount) require.Equal(t, tc.rowCount, gotTblMeta.RowCount)
@ -285,12 +285,12 @@ func TestAggregateFuncsQuery(t *testing.T) {
func BenchmarkDatabase_SourceMetadata(b *testing.B) { func BenchmarkDatabase_SourceMetadata(b *testing.B) {
const numTables = 1000 const numTables = 1000
th, src, drvr, dbase, db := testh.NewWith(b, testsrc.MiscDB) th, src, drvr, pool, db := testh.NewWith(b, testsrc.MiscDB)
tblNames := createTypeTestTbls(th, src, numTables, true) tblNames := createTypeTestTbls(th, src, numTables, true)
b.ResetTimer() b.ResetTimer()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
srcMeta, err := dbase.SourceMetadata(th.Context, false) srcMeta, err := pool.SourceMetadata(th.Context, false)
require.NoError(b, err) require.NoError(b, err)
require.True(b, len(srcMeta.Tables) > len(tblNames)) require.True(b, len(srcMeta.Tables) > len(tblNames))
} }

View File

@ -130,8 +130,8 @@ func (d *driveri) DriverMetadata() driver.Metadata {
} }
} }
// Open implements driver.DatabaseOpener. // Open implements driver.PoolOpener.
func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database, error) { func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Pool, error) {
lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src)
db, err := d.doOpen(ctx, src) db, err := d.doOpen(ctx, src)
@ -143,7 +143,7 @@ func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database
return nil, err return nil, err
} }
return &database{log: d.log, db: db, src: src, drvr: d}, nil return &pool{log: d.log, db: db, src: src, drvr: d}, nil
} }
func (d *driveri) doOpen(ctx context.Context, src *source.Source) (*sql.DB, error) { func (d *driveri) doOpen(ctx context.Context, src *source.Source) (*sql.DB, error) {
@ -896,8 +896,8 @@ func (d *driveri) getTableRecordMeta(ctx context.Context, db sqlz.DB, tblName st
return destCols, nil return destCols, nil
} }
// database implements driver.Database. // pool implements driver.Pool.
type database struct { type pool struct {
log *slog.Logger log *slog.Logger
db *sql.DB db *sql.DB
src *source.Source src *source.Source
@ -908,24 +908,24 @@ type database struct {
closed bool closed bool
} }
// DB implements driver.Database. // DB implements driver.Pool.
func (d *database) DB(context.Context) (*sql.DB, error) { func (p *pool) DB(context.Context) (*sql.DB, error) {
return d.db, nil return p.db, nil
} }
// SQLDriver implements driver.Database. // SQLDriver implements driver.Pool.
func (d *database) SQLDriver() driver.SQLDriver { func (p *pool) SQLDriver() driver.SQLDriver {
return d.drvr return p.drvr
} }
// Source implements driver.Database. // Source implements driver.Pool.
func (d *database) Source() *source.Source { func (p *pool) Source() *source.Source {
return d.src return p.src
} }
// TableMetadata implements driver.Database. // TableMetadata implements driver.Pool.
func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { func (p *pool) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) {
db, err := d.DB(ctx) db, err := p.DB(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -933,20 +933,20 @@ func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.T
return getTableMetadata(ctx, db, tblName) return getTableMetadata(ctx, db, tblName)
} }
// SourceMetadata implements driver.Database. // SourceMetadata implements driver.Pool.
func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { func (p *pool) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) {
// https://stackoverflow.com/questions/9646353/how-to-find-sqlite-database-file-version // https://stackoverflow.com/questions/9646353/how-to-find-sqlite-database-file-version
md := &source.Metadata{Handle: d.src.Handle, Driver: Type, DBDriver: dbDrvr} md := &source.Metadata{Handle: p.src.Handle, Driver: Type, DBDriver: dbDrvr}
dsn, err := PathFromLocation(d.src) dsn, err := PathFromLocation(p.src)
if err != nil { if err != nil {
return nil, err return nil, err
} }
const q = "SELECT sqlite_version(), (SELECT name FROM pragma_database_list ORDER BY seq limit 1);" const q = "SELECT sqlite_version(), (SELECT name FROM pragma_database_list ORDER BY seq limit 1);"
err = d.db.QueryRowContext(ctx, q).Scan(&md.DBVersion, &md.Schema) err = p.db.QueryRowContext(ctx, q).Scan(&md.DBVersion, &md.Schema)
if err != nil { if err != nil {
return nil, errw(err) return nil, errw(err)
} }
@ -961,9 +961,9 @@ func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.M
md.Size = fi.Size() md.Size = fi.Size()
md.Name = fi.Name() md.Name = fi.Name()
md.FQName = fi.Name() + "/" + md.Schema md.FQName = fi.Name() + "/" + md.Schema
md.Location = d.src.Location md.Location = p.src.Location
md.DBProperties, err = getDBProperties(ctx, d.db) md.DBProperties, err = getDBProperties(ctx, p.db)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -972,7 +972,7 @@ func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.M
return md, nil return md, nil
} }
md.Tables, err = getAllTableMetadata(ctx, d.db, md.Schema) md.Tables, err = getAllTableMetadata(ctx, p.db, md.Schema)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -988,19 +988,19 @@ func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.M
return md, nil return md, nil
} }
// Close implements driver.Database. // Close implements driver.Pool.
func (d *database) Close() error { func (p *pool) Close() error {
d.closeMu.Lock() p.closeMu.Lock()
defer d.closeMu.Unlock() defer p.closeMu.Unlock()
if d.closed { if p.closed {
d.log.Warn("SQLite DB already closed", lga.Src, d.src) p.log.Warn("SQLite DB already closed", lga.Src, p.src)
return nil return nil
} }
d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle) p.log.Debug(lgm.CloseDB, lga.Handle, p.src.Handle)
err := errw(d.db.Close()) err := errw(p.db.Close())
d.closed = true p.closed = true
return err return err
} }

View File

@ -162,8 +162,8 @@ func (d *driveri) Renderer() *render.Renderer {
return r return r
} }
// Open implements driver.DatabaseOpener. // Open implements driver.PoolOpener.
func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database, error) { func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Pool, error) {
lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src)
db, err := d.doOpen(ctx, src) db, err := d.doOpen(ctx, src)
@ -175,7 +175,7 @@ func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database
return nil, err return nil, err
} }
return &database{log: d.log, db: db, src: src, drvr: d}, nil return &pool{log: d.log, db: db, src: src, drvr: d}, nil
} }
func (d *driveri) doOpen(ctx context.Context, src *source.Source) (*sql.DB, error) { func (d *driveri) doOpen(ctx context.Context, src *source.Source) (*sql.DB, error) {
@ -638,33 +638,33 @@ func (d *driveri) getTableColsMeta(ctx context.Context, db sqlz.DB, tblName stri
return destCols, nil return destCols, nil
} }
// database implements driver.Database. // pool implements driver.Pool.
type database struct { type pool struct {
log *slog.Logger log *slog.Logger
drvr *driveri drvr *driveri
db *sql.DB db *sql.DB
src *source.Source src *source.Source
} }
var _ driver.Database = (*database)(nil) var _ driver.Pool = (*pool)(nil)
// DB implements driver.Database. // DB implements driver.Pool.
func (d *database) DB(context.Context) (*sql.DB, error) { func (d *pool) DB(context.Context) (*sql.DB, error) {
return d.db, nil return d.db, nil
} }
// SQLDriver implements driver.Database. // SQLDriver implements driver.Pool.
func (d *database) SQLDriver() driver.SQLDriver { func (d *pool) SQLDriver() driver.SQLDriver {
return d.drvr return d.drvr
} }
// Source implements driver.Database. // Source implements driver.Pool.
func (d *database) Source() *source.Source { func (d *pool) Source() *source.Source {
return d.src return d.src
} }
// TableMetadata implements driver.Database. // TableMetadata implements driver.Pool.
func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { func (d *pool) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) {
const query = `SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_TYPE const query = `SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_TYPE
FROM INFORMATION_SCHEMA.TABLES FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_NAME = @p1` WHERE TABLE_NAME = @p1`
@ -680,13 +680,13 @@ WHERE TABLE_NAME = @p1`
return getTableMetadata(ctx, d.db, catalog, schema, tblName, tblType) return getTableMetadata(ctx, d.db, catalog, schema, tblName, tblType)
} }
// SourceMetadata implements driver.Database. // SourceMetadata implements driver.Pool.
func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { func (d *pool) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) {
return getSourceMetadata(ctx, d.src, d.db, noSchema) return getSourceMetadata(ctx, d.src, d.db, noSchema)
} }
// Close implements driver.Database. // Close implements driver.Pool.
func (d *database) Close() error { func (d *pool) Close() error {
d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle) d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle)
return errw(d.db.Close()) return errw(d.db.Close())

View File

@ -24,15 +24,15 @@ import (
) )
// ImportFunc is a function that can import // ImportFunc is a function that can import
// data (as defined in def) to destDB. // data (as defined in def) to destPool.
type ImportFunc func(ctx context.Context, def *DriverDef, type ImportFunc func(ctx context.Context, def *DriverDef,
data io.Reader, destDB driver.Database) error data io.Reader, destPool driver.Pool) error
// Provider implements driver.Provider for a DriverDef. // Provider implements driver.Provider for a DriverDef.
type Provider struct { type Provider struct {
Log *slog.Logger Log *slog.Logger
DriverDef *DriverDef DriverDef *DriverDef
Scratcher driver.ScratchDatabaseOpener Scratcher driver.ScratchPoolOpener
Files *source.Files Files *source.Files
ImportFn ImportFunc ImportFn ImportFunc
} }
@ -43,7 +43,7 @@ func (p *Provider) DriverFor(typ source.DriverType) (driver.Driver, error) {
return nil, errz.Errorf("unsupported driver type {%s}", typ) return nil, errz.Errorf("unsupported driver type {%s}", typ)
} }
return &drvr{ return &driveri{
log: p.Log, log: p.Log,
typ: typ, typ: typ,
def: p.DriverDef, def: p.DriverDef,
@ -62,17 +62,17 @@ func (p *Provider) Detectors() []source.DriverDetectFunc {
} }
// Driver implements driver.Driver. // Driver implements driver.Driver.
type drvr struct { type driveri struct {
log *slog.Logger log *slog.Logger
typ source.DriverType typ source.DriverType
def *DriverDef def *DriverDef
files *source.Files files *source.Files
scratcher driver.ScratchDatabaseOpener scratcher driver.ScratchPoolOpener
importFn ImportFunc importFn ImportFunc
} }
// DriverMetadata implements driver.Driver. // DriverMetadata implements driver.Driver.
func (d *drvr) DriverMetadata() driver.Metadata { func (d *driveri) DriverMetadata() driver.Metadata {
return driver.Metadata{ return driver.Metadata{
Type: source.DriverType(d.def.Name), Type: source.DriverType(d.def.Name),
Description: d.def.Title, Description: d.def.Title,
@ -81,8 +81,8 @@ func (d *drvr) DriverMetadata() driver.Metadata {
} }
} }
// Open implements driver.DatabaseOpener. // Open implements driver.PoolOpener.
func (d *drvr) Open(ctx context.Context, src *source.Source) (driver.Database, error) { func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Pool, error) {
lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src)
clnup := cleanup.New() clnup := cleanup.New()
@ -106,16 +106,16 @@ func (d *drvr) Open(ctx context.Context, src *source.Source) (driver.Database, e
return nil, errz.Wrap(err, d.def.Name) return nil, errz.Wrap(err, d.def.Name)
} }
return &database{log: d.log, src: src, impl: scratchDB, clnup: clnup}, nil return &pool{log: d.log, src: src, impl: scratchDB, clnup: clnup}, nil
} }
// Truncate implements driver.Driver. // Truncate implements driver.Driver.
func (d *drvr) Truncate(_ context.Context, _ *source.Source, _ string, _ bool) (int64, error) { func (d *driveri) Truncate(_ context.Context, _ *source.Source, _ string, _ bool) (int64, error) {
return 0, errz.Errorf("truncate not supported for %s", d.DriverMetadata().Type) return 0, errz.Errorf("truncate not supported for %s", d.DriverMetadata().Type)
} }
// ValidateSource implements driver.Driver. // ValidateSource implements driver.Driver.
func (d *drvr) ValidateSource(src *source.Source) (*source.Source, error) { func (d *driveri) ValidateSource(src *source.Source) (*source.Source, error) {
d.log.Debug("Validating source", lga.Src, src) d.log.Debug("Validating source", lga.Src, src)
if string(src.Type) != d.def.Name { if string(src.Type) != d.def.Name {
return nil, errz.Errorf("expected driver type {%s} but got {%s}", d.def.Name, src.Type) return nil, errz.Errorf("expected driver type {%s} but got {%s}", d.def.Name, src.Type)
@ -124,7 +124,7 @@ func (d *drvr) ValidateSource(src *source.Source) (*source.Source, error) {
} }
// Ping implements driver.Driver. // Ping implements driver.Driver.
func (d *drvr) Ping(_ context.Context, src *source.Source) error { func (d *driveri) Ping(_ context.Context, src *source.Source) error {
d.log.Debug("Ping source", d.log.Debug("Ping source",
lga.Driver, d.typ, lga.Driver, d.typ,
lga.Src, src, lga.Src, src,
@ -141,39 +141,39 @@ func (d *drvr) Ping(_ context.Context, src *source.Source) error {
return r.Close() return r.Close()
} }
// database implements driver.Database. // pool implements driver.Pool.
type database struct { type pool struct {
log *slog.Logger log *slog.Logger
src *source.Source src *source.Source
impl driver.Database impl driver.Pool
// clnup will ultimately invoke impl.Close to dispose of // clnup will ultimately invoke impl.Close to dispose of
// the scratch DB. // the scratch DB.
clnup *cleanup.Cleanup clnup *cleanup.Cleanup
} }
// DB implements driver.Database. // DB implements driver.Pool.
func (d *database) DB(ctx context.Context) (*sql.DB, error) { func (d *pool) DB(ctx context.Context) (*sql.DB, error) {
return d.impl.DB(ctx) return d.impl.DB(ctx)
} }
// SQLDriver implements driver.Database. // SQLDriver implements driver.Pool.
func (d *database) SQLDriver() driver.SQLDriver { func (d *pool) SQLDriver() driver.SQLDriver {
return d.impl.SQLDriver() return d.impl.SQLDriver()
} }
// Source implements driver.Database. // Source implements driver.Pool.
func (d *database) Source() *source.Source { func (d *pool) Source() *source.Source {
return d.src return d.src
} }
// TableMetadata implements driver.Database. // TableMetadata implements driver.Pool.
func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { func (d *pool) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) {
return d.impl.TableMetadata(ctx, tblName) return d.impl.TableMetadata(ctx, tblName)
} }
// SourceMetadata implements driver.Database. // SourceMetadata implements driver.Pool.
func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { func (d *pool) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) {
meta, err := d.impl.SourceMetadata(ctx, noSchema) meta, err := d.impl.SourceMetadata(ctx, noSchema)
if err != nil { if err != nil {
return nil, err return nil, err
@ -190,8 +190,8 @@ func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.M
return meta, nil return meta, nil
} }
// Close implements driver.Database. // Close implements driver.Pool.
func (d *database) Close() error { func (d *pool) Close() error {
d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle) d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle)
// We don't need to explicitly invoke c.impl.Close // We don't need to explicitly invoke c.impl.Close

View File

@ -40,15 +40,15 @@ func TestDriver(t *testing.T) {
err := drvr.Ping(th.Context, src) err := drvr.Ping(th.Context, src)
require.NoError(t, err) require.NoError(t, err)
dbase, err := drvr.Open(th.Context, src) pool, err := drvr.Open(th.Context, src)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { assert.NoError(t, dbase.Close()) }) t.Cleanup(func() { assert.NoError(t, pool.Close()) })
srcMeta, err := dbase.SourceMetadata(th.Context, false) srcMeta, err := pool.SourceMetadata(th.Context, false)
require.NoError(t, err) require.NoError(t, err)
require.True(t, stringz.InSlice(srcMeta.TableNames(), tc.tbl)) require.True(t, stringz.InSlice(srcMeta.TableNames(), tc.tbl))
tblMeta, err := dbase.TableMetadata(th.Context, tc.tbl) tblMeta, err := pool.TableMetadata(th.Context, tc.tbl)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, tc.tbl, tblMeta.Name) require.Equal(t, tc.tbl, tblMeta.Name)

View File

@ -29,7 +29,7 @@ import (
const Genre = "xml" const Genre = "xml"
// Import implements userdriver.ImportFunc. // Import implements userdriver.ImportFunc.
func Import(ctx context.Context, def *userdriver.DriverDef, data io.Reader, destDB driver.Database) error { func Import(ctx context.Context, def *userdriver.DriverDef, data io.Reader, destPool driver.Pool) error {
if def.Genre != Genre { if def.Genre != Genre {
return errz.Errorf("xmlud.Import does not support genre {%s}", def.Genre) return errz.Errorf("xmlud.Import does not support genre {%s}", def.Genre)
} }
@ -47,7 +47,7 @@ func Import(ctx context.Context, def *userdriver.DriverDef, data io.Reader, dest
msgOnce: map[string]struct{}{}, msgOnce: map[string]struct{}{},
} }
err := im.execImport(ctx, data, destDB) err := im.execImport(ctx, data, destPool)
err2 := im.clnup.Run() err2 := im.clnup.Run()
if err != nil { if err != nil {
return errz.Wrap(err, "xml import") return errz.Wrap(err, "xml import")
@ -61,7 +61,7 @@ type importer struct {
log *slog.Logger log *slog.Logger
def *userdriver.DriverDef def *userdriver.DriverDef
data io.Reader data io.Reader
destDB driver.Database destPool driver.Pool
selStack *selStack selStack *selStack
rowStack *rowStack rowStack *rowStack
tblDefs map[string]*sqlmodel.TableDef tblDefs map[string]*sqlmodel.TableDef
@ -88,8 +88,8 @@ type importer struct {
msgOnce map[string]struct{} msgOnce map[string]struct{}
} }
func (im *importer) execImport(ctx context.Context, r io.Reader, destDB driver.Database) error { //nolint:gocognit func (im *importer) execImport(ctx context.Context, r io.Reader, destPool driver.Pool) error { //nolint:gocognit
im.data, im.destDB = r, destDB im.data, im.destPool = r, destPool
err := im.createTables(ctx) err := im.createTables(ctx)
if err != nil { if err != nil {
@ -431,13 +431,13 @@ func (im *importer) dbInsert(ctx context.Context, row *rowState) error {
execInsertFn, ok := im.execInsertFns[cacheKey] execInsertFn, ok := im.execInsertFns[cacheKey]
if !ok { if !ok {
db, err := im.destDB.DB(ctx) db, err := im.destPool.DB(ctx)
if err != nil { if err != nil {
return err return err
} }
// Nothing cached, prepare the insert statement and insert munge func // Nothing cached, prepare the insert statement and insert munge func
stmtExecer, err := im.destDB.SQLDriver().PrepareInsertStmt(ctx, db, tblName, colNames, 1) stmtExecer, err := im.destPool.SQLDriver().PrepareInsertStmt(ctx, db, tblName, colNames, 1)
if err != nil { if err != nil {
return err return err
} }
@ -471,7 +471,7 @@ func (im *importer) dbInsert(ctx context.Context, row *rowState) error {
// dbUpdate updates row's table with row's dirty values, using row's // dbUpdate updates row's table with row's dirty values, using row's
// primary key cols as the args to the WHERE clause. // primary key cols as the args to the WHERE clause.
func (im *importer) dbUpdate(ctx context.Context, row *rowState) error { func (im *importer) dbUpdate(ctx context.Context, row *rowState) error {
drvr := im.destDB.SQLDriver() drvr := im.destPool.SQLDriver()
tblName := row.tbl.Name tblName := row.tbl.Name
pkColNames := row.tbl.PrimaryKey pkColNames := row.tbl.PrimaryKey
@ -508,7 +508,7 @@ func (im *importer) dbUpdate(ctx context.Context, row *rowState) error {
cacheKey := "##update_func__" + tblName + "__" + strings.Join(colNames, ",") + whereClause cacheKey := "##update_func__" + tblName + "__" + strings.Join(colNames, ",") + whereClause
execUpdateFn, ok := im.execUpdateFns[cacheKey] execUpdateFn, ok := im.execUpdateFns[cacheKey]
if !ok { if !ok {
db, err := im.destDB.DB(ctx) db, err := im.destPool.DB(ctx)
if err != nil { if err != nil {
return err return err
} }
@ -578,16 +578,16 @@ func (im *importer) createTables(ctx context.Context) error {
im.tblDefs[tblDef.Name] = tblDef im.tblDefs[tblDef.Name] = tblDef
db, err := im.destDB.DB(ctx) db, err := im.destPool.DB(ctx)
if err != nil { if err != nil {
return err return err
} }
err = im.destDB.SQLDriver().CreateTable(ctx, db, tblDef) err = im.destPool.SQLDriver().CreateTable(ctx, db, tblDef)
if err != nil { if err != nil {
return err return err
} }
im.log.Debug("Created table", lga.Target, source.Target(im.destDB.Source(), tblDef.Name)) im.log.Debug("Created table", lga.Target, source.Target(im.destPool.Source(), tblDef.Name))
} }
return nil return nil

View File

@ -33,7 +33,7 @@ func TestImport_Ppl(t *testing.T) {
require.Equal(t, driverPpl, udDef.Name) require.Equal(t, driverPpl, udDef.Name)
require.Equal(t, xmlud.Genre, udDef.Genre) require.Equal(t, xmlud.Genre, udDef.Genre)
scratchDB, err := th.Databases().OpenScratch(th.Context, "ppl") scratchDB, err := th.Pools().OpenScratch(th.Context, "ppl")
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
assert.NoError(t, scratchDB.Close()) assert.NoError(t, scratchDB.Close())
@ -78,7 +78,7 @@ func TestImport_RSS(t *testing.T) {
require.Equal(t, driverRSS, udDef.Name) require.Equal(t, driverRSS, udDef.Name)
require.Equal(t, xmlud.Genre, udDef.Genre) require.Equal(t, xmlud.Genre, udDef.Genre)
scratchDB, err := th.Databases().OpenScratch(th.Context, "rss") scratchDB, err := th.Pools().OpenScratch(th.Context, "rss")
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
assert.NoError(t, scratchDB.Close()) assert.NoError(t, scratchDB.Close())

View File

@ -18,15 +18,15 @@ import (
"github.com/neilotoole/sq/libsq/source" "github.com/neilotoole/sq/libsq/source"
) )
// database implements driver.Database. It implements a deferred ingest // pool implements driver.Pool. It implements a deferred ingest
// of the Excel data. // of the Excel data.
type database struct { type pool struct {
// REVISIT: do we need database.log, or can we use lg.FromContext? // REVISIT: do we need pool.log, or can we use lg.FromContext?
log *slog.Logger log *slog.Logger
src *source.Source src *source.Source
files *source.Files files *source.Files
scratchDB driver.Database scratchPool driver.Pool
clnup *cleanup.Cleanup clnup *cleanup.Cleanup
mu sync.Mutex mu sync.Mutex
@ -40,28 +40,28 @@ type database struct {
} }
// checkIngest performs data ingestion if not already done. // checkIngest performs data ingestion if not already done.
func (d *database) checkIngest(ctx context.Context) error { func (p *pool) checkIngest(ctx context.Context) error {
d.ingestOnce.Do(func() { p.ingestOnce.Do(func() {
d.ingestErr = d.doIngest(ctx, d.ingestSheetNames) p.ingestErr = p.doIngest(ctx, p.ingestSheetNames)
}) })
return d.ingestErr return p.ingestErr
} }
// doIngest performs data ingest. It must only be invoked from checkIngest. // doIngest performs data ingest. It must only be invoked from checkIngest.
func (d *database) doIngest(ctx context.Context, includeSheetNames []string) error { func (p *pool) doIngest(ctx context.Context, includeSheetNames []string) error {
log := lg.FromContext(ctx) log := lg.FromContext(ctx)
// Because of the deferred ingest mechanism, we need to ensure that // Because of the deferred ingest mechanism, we need to ensure that
// the context being passed down the stack (in particular to ingestXLSX) // the context being passed down the stack (in particular to ingestXLSX)
// has the source's options on it. // has the source's options on it.
ctx = options.NewContext(ctx, options.Merge(options.FromContext(ctx), d.src.Options)) ctx = options.NewContext(ctx, options.Merge(options.FromContext(ctx), p.src.Options))
r, err := d.files.Open(d.src) r, err := p.files.Open(p.src)
if err != nil { if err != nil {
return err return err
} }
defer lg.WarnIfCloseError(d.log, lgm.CloseFileReader, r) defer lg.WarnIfCloseError(p.log, lgm.CloseFileReader, r)
xfile, err := excelize.OpenReader(r, excelize.Options{RawCellValue: false}) xfile, err := excelize.OpenReader(r, excelize.Options{RawCellValue: false})
if err != nil { if err != nil {
@ -70,83 +70,83 @@ func (d *database) doIngest(ctx context.Context, includeSheetNames []string) err
defer lg.WarnIfCloseError(log, lgm.CloseFileReader, xfile) defer lg.WarnIfCloseError(log, lgm.CloseFileReader, xfile)
err = ingestXLSX(ctx, d.src, d.scratchDB, xfile, includeSheetNames) err = ingestXLSX(ctx, p.src, p.scratchPool, xfile, includeSheetNames)
if err != nil { if err != nil {
lg.WarnIfError(d.log, lgm.CloseDB, d.clnup.Run()) lg.WarnIfError(p.log, lgm.CloseDB, p.clnup.Run())
return err return err
} }
return err return err
} }
// DB implements driver.Database. // DB implements driver.Pool.
func (d *database) DB(ctx context.Context) (*sql.DB, error) { func (p *pool) DB(ctx context.Context) (*sql.DB, error) {
d.mu.Lock() p.mu.Lock()
defer d.mu.Unlock() defer p.mu.Unlock()
if err := d.checkIngest(ctx); err != nil { if err := p.checkIngest(ctx); err != nil {
return nil, err return nil, err
} }
return d.scratchDB.DB(ctx) return p.scratchPool.DB(ctx)
} }
// SQLDriver implements driver.Database. // SQLDriver implements driver.Pool.
func (d *database) SQLDriver() driver.SQLDriver { func (p *pool) SQLDriver() driver.SQLDriver {
return d.scratchDB.SQLDriver() return p.scratchPool.SQLDriver()
} }
// Source implements driver.Database. // Source implements driver.Pool.
func (d *database) Source() *source.Source { func (p *pool) Source() *source.Source {
return d.src return p.src
} }
// SourceMetadata implements driver.Database. // SourceMetadata implements driver.Pool.
func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { func (p *pool) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) {
d.mu.Lock() p.mu.Lock()
defer d.mu.Unlock() defer p.mu.Unlock()
if err := d.checkIngest(ctx); err != nil { if err := p.checkIngest(ctx); err != nil {
return nil, err return nil, err
} }
md, err := d.scratchDB.SourceMetadata(ctx, noSchema) md, err := p.scratchPool.SourceMetadata(ctx, noSchema)
if err != nil { if err != nil {
return nil, err return nil, err
} }
md.Handle = d.src.Handle md.Handle = p.src.Handle
md.Driver = Type md.Driver = Type
md.Location = d.src.Location md.Location = p.src.Location
if md.Name, err = source.LocationFileName(d.src); err != nil { if md.Name, err = source.LocationFileName(p.src); err != nil {
return nil, err return nil, err
} }
md.FQName = md.Name md.FQName = md.Name
if md.Size, err = d.files.Size(d.src); err != nil { if md.Size, err = p.files.Size(p.src); err != nil {
return nil, err return nil, err
} }
return md, nil return md, nil
} }
// TableMetadata implements driver.Database. // TableMetadata implements driver.Pool.
func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { func (p *pool) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) {
d.mu.Lock() p.mu.Lock()
defer d.mu.Unlock() defer p.mu.Unlock()
d.ingestSheetNames = []string{tblName} p.ingestSheetNames = []string{tblName}
if err := d.checkIngest(ctx); err != nil { if err := p.checkIngest(ctx); err != nil {
return nil, err return nil, err
} }
return d.scratchDB.TableMetadata(ctx, tblName) return p.scratchPool.TableMetadata(ctx, tblName)
} }
// Close implements driver.Database. // Close implements driver.Pool.
func (d *database) Close() error { func (p *pool) Close() error {
d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle) p.log.Debug(lgm.CloseDB, lga.Handle, p.src.Handle)
// No need to explicitly invoke c.scratchDB.Close because // No need to explicitly invoke c.scratchPool.Close because
// that's already added to c.clnup // that's already added to c.clnup
return d.clnup.Run() return p.clnup.Run()
} }

View File

@ -92,16 +92,16 @@ func (xs *xSheet) loadSampleRows(ctx context.Context, sampleSize int) error {
return nil return nil
} }
// ingestXLSX loads the data in xfile into scratchDB. // ingestXLSX loads the data in xfile into scratchPool.
// If includeSheetNames is non-empty, only the named sheets are ingested. // If includeSheetNames is non-empty, only the named sheets are ingested.
func ingestXLSX(ctx context.Context, src *source.Source, scratchDB driver.Database, func ingestXLSX(ctx context.Context, src *source.Source, scratchPool driver.Pool,
xfile *excelize.File, includeSheetNames []string, xfile *excelize.File, includeSheetNames []string,
) error { ) error {
log := lg.FromContext(ctx) log := lg.FromContext(ctx)
start := time.Now() start := time.Now()
log.Debug("Beginning import from XLSX", log.Debug("Beginning import from XLSX",
lga.Src, src, lga.Src, src,
lga.Target, scratchDB.Source()) lga.Target, scratchPool.Source())
var sheets []*xSheet var sheets []*xSheet
if len(includeSheetNames) > 0 { if len(includeSheetNames) > 0 {
@ -132,18 +132,18 @@ func ingestXLSX(ctx context.Context, src *source.Source, scratchDB driver.Databa
} }
var db *sql.DB var db *sql.DB
if db, err = scratchDB.DB(ctx); err != nil { if db, err = scratchPool.DB(ctx); err != nil {
return err return err
} }
if err = scratchDB.SQLDriver().CreateTable(ctx, db, sheetTbl.def); err != nil { if err = scratchPool.SQLDriver().CreateTable(ctx, db, sheetTbl.def); err != nil {
return err return err
} }
} }
log.Debug("Tables created (but not yet populated)", log.Debug("Tables created (but not yet populated)",
lga.Count, len(sheetTbls), lga.Count, len(sheetTbls),
lga.Target, scratchDB.Source(), lga.Target, scratchPool.Source(),
lga.Elapsed, time.Since(start)) lga.Elapsed, time.Since(start))
var imported, skipped int var imported, skipped int
@ -154,7 +154,7 @@ func ingestXLSX(ctx context.Context, src *source.Source, scratchDB driver.Databa
continue continue
} }
if err = ingestSheetToTable(ctx, scratchDB, sheetTbls[i]); err != nil { if err = ingestSheetToTable(ctx, scratchPool, sheetTbls[i]); err != nil {
return err return err
} }
imported++ imported++
@ -164,7 +164,7 @@ func ingestXLSX(ctx context.Context, src *source.Source, scratchDB driver.Databa
lga.Count, imported, lga.Count, imported,
"skipped", skipped, "skipped", skipped,
lga.From, src, lga.From, src,
lga.To, scratchDB.Source(), lga.To, scratchPool.Source(),
lga.Elapsed, time.Since(start), lga.Elapsed, time.Since(start),
) )
@ -172,8 +172,8 @@ func ingestXLSX(ctx context.Context, src *source.Source, scratchDB driver.Databa
} }
// ingestSheetToTable imports the sheet data into the appropriate table // ingestSheetToTable imports the sheet data into the appropriate table
// in scratchDB. The scratch table must already exist. // in scratchPool. The scratch table must already exist.
func ingestSheetToTable(ctx context.Context, scratchDB driver.Database, sheetTbl *sheetTable) error { func ingestSheetToTable(ctx context.Context, scratchPool driver.Pool, sheetTbl *sheetTable) error {
var ( var (
log = lg.FromContext(ctx) log = lg.FromContext(ctx)
startTime = time.Now() startTime = time.Now()
@ -183,7 +183,7 @@ func ingestSheetToTable(ctx context.Context, scratchDB driver.Database, sheetTbl
destColKinds = tblDef.ColKinds() destColKinds = tblDef.ColKinds()
) )
db, err := scratchDB.DB(ctx) db, err := scratchPool.DB(ctx)
if err != nil { if err != nil {
return err return err
} }
@ -194,7 +194,7 @@ func ingestSheetToTable(ctx context.Context, scratchDB driver.Database, sheetTbl
} }
defer lg.WarnIfCloseError(log, lgm.CloseDB, conn) defer lg.WarnIfCloseError(log, lgm.CloseDB, conn)
drvr := scratchDB.SQLDriver() drvr := scratchPool.SQLDriver()
batchSize := driver.MaxBatchRows(drvr, len(destColKinds)) batchSize := driver.MaxBatchRows(drvr, len(destColKinds))
bi, err := driver.NewBatchInsert(ctx, drvr, conn, tblDef.Name, tblDef.ColNames(), batchSize) bi, err := driver.NewBatchInsert(ctx, drvr, conn, tblDef.Name, tblDef.ColNames(), batchSize)
@ -264,7 +264,7 @@ func ingestSheetToTable(ctx context.Context, scratchDB driver.Database, sheetTbl
log.Debug("Inserted rows from sheet into table", log.Debug("Inserted rows from sheet into table",
lga.Count, bi.Written(), lga.Count, bi.Written(),
laSheet, sheet.name, laSheet, sheet.name,
lga.Target, source.Target(scratchDB.Source(), tblDef.Name), lga.Target, source.Target(scratchPool.Source(), tblDef.Name),
lga.Elapsed, time.Since(startTime)) lga.Elapsed, time.Since(startTime))
return nil return nil

View File

@ -31,7 +31,7 @@ const (
type Provider struct { type Provider struct {
Log *slog.Logger Log *slog.Logger
Files *source.Files Files *source.Files
Scratcher driver.ScratchDatabaseOpener Scratcher driver.ScratchPoolOpener
} }
// DriverFor implements driver.Provider. // DriverFor implements driver.Provider.
@ -46,7 +46,7 @@ func (p *Provider) DriverFor(typ source.DriverType) (driver.Driver, error) {
// Driver implements driver.Driver. // Driver implements driver.Driver.
type Driver struct { type Driver struct {
log *slog.Logger log *slog.Logger
scratcher driver.ScratchDatabaseOpener scratcher driver.ScratchPoolOpener
files *source.Files files *source.Files
} }
@ -59,27 +59,27 @@ func (d *Driver) DriverMetadata() driver.Metadata {
} }
} }
// Open implements driver.DatabaseOpener. // Open implements driver.PoolOpener.
func (d *Driver) Open(ctx context.Context, src *source.Source) (driver.Database, error) { func (d *Driver) Open(ctx context.Context, src *source.Source) (driver.Pool, error) {
lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src)
scratchDB, err := d.scratcher.OpenScratch(ctx, src.Handle) scratchPool, err := d.scratcher.OpenScratch(ctx, src.Handle)
if err != nil { if err != nil {
return nil, err return nil, err
} }
clnup := cleanup.New() clnup := cleanup.New()
clnup.AddE(scratchDB.Close) clnup.AddE(scratchPool.Close)
dbase := &database{ p := &pool{
log: d.log, log: d.log,
src: src, src: src,
scratchDB: scratchDB, scratchPool: scratchPool,
files: d.files, files: d.files,
clnup: clnup, clnup: clnup,
} }
return dbase, nil return p, nil
} }
// Truncate implements driver.Driver. // Truncate implements driver.Driver.

View File

@ -161,9 +161,9 @@ func TestOpenFileFormats(t *testing.T) {
Location: filepath.Join("testdata", "file_formats", tc.filename), Location: filepath.Join("testdata", "file_formats", tc.filename),
}) })
dbase, err := th.Databases().Open(th.Context, src) pool, err := th.Pools().Open(th.Context, src)
require.NoError(t, err) require.NoError(t, err)
db, err := dbase.DB(th.Context) db, err := pool.DB(th.Context)
if tc.wantErr { if tc.wantErr {
require.Error(t, err) require.Error(t, err)
return return

View File

@ -26,7 +26,7 @@ import (
type DBWriter struct { type DBWriter struct {
wg *sync.WaitGroup wg *sync.WaitGroup
cancelFn context.CancelFunc cancelFn context.CancelFunc
destDB driver.Database destPool driver.Pool
destTbl string destTbl string
recordCh chan record.Record recordCh chan record.Record
bi *driver.BatchInsert bi *driver.BatchInsert
@ -42,17 +42,17 @@ type DBWriter struct {
// DBWriterPreWriteHook is a function that is invoked before DBWriter // DBWriterPreWriteHook is a function that is invoked before DBWriter
// begins writing. // begins writing.
type DBWriterPreWriteHook func(ctx context.Context, recMeta record.Meta, destDB driver.Database, tx sqlz.DB) error type DBWriterPreWriteHook func(ctx context.Context, recMeta record.Meta, destPool driver.Pool, tx sqlz.DB) error
// DBWriterCreateTableIfNotExistsHook returns a hook that // DBWriterCreateTableIfNotExistsHook returns a hook that
// creates destTblName if it does not exist. // creates destTblName if it does not exist.
func DBWriterCreateTableIfNotExistsHook(destTblName string) DBWriterPreWriteHook { func DBWriterCreateTableIfNotExistsHook(destTblName string) DBWriterPreWriteHook {
return func(ctx context.Context, recMeta record.Meta, destDB driver.Database, tx sqlz.DB) error { return func(ctx context.Context, recMeta record.Meta, destPool driver.Pool, tx sqlz.DB) error {
db, err := destDB.DB(ctx) db, err := destPool.DB(ctx)
if err != nil { if err != nil {
return err return err
} }
tblExists, err := destDB.SQLDriver().TableExists(ctx, db, destTblName) tblExists, err := destPool.SQLDriver().TableExists(ctx, db, destTblName)
if err != nil { if err != nil {
return errz.Err(err) return errz.Err(err)
} }
@ -65,9 +65,9 @@ func DBWriterCreateTableIfNotExistsHook(destTblName string) DBWriterPreWriteHook
destColKinds := recMeta.Kinds() destColKinds := recMeta.Kinds()
destTblDef := sqlmodel.NewTableDef(destTblName, destColNames, destColKinds) destTblDef := sqlmodel.NewTableDef(destTblName, destColNames, destColKinds)
err = destDB.SQLDriver().CreateTable(ctx, tx, destTblDef) err = destPool.SQLDriver().CreateTable(ctx, tx, destTblDef)
if err != nil { if err != nil {
return errz.Wrapf(err, "failed to create dest table %s.%s", destDB.Source().Handle, destTblName) return errz.Wrapf(err, "failed to create dest table %s.%s", destPool.Source().Handle, destTblName)
} }
return nil return nil
@ -76,13 +76,13 @@ func DBWriterCreateTableIfNotExistsHook(destTblName string) DBWriterPreWriteHook
// NewDBWriter returns a new writer than implements RecordWriter. // NewDBWriter returns a new writer than implements RecordWriter.
// The writer writes records from recordCh to destTbl // The writer writes records from recordCh to destTbl
// in destDB. The recChSize param controls the size of recordCh // in destPool. The recChSize param controls the size of recordCh
// returned by the writer's Open method. // returned by the writer's Open method.
func NewDBWriter(destDB driver.Database, destTbl string, recChSize int, func NewDBWriter(destPool driver.Pool, destTbl string, recChSize int,
preWriteHooks ...DBWriterPreWriteHook, preWriteHooks ...DBWriterPreWriteHook,
) *DBWriter { ) *DBWriter {
return &DBWriter{ return &DBWriter{
destDB: destDB, destPool: destPool,
destTbl: destTbl, destTbl: destTbl,
recordCh: make(chan record.Record, recChSize), recordCh: make(chan record.Record, recChSize),
errCh: make(chan error, 3), errCh: make(chan error, 3),
@ -102,7 +102,7 @@ func (w *DBWriter) Open(ctx context.Context, cancelFn context.CancelFunc, recMet
) { ) {
w.cancelFn = cancelFn w.cancelFn = cancelFn
db, err := w.destDB.DB(ctx) db, err := w.destPool.DB(ctx)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -110,19 +110,19 @@ func (w *DBWriter) Open(ctx context.Context, cancelFn context.CancelFunc, recMet
// REVISIT: tx could potentially be passed to NewDBWriter? // REVISIT: tx could potentially be passed to NewDBWriter?
tx, err := db.BeginTx(ctx, nil) tx, err := db.BeginTx(ctx, nil)
if err != nil { if err != nil {
return nil, nil, errz.Wrapf(err, "failed to open tx for %s.%s", w.destDB.Source().Handle, w.destTbl) return nil, nil, errz.Wrapf(err, "failed to open tx for %s.%s", w.destPool.Source().Handle, w.destTbl)
} }
for _, hook := range w.preWriteHooks { for _, hook := range w.preWriteHooks {
err = hook(ctx, recMeta, w.destDB, tx) err = hook(ctx, recMeta, w.destPool, tx)
if err != nil { if err != nil {
w.rollback(ctx, tx, err) w.rollback(ctx, tx, err)
return nil, nil, err return nil, nil, err
} }
} }
batchSize := driver.MaxBatchRows(w.destDB.SQLDriver(), len(recMeta.Names())) batchSize := driver.MaxBatchRows(w.destPool.SQLDriver(), len(recMeta.Names()))
w.bi, err = driver.NewBatchInsert(ctx, w.destDB.SQLDriver(), tx, w.destTbl, recMeta.Names(), batchSize) w.bi, err = driver.NewBatchInsert(ctx, w.destPool.SQLDriver(), tx, w.destTbl, recMeta.Names(), batchSize)
if err != nil { if err != nil {
w.rollback(ctx, tx, err) w.rollback(ctx, tx, err)
return nil, nil, err return nil, nil, err
@ -169,7 +169,7 @@ func (w *DBWriter) Open(ctx context.Context, cancelFn context.CancelFunc, recMet
w.addErrs(commitErr) w.addErrs(commitErr)
} else { } else {
lg.FromContext(ctx).Debug("Tx commit success", lg.FromContext(ctx).Debug("Tx commit success",
lga.Target, source.Target(w.destDB.Source(), w.destTbl)) lga.Target, source.Target(w.destPool.Source(), w.destTbl))
} }
return return
@ -224,7 +224,7 @@ func (w *DBWriter) addErrs(errs ...error) {
func (w *DBWriter) rollback(ctx context.Context, tx *sql.Tx, causeErrs ...error) { func (w *DBWriter) rollback(ctx context.Context, tx *sql.Tx, causeErrs ...error) {
// Guaranteed to be at least one causeErr // Guaranteed to be at least one causeErr
lg.FromContext(ctx).Error("failed to insert data: tx will rollback", lg.FromContext(ctx).Error("failed to insert data: tx will rollback",
lga.Target, w.destDB.Source().Handle+"."+w.destTbl, lga.Target, w.destPool.Source().Handle+"."+w.destTbl,
lga.Err, causeErrs[0]) lga.Err, causeErrs[0])
rollbackErr := errz.Err(tx.Rollback()) rollbackErr := errz.Err(tx.Rollback())

View File

@ -166,31 +166,31 @@ type Provider interface {
DriverFor(typ source.DriverType) (Driver, error) DriverFor(typ source.DriverType) (Driver, error)
} }
// DatabaseOpener opens a Database. // PoolOpener opens a Pool.
type DatabaseOpener interface { type PoolOpener interface {
// Open returns a Database instance for src. // Open returns a Pool instance for src.
Open(ctx context.Context, src *source.Source) (Database, error) Open(ctx context.Context, src *source.Source) (Pool, error)
} }
// JoinDatabaseOpener can open a join database. // JoinPoolOpener can open a join database.
type JoinDatabaseOpener interface { type JoinPoolOpener interface {
// OpenJoin opens an appropriate Database for use as // OpenJoin opens an appropriate Pool for use as
// a work DB for joining across sources. // a work DB for joining across sources.
OpenJoin(ctx context.Context, srcs ...*source.Source) (Database, error) OpenJoin(ctx context.Context, srcs ...*source.Source) (Pool, error)
} }
// ScratchDatabaseOpener opens a scratch database. A scratch database is // ScratchPoolOpener opens a scratch database pool. A scratch database is
// typically a short-lived database used as a target for loading // typically a short-lived database used as a target for loading
// non-SQL data (such as CSV). // non-SQL data (such as CSV).
type ScratchDatabaseOpener interface { type ScratchPoolOpener interface {
// OpenScratch returns a database for scratch use. // OpenScratch returns a pool for scratch use.
OpenScratch(ctx context.Context, name string) (Database, error) OpenScratch(ctx context.Context, name string) (Pool, error)
} }
// Driver is the core interface that must be implemented for each type // Driver is the core interface that must be implemented for each type
// of data source. // of data source.
type Driver interface { type Driver interface {
DatabaseOpener PoolOpener
// DriverMetadata returns driver metadata. // DriverMetadata returns driver metadata.
DriverMetadata() Metadata DriverMetadata() Metadata
@ -341,13 +341,12 @@ type SQLDriver interface {
DBProperties(ctx context.Context, db sqlz.DB) (map[string]any, error) DBProperties(ctx context.Context, db sqlz.DB) (map[string]any, error)
} }
// Database models a database handle. It is conceptually equivalent to // Pool models a database handle representing a pool of underlying
// connections. It is conceptually equivalent to
// stdlib sql.DB, and in fact encapsulates a sql.DB instance. The // stdlib sql.DB, and in fact encapsulates a sql.DB instance. The
// realized sql.DB instance can be accessed via the DB method. // realized sql.DB instance can be accessed via the DB method.
// type Pool interface {
// REVISIT: maybe rename driver.Database to driver.Pool or such? // DB returns the sql.DB object for this Pool.
type Database interface {
// DB returns the sql.DB object for this Database.
// This operation can take a long time if opening the DB requires // This operation can take a long time if opening the DB requires
// an ingest of data. // an ingest of data.
// For example, with file-based sources such as XLSX, invoking Open // For example, with file-based sources such as XLSX, invoking Open
@ -366,13 +365,13 @@ type Database interface {
// If noSchema is true, schema details are not populated // If noSchema is true, schema details are not populated
// on the returned source.Metadata. // on the returned source.Metadata.
// //
// TODO: SourceMetadata doesn't really belong on driver.Database? It // TODO: SourceMetadata doesn't really belong on driver.Pool? It
// should be moved to driver.Driver? // should be moved to driver.Driver?
SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error)
// TableMetadata returns metadata for the specified table in the data source. // TableMetadata returns metadata for the specified table in the data source.
// //
// TODO: TableMetadata doesn't really belong on driver.Database? It // TODO: TableMetadata doesn't really belong on driver.Pool? It
// should be moved to driver.Driver? // should be moved to driver.Driver?
TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error)
@ -411,45 +410,45 @@ type Metadata struct {
} }
var ( var (
_ DatabaseOpener = (*Databases)(nil) _ PoolOpener = (*Pools)(nil)
_ JoinDatabaseOpener = (*Databases)(nil) _ JoinPoolOpener = (*Pools)(nil)
) )
// Databases provides a mechanism for getting Database instances. // Pools provides a mechanism for getting Pool instances.
// Note that at this time instances returned by Open are cached // Note that at this time instances returned by Open are cached
// and then closed by Close. This may be a bad approach. // and then closed by Close. This may be a bad approach.
type Databases struct { type Pools struct {
log *slog.Logger log *slog.Logger
drvrs Provider drvrs Provider
mu sync.Mutex mu sync.Mutex
scratchSrcFn ScratchSrcFunc scratchSrcFn ScratchSrcFunc
dbases map[string]Database pools map[string]Pool
clnup *cleanup.Cleanup clnup *cleanup.Cleanup
} }
// NewDatabases returns a Databases instances. // NewPools returns a Pools instances.
func NewDatabases(log *slog.Logger, drvrs Provider, scratchSrcFn ScratchSrcFunc) *Databases { func NewPools(log *slog.Logger, drvrs Provider, scratchSrcFn ScratchSrcFunc) *Pools {
return &Databases{ return &Pools{
log: log, log: log,
drvrs: drvrs, drvrs: drvrs,
mu: sync.Mutex{}, mu: sync.Mutex{},
scratchSrcFn: scratchSrcFn, scratchSrcFn: scratchSrcFn,
dbases: map[string]Database{}, pools: map[string]Pool{},
clnup: cleanup.New(), clnup: cleanup.New(),
} }
} }
// Open returns an opened Database for src. The returned Database // Open returns an opened Pool for src. The returned Pool
// may be cached and returned on future invocations for the // may be cached and returned on future invocations for the
// same source (where each source fields is identical). // same source (where each source fields is identical).
// Thus, the caller should typically not close // Thus, the caller should typically not close
// the Database: it will be closed via d.Close. // the Pool: it will be closed via d.Close.
// //
// NOTE: This entire logic re caching/not-closing is a bit sketchy, // NOTE: This entire logic re caching/not-closing is a bit sketchy,
// and needs to be revisited. // and needs to be revisited.
// //
// Open implements DatabaseOpener. // Open implements PoolOpener.
func (d *Databases) Open(ctx context.Context, src *source.Source) (Database, error) { func (d *Pools) Open(ctx context.Context, src *source.Source) (Pool, error) {
lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src)
d.mu.Lock() d.mu.Lock()
@ -457,9 +456,9 @@ func (d *Databases) Open(ctx context.Context, src *source.Source) (Database, err
key := src.Handle + "_" + hashSource(src) key := src.Handle + "_" + hashSource(src)
dbase, ok := d.dbases[key] pool, ok := d.pools[key]
if ok { if ok {
return dbase, nil return pool, nil
} }
drvr, err := d.drvrs.DriverFor(src.Type) drvr, err := d.drvrs.DriverFor(src.Type)
@ -471,23 +470,23 @@ func (d *Databases) Open(ctx context.Context, src *source.Source) (Database, err
o := options.Merge(baseOptions, src.Options) o := options.Merge(baseOptions, src.Options)
ctx = options.NewContext(ctx, o) ctx = options.NewContext(ctx, o)
dbase, err = drvr.Open(ctx, src) pool, err = drvr.Open(ctx, src)
if err != nil { if err != nil {
return nil, err return nil, err
} }
d.clnup.AddC(dbase) d.clnup.AddC(pool)
d.dbases[key] = dbase d.pools[key] = pool
return dbase, nil return pool, nil
} }
// OpenScratch returns a scratch database instance. It is not // OpenScratch returns a scratch database instance. It is not
// necessary for the caller to close the returned Database as // necessary for the caller to close the returned Pool as
// its Close method will be invoked by d.Close. // its Close method will be invoked by d.Close.
// //
// OpenScratch implements ScratchDatabaseOpener. // OpenScratch implements ScratchPoolOpener.
func (d *Databases) OpenScratch(ctx context.Context, name string) (Database, error) { func (d *Pools) OpenScratch(ctx context.Context, name string) (Pool, error) {
const msgCloseScratch = "close scratch db" const msgCloseScratch = "close scratch db"
scratchSrc, cleanFn, err := d.scratchSrcFn(ctx, name) scratchSrc, cleanFn, err := d.scratchSrcFn(ctx, name)
@ -509,15 +508,15 @@ func (d *Databases) OpenScratch(ctx context.Context, name string) (Database, err
return nil, errz.Errorf("driver for scratch source %s is not a SQLDriver but is %T", scratchSrc.Handle, drvr) return nil, errz.Errorf("driver for scratch source %s is not a SQLDriver but is %T", scratchSrc.Handle, drvr)
} }
var backingDB Database var backingPool Pool
backingDB, err = sqlDrvr.Open(ctx, scratchSrc) backingPool, err = sqlDrvr.Open(ctx, scratchSrc)
if err != nil { if err != nil {
lg.WarnIfFuncError(d.log, msgCloseScratch, cleanFn) lg.WarnIfFuncError(d.log, msgCloseScratch, cleanFn)
return nil, err return nil, err
} }
d.clnup.AddE(cleanFn) d.clnup.AddE(cleanFn)
return backingDB, nil return backingPool, nil
} }
// OpenJoin opens an appropriate database for use as // OpenJoin opens an appropriate database for use as
@ -530,8 +529,8 @@ func (d *Databases) OpenScratch(ctx context.Context, name string) (Database, err
// the join etc.). Currently the implementation simply delegates // the join etc.). Currently the implementation simply delegates
// to OpenScratch. // to OpenScratch.
// //
// OpenJoin implements JoinDatabaseOpener. // OpenJoin implements JoinPoolOpener.
func (d *Databases) OpenJoin(ctx context.Context, srcs ...*source.Source) (Database, error) { func (d *Pools) OpenJoin(ctx context.Context, srcs ...*source.Source) (Pool, error) {
var names []string var names []string
for _, src := range srcs { for _, src := range srcs {
names = append(names, src.Handle[1:]) names = append(names, src.Handle[1:])
@ -542,7 +541,7 @@ func (d *Databases) OpenJoin(ctx context.Context, srcs ...*source.Source) (Datab
} }
// Close closes d, invoking Close on any instances opened via d.Open. // Close closes d, invoking Close on any instances opened via d.Open.
func (d *Databases) Close() error { func (d *Pools) Close() error {
d.log.Debug("Closing databases(s)...", lga.Count, d.clnup.Len()) d.log.Debug("Closing databases(s)...", lga.Count, d.clnup.Len())
return d.clnup.Run() return d.clnup.Run()
} }

View File

@ -272,12 +272,12 @@ func TestDriver_Open(t *testing.T) {
th := testh.New(t) th := testh.New(t)
src := th.Source(handle) src := th.Source(handle)
drvr := th.DriverFor(src) drvr := th.DriverFor(src)
dbase, err := drvr.Open(th.Context, src) pool, err := drvr.Open(th.Context, src)
require.NoError(t, err) require.NoError(t, err)
db, err := dbase.DB(th.Context) db, err := pool.DB(th.Context)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, db.PingContext(th.Context)) require.NoError(t, db.PingContext(th.Context))
require.NoError(t, dbase.Close()) require.NoError(t, pool.Close())
}) })
} }
} }
@ -441,9 +441,9 @@ func TestDatabase_TableMetadata(t *testing.T) { //nolint:tparallel
t.Run(handle, func(t *testing.T) { t.Run(handle, func(t *testing.T) {
t.Parallel() t.Parallel()
th, _, _, dbase, _ := testh.NewWith(t, handle) th, _, _, pool, _ := testh.NewWith(t, handle)
tblMeta, err := dbase.TableMetadata(th.Context, sakila.TblActor) tblMeta, err := pool.TableMetadata(th.Context, sakila.TblActor)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, sakila.TblActor, tblMeta.Name) require.Equal(t, sakila.TblActor, tblMeta.Name)
require.Equal(t, int64(sakila.TblActorCount), tblMeta.RowCount) require.Equal(t, int64(sakila.TblActorCount), tblMeta.RowCount)
@ -460,9 +460,9 @@ func TestDatabase_SourceMetadata(t *testing.T) {
t.Run(handle, func(t *testing.T) { t.Run(handle, func(t *testing.T) {
t.Parallel() t.Parallel()
th, _, _, dbase, _ := testh.NewWith(t, handle) th, _, _, pool, _ := testh.NewWith(t, handle)
md, err := dbase.SourceMetadata(th.Context, false) md, err := pool.SourceMetadata(th.Context, false)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, sakila.TblActor, md.Tables[0].Name) require.Equal(t, sakila.TblActor, md.Tables[0].Name)
require.Equal(t, int64(sakila.TblActorCount), md.Tables[0].RowCount) require.Equal(t, int64(sakila.TblActorCount), md.Tables[0].RowCount)
@ -482,11 +482,11 @@ func TestDatabase_SourceMetadata_concurrent(t *testing.T) { //nolint:tparallel
t.Run(handle, func(t *testing.T) { t.Run(handle, func(t *testing.T) {
t.Parallel() t.Parallel()
th, _, _, dbase, _ := testh.NewWith(t, handle) th, _, _, pool, _ := testh.NewWith(t, handle)
g, gCtx := errgroup.WithContext(th.Context) g, gCtx := errgroup.WithContext(th.Context)
for i := 0; i < concurrency; i++ { for i := 0; i < concurrency; i++ {
g.Go(func() error { g.Go(func() error {
md, err := dbase.SourceMetadata(gCtx, false) md, err := pool.SourceMetadata(gCtx, false)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, md) require.NotNil(t, md)
gotTbl := md.Table(sakila.TblActor) gotTbl := md.Table(sakila.TblActor)
@ -539,7 +539,7 @@ func TestSQLDriver_AlterTableRename(t *testing.T) {
handle := handle handle := handle
t.Run(handle, func(t *testing.T) { t.Run(handle, func(t *testing.T) {
th, src, drvr, dbase, db := testh.NewWith(t, handle) th, src, drvr, pool, db := testh.NewWith(t, handle)
// Make a copy of the table to play with // Make a copy of the table to play with
tbl := th.CopyTable(true, src, tablefq.From(sakila.TblActor), tablefq.T{}, true) tbl := th.CopyTable(true, src, tablefq.From(sakila.TblActor), tablefq.T{}, true)
@ -550,7 +550,7 @@ func TestSQLDriver_AlterTableRename(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer th.DropTable(src, tablefq.From(newName)) defer th.DropTable(src, tablefq.From(newName))
md, err := dbase.TableMetadata(th.Context, newName) md, err := pool.TableMetadata(th.Context, newName)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, newName, md.Name) require.Equal(t, newName, md.Name)
sink, err := th.QuerySQL(src, nil, "SELECT * FROM "+newName) sink, err := th.QuerySQL(src, nil, "SELECT * FROM "+newName)
@ -567,7 +567,7 @@ func TestSQLDriver_AlterTableRenameColumn(t *testing.T) {
handle := handle handle := handle
t.Run(handle, func(t *testing.T) { t.Run(handle, func(t *testing.T) {
th, src, drvr, dbase, db := testh.NewWith(t, handle) th, src, drvr, pool, db := testh.NewWith(t, handle)
// Make a copy of the table to play with // Make a copy of the table to play with
tbl := th.CopyTable(true, src, tablefq.From(sakila.TblActor), tablefq.T{}, true) tbl := th.CopyTable(true, src, tablefq.From(sakila.TblActor), tablefq.T{}, true)
@ -576,7 +576,7 @@ func TestSQLDriver_AlterTableRenameColumn(t *testing.T) {
err := drvr.AlterTableRenameColumn(th.Context, db, tbl, "first_name", newName) err := drvr.AlterTableRenameColumn(th.Context, db, tbl, "first_name", newName)
require.NoError(t, err) require.NoError(t, err)
md, err := dbase.TableMetadata(th.Context, tbl) md, err := pool.TableMetadata(th.Context, tbl)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, md.Column(newName)) require.NotNil(t, md.Column(newName))
sink, err := th.QuerySQL(src, nil, fmt.Sprintf("SELECT %s FROM %s", newName, tbl)) sink, err := th.QuerySQL(src, nil, fmt.Sprintf("SELECT %s FROM %s", newName, tbl))
@ -623,13 +623,13 @@ func TestSQLDriver_CurrentSchema(t *testing.T) {
tc := tc tc := tc
t.Run(tc.handle, func(t *testing.T) { t.Run(tc.handle, func(t *testing.T) {
th, _, drvr, dbase, db := testh.NewWith(t, tc.handle) th, _, drvr, pool, db := testh.NewWith(t, tc.handle)
gotSchema, err := drvr.CurrentSchema(th.Context, db) gotSchema, err := drvr.CurrentSchema(th.Context, db)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, tc.want, gotSchema) require.Equal(t, tc.want, gotSchema)
md, err := dbase.SourceMetadata(th.Context, false) md, err := pool.SourceMetadata(th.Context, false)
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, md) require.NotNil(t, md)
require.Equal(t, md.Schema, gotSchema) require.Equal(t, md.Schema, gotSchema)

View File

@ -30,14 +30,14 @@ type QueryContext struct {
// Collection is the set of sources. // Collection is the set of sources.
Collection *source.Collection Collection *source.Collection
// DBOpener is used to open databases. // PoolOpener is used to open databases.
DBOpener driver.DatabaseOpener PoolOpener driver.PoolOpener
// JoinDBOpener is used to open the joindb. // JoinPoolOpener is used to open the joindb.
JoinDBOpener driver.JoinDatabaseOpener JoinPoolOpener driver.JoinPoolOpener
// ScratchDBOpener is used to open the scratchdb. // ScratchPoolOpener is used to open the scratchdb.
ScratchDBOpener driver.ScratchDatabaseOpener ScratchPoolOpener driver.ScratchPoolOpener
// Args defines variables that are substituted into the query. // Args defines variables that are substituted into the query.
// May be nil or empty. // May be nil or empty.
@ -118,26 +118,26 @@ func SLQ2SQL(ctx context.Context, qc *QueryContext, query string) (targetSQL str
// QuerySQL executes the SQL query, writing the results to recw. If db is // QuerySQL executes the SQL query, writing the results to recw. If db is
// non-nil, the query is executed against it. Otherwise, the connection is // non-nil, the query is executed against it. Otherwise, the connection is
// obtained from dbase. // obtained from pool.
// Note that QuerySQL may return before recw has finished writing, thus the // Note that QuerySQL may return before recw has finished writing, thus the
// caller may wish to wait for recw to complete. // caller may wish to wait for recw to complete.
// The caller is responsible for closing dbase (and db, if non-nil). // The caller is responsible for closing pool (and db, if non-nil).
func QuerySQL(ctx context.Context, dbase driver.Database, db sqlz.DB, func QuerySQL(ctx context.Context, pool driver.Pool, db sqlz.DB,
recw RecordWriter, query string, args ...any, recw RecordWriter, query string, args ...any,
) error { ) error {
log := lg.FromContext(ctx) log := lg.FromContext(ctx)
errw := dbase.SQLDriver().ErrWrapFunc() errw := pool.SQLDriver().ErrWrapFunc()
if db == nil { if db == nil {
var err error var err error
if db, err = dbase.DB(ctx); err != nil { if db, err = pool.DB(ctx); err != nil {
return err return err
} }
} }
rows, err := db.QueryContext(ctx, query, args...) rows, err := db.QueryContext(ctx, query, args...)
if err != nil { if err != nil {
return errz.Wrapf(errw(err), `SQL query against %s failed: %s`, dbase.Source().Handle, query) return errz.Wrapf(errw(err), `SQL query against %s failed: %s`, pool.Source().Handle, query)
} }
defer lg.WarnIfCloseError(log, lgm.CloseDBRows, rows) defer lg.WarnIfCloseError(log, lgm.CloseDBRows, rows)
@ -183,7 +183,7 @@ func QuerySQL(ctx context.Context, dbase driver.Database, db sqlz.DB,
} }
} }
drvr := dbase.SQLDriver() drvr := pool.SQLDriver()
recMeta, recFromScanRowFn, err := drvr.RecordMeta(ctx, colTypes) recMeta, recFromScanRowFn, err := drvr.RecordMeta(ctx, colTypes)
if err != nil { if err != nil {
return errw(err) return errw(err)
@ -213,7 +213,7 @@ func QuerySQL(ctx context.Context, dbase driver.Database, db sqlz.DB,
err = rows.Scan(scanRow...) err = rows.Scan(scanRow...)
if err != nil { if err != nil {
cancelFn() cancelFn()
return errz.Wrapf(errw(err), "query against %s", dbase.Source().Handle) return errz.Wrapf(errw(err), "query against %s", pool.Source().Handle)
} }
// recFromScanRowFn returns a new Record with appropriate // recFromScanRowFn returns a new Record with appropriate

View File

@ -44,16 +44,16 @@ type pipeline struct {
rc *render.Context rc *render.Context
// tasks contains tasks that must be completed before targetSQL // tasks contains tasks that must be completed before targetSQL
// is executed against targetDB. Typically tasks is used to // is executed against targetPool. Typically tasks is used to
// set up the joindb before it is queried. // set up the joindb before it is queried.
tasks []tasker tasks []tasker
// targetSQL is the ultimate SQL query to be executed against targetDB. // targetSQL is the ultimate SQL query to be executed against targetPool.
targetSQL string targetSQL string
// targetDB is the destination for the ultimate SQL query to // targetPool is the destination for the ultimate SQL query to
// be executed against. // be executed against.
targetDB driver.Database targetPool driver.Pool
} }
// newPipeline parses query, returning a pipeline prepared for // newPipeline parses query, returning a pipeline prepared for
@ -87,7 +87,7 @@ func newPipeline(ctx context.Context, qc *QueryContext, query string) (*pipeline
func (p *pipeline) execute(ctx context.Context, recw RecordWriter) error { func (p *pipeline) execute(ctx context.Context, recw RecordWriter) error {
lg.FromContext(ctx).Debug( lg.FromContext(ctx).Debug(
"Execute SQL query", "Execute SQL query",
lga.Src, p.targetDB.Source(), lga.Src, p.targetPool.Source(),
lga.SQL, p.targetSQL, lga.SQL, p.targetSQL,
) )
@ -95,7 +95,7 @@ func (p *pipeline) execute(ctx context.Context, recw RecordWriter) error {
return err return err
} }
return QuerySQL(ctx, p.targetDB, nil, recw, p.targetSQL) return QuerySQL(ctx, p.targetPool, nil, recw, p.targetSQL)
} }
// executeTasks executes any tasks in pipeline.tasks. // executeTasks executes any tasks in pipeline.tasks.
@ -146,15 +146,15 @@ func (p *pipeline) prepareNoTable(ctx context.Context, qm *queryModel) error {
if handle == "" { if handle == "" {
if src = p.qc.Collection.Active(); src == nil { if src = p.qc.Collection.Active(); src == nil {
log.Debug("No active source, will use scratchdb.") log.Debug("No active source, will use scratchdb.")
p.targetDB, err = p.qc.ScratchDBOpener.OpenScratch(ctx, "scratch") p.targetPool, err = p.qc.ScratchPoolOpener.OpenScratch(ctx, "scratch")
if err != nil { if err != nil {
return err return err
} }
p.rc = &render.Context{ p.rc = &render.Context{
Renderer: p.targetDB.SQLDriver().Renderer(), Renderer: p.targetPool.SQLDriver().Renderer(),
Args: p.qc.Args, Args: p.qc.Args,
Dialect: p.targetDB.SQLDriver().Dialect(), Dialect: p.targetPool.SQLDriver().Dialect(),
} }
return nil return nil
} }
@ -165,14 +165,14 @@ func (p *pipeline) prepareNoTable(ctx context.Context, qm *queryModel) error {
} }
// At this point, src is non-nil. // At this point, src is non-nil.
if p.targetDB, err = p.qc.DBOpener.Open(ctx, src); err != nil { if p.targetPool, err = p.qc.PoolOpener.Open(ctx, src); err != nil {
return err return err
} }
p.rc = &render.Context{ p.rc = &render.Context{
Renderer: p.targetDB.SQLDriver().Renderer(), Renderer: p.targetPool.SQLDriver().Renderer(),
Args: p.qc.Args, Args: p.qc.Args,
Dialect: p.targetDB.SQLDriver().Dialect(), Dialect: p.targetPool.SQLDriver().Dialect(),
} }
return nil return nil
@ -182,7 +182,7 @@ func (p *pipeline) prepareNoTable(ctx context.Context, qm *queryModel) error {
// //
// When this function returns, pipeline.rc will be set. // When this function returns, pipeline.rc will be set.
func (p *pipeline) prepareFromTable(ctx context.Context, tblSel *ast.TblSelectorNode) (fromClause string, func (p *pipeline) prepareFromTable(ctx context.Context, tblSel *ast.TblSelectorNode) (fromClause string,
fromConn driver.Database, err error, fromPool driver.Pool, err error,
) { ) {
handle := tblSel.Handle() handle := tblSel.Handle()
if handle == "" { if handle == "" {
@ -197,16 +197,16 @@ func (p *pipeline) prepareFromTable(ctx context.Context, tblSel *ast.TblSelector
return "", nil, err return "", nil, err
} }
fromConn, err = p.qc.DBOpener.Open(ctx, src) fromPool, err = p.qc.PoolOpener.Open(ctx, src)
if err != nil { if err != nil {
return "", nil, err return "", nil, err
} }
rndr := fromConn.SQLDriver().Renderer() rndr := fromPool.SQLDriver().Renderer()
p.rc = &render.Context{ p.rc = &render.Context{
Renderer: rndr, Renderer: rndr,
Args: p.qc.Args, Args: p.qc.Args,
Dialect: fromConn.SQLDriver().Dialect(), Dialect: fromPool.SQLDriver().Dialect(),
} }
fromClause, err = rndr.FromTable(p.rc, tblSel) fromClause, err = rndr.FromTable(p.rc, tblSel)
@ -214,7 +214,7 @@ func (p *pipeline) prepareFromTable(ctx context.Context, tblSel *ast.TblSelector
return "", nil, err return "", nil, err
} }
return fromClause, fromConn, nil return fromClause, fromPool, nil
} }
// joinClause models the SQL "JOIN" construct. // joinClause models the SQL "JOIN" construct.
@ -270,7 +270,7 @@ func (jc *joinClause) isSingleSource() bool {
// //
// When this function returns, pipeline.rc will be set. // When this function returns, pipeline.rc will be set.
func (p *pipeline) prepareFromJoin(ctx context.Context, jc *joinClause) (fromClause string, func (p *pipeline) prepareFromJoin(ctx context.Context, jc *joinClause) (fromClause string,
fromConn driver.Database, err error, fromConn driver.Pool, err error,
) { ) {
if jc.isSingleSource() { if jc.isSingleSource() {
return p.joinSingleSource(ctx, jc) return p.joinSingleSource(ctx, jc)
@ -283,23 +283,23 @@ func (p *pipeline) prepareFromJoin(ctx context.Context, jc *joinClause) (fromCla
// //
// On return, pipeline.rc will be set. // On return, pipeline.rc will be set.
func (p *pipeline) joinSingleSource(ctx context.Context, jc *joinClause) (fromClause string, func (p *pipeline) joinSingleSource(ctx context.Context, jc *joinClause) (fromClause string,
fromDB driver.Database, err error, fromPool driver.Pool, err error,
) { ) {
src, err := p.qc.Collection.Get(jc.leftTbl.Handle()) src, err := p.qc.Collection.Get(jc.leftTbl.Handle())
if err != nil { if err != nil {
return "", nil, err return "", nil, err
} }
fromDB, err = p.qc.DBOpener.Open(ctx, src) fromPool, err = p.qc.PoolOpener.Open(ctx, src)
if err != nil { if err != nil {
return "", nil, err return "", nil, err
} }
rndr := fromDB.SQLDriver().Renderer() rndr := fromPool.SQLDriver().Renderer()
p.rc = &render.Context{ p.rc = &render.Context{
Renderer: rndr, Renderer: rndr,
Args: p.qc.Args, Args: p.qc.Args,
Dialect: fromDB.SQLDriver().Dialect(), Dialect: fromPool.SQLDriver().Dialect(),
} }
fromClause, err = rndr.Join(p.rc, jc.leftTbl, jc.joins) fromClause, err = rndr.Join(p.rc, jc.leftTbl, jc.joins)
@ -307,7 +307,7 @@ func (p *pipeline) joinSingleSource(ctx context.Context, jc *joinClause) (fromCl
return "", nil, err return "", nil, err
} }
return fromClause, fromDB, nil return fromClause, fromPool, nil
} }
// joinCrossSource returns a FROM clause that forms part of // joinCrossSource returns a FROM clause that forms part of
@ -315,10 +315,8 @@ func (p *pipeline) joinSingleSource(ctx context.Context, jc *joinClause) (fromCl
// //
// On return, pipeline.rc will be set. // On return, pipeline.rc will be set.
func (p *pipeline) joinCrossSource(ctx context.Context, jc *joinClause) (fromClause string, func (p *pipeline) joinCrossSource(ctx context.Context, jc *joinClause) (fromClause string,
fromDB driver.Database, err error, fromDB driver.Pool, err error,
) { ) {
// FIXME: finish tidying up
handles := jc.handles() handles := jc.handles()
srcs := make([]*source.Source, 0, len(handles)) srcs := make([]*source.Source, 0, len(handles))
for _, handle := range handles { for _, handle := range handles {
@ -330,16 +328,16 @@ func (p *pipeline) joinCrossSource(ctx context.Context, jc *joinClause) (fromCla
} }
// Open the join db // Open the join db
joinDB, err := p.qc.JoinDBOpener.OpenJoin(ctx, srcs...) joinPool, err := p.qc.JoinPoolOpener.OpenJoin(ctx, srcs...)
if err != nil { if err != nil {
return "", nil, err return "", nil, err
} }
rndr := joinDB.SQLDriver().Renderer() rndr := joinPool.SQLDriver().Renderer()
p.rc = &render.Context{ p.rc = &render.Context{
Renderer: rndr, Renderer: rndr,
Args: p.qc.Args, Args: p.qc.Args,
Dialect: joinDB.SQLDriver().Dialect(), Dialect: joinPool.SQLDriver().Dialect(),
} }
leftHandle := jc.leftTbl.Handle() leftHandle := jc.leftTbl.Handle()
@ -356,15 +354,15 @@ func (p *pipeline) joinCrossSource(ctx context.Context, jc *joinClause) (fromCla
if src, err = p.qc.Collection.Get(handle); err != nil { if src, err = p.qc.Collection.Get(handle); err != nil {
return "", nil, err return "", nil, err
} }
var db driver.Database var db driver.Pool
if db, err = p.qc.DBOpener.Open(ctx, src); err != nil { if db, err = p.qc.PoolOpener.Open(ctx, src); err != nil {
return "", nil, err return "", nil, err
} }
task := &joinCopyTask{ task := &joinCopyTask{
fromDB: db, fromPool: db,
fromTbl: tbl.Table(), fromTbl: tbl.Table(),
toDB: joinDB, toPool: joinPool,
toTbl: tbl.TblAliasOrName(), toTbl: tbl.TblAliasOrName(),
} }
@ -378,7 +376,7 @@ func (p *pipeline) joinCrossSource(ctx context.Context, jc *joinClause) (fromCla
return "", nil, err return "", nil, err
} }
return fromClause, joinDB, nil return fromClause, joinPool, nil
} }
// tasker is the interface for executing a DB task. // tasker is the interface for executing a DB task.
@ -389,59 +387,59 @@ type tasker interface {
// joinCopyTask is a specification of a table data copy task to be performed // joinCopyTask is a specification of a table data copy task to be performed
// for a cross-source join. That is, the data in fromDB.fromTblName will // for a cross-source join. That is, the data in fromDB.fromTblName will
// be copied to a table in toDB. If colNames is // be copied to a table in toPool. If colNames is
// empty, all cols in fromTbl are to be copied. // empty, all cols in fromTbl are to be copied.
type joinCopyTask struct { type joinCopyTask struct {
fromDB driver.Database fromPool driver.Pool
fromTbl tablefq.T fromTbl tablefq.T
toDB driver.Database toPool driver.Pool
toTbl tablefq.T toTbl tablefq.T
} }
func (jt *joinCopyTask) executeTask(ctx context.Context) error { func (jt *joinCopyTask) executeTask(ctx context.Context) error {
return execCopyTable(ctx, jt.fromDB, jt.fromTbl, jt.toDB, jt.toTbl) return execCopyTable(ctx, jt.fromPool, jt.fromTbl, jt.toPool, jt.toTbl)
} }
// execCopyTable performs the work of copying fromDB.fromTbl to destDB.destTbl. // execCopyTable performs the work of copying fromDB.fromTbl to destPool.destTbl.
func execCopyTable(ctx context.Context, fromDB driver.Database, fromTbl tablefq.T, func execCopyTable(ctx context.Context, fromDB driver.Pool, fromTbl tablefq.T,
destDB driver.Database, destTbl tablefq.T, destPool driver.Pool, destTbl tablefq.T,
) error { ) error {
log := lg.FromContext(ctx) log := lg.FromContext(ctx)
createTblHook := func(ctx context.Context, originRecMeta record.Meta, destDB driver.Database, createTblHook := func(ctx context.Context, originRecMeta record.Meta, destPool driver.Pool,
tx sqlz.DB, tx sqlz.DB,
) error { ) error {
destColNames := originRecMeta.Names() destColNames := originRecMeta.Names()
destColKinds := originRecMeta.Kinds() destColKinds := originRecMeta.Kinds()
destTblDef := sqlmodel.NewTableDef(destTbl.Table, destColNames, destColKinds) destTblDef := sqlmodel.NewTableDef(destTbl.Table, destColNames, destColKinds)
err := destDB.SQLDriver().CreateTable(ctx, tx, destTblDef) err := destPool.SQLDriver().CreateTable(ctx, tx, destTblDef)
if err != nil { if err != nil {
return errz.Wrapf(err, "failed to create dest table %s.%s", destDB.Source().Handle, destTbl) return errz.Wrapf(err, "failed to create dest table %s.%s", destPool.Source().Handle, destTbl)
} }
return nil return nil
} }
inserter := NewDBWriter( inserter := NewDBWriter(
destDB, destPool,
destTbl.Table, destTbl.Table,
driver.OptTuningRecChanSize.Get(destDB.Source().Options), driver.OptTuningRecChanSize.Get(destPool.Source().Options),
createTblHook, createTblHook,
) )
query := "SELECT * FROM " + fromTbl.Render(fromDB.SQLDriver().Dialect().Enquote) query := "SELECT * FROM " + fromTbl.Render(fromDB.SQLDriver().Dialect().Enquote)
err := QuerySQL(ctx, fromDB, nil, inserter, query) err := QuerySQL(ctx, fromDB, nil, inserter, query)
if err != nil { if err != nil {
return errz.Wrapf(err, "insert %s.%s failed", destDB.Source().Handle, destTbl) return errz.Wrapf(err, "insert %s.%s failed", destPool.Source().Handle, destTbl)
} }
affected, err := inserter.Wait() // Wait for the writer to finish processing affected, err := inserter.Wait() // Wait for the writer to finish processing
if err != nil { if err != nil {
return errz.Wrapf(err, "insert %s.%s failed", destDB.Source().Handle, destTbl) return errz.Wrapf(err, "insert %s.%s failed", destPool.Source().Handle, destTbl)
} }
log.Debug("Copied rows to dest", lga.Count, affected, log.Debug("Copied rows to dest", lga.Count, affected,
lga.From, fmt.Sprintf("%s.%s", fromDB.Source().Handle, fromTbl), lga.From, fmt.Sprintf("%s.%s", fromDB.Source().Handle, fromTbl),
lga.To, fmt.Sprintf("%s.%s", destDB.Source().Handle, destTbl)) lga.To, fmt.Sprintf("%s.%s", destPool.Source().Handle, destTbl))
return nil return nil
} }

View File

@ -7,9 +7,9 @@ import (
) )
// prepare prepares the pipeline to execute queryModel. // prepare prepares the pipeline to execute queryModel.
// When this method returns, targetDB and targetSQL will be set, // When this method returns, targetPool and targetSQL will be set,
// as will any tasks (which may be empty). The tasks must be executed // as will any tasks (which may be empty). The tasks must be executed
// against targetDB before targetSQL is executed (the pipeline.execute // against targetPool before targetSQL is executed (the pipeline.execute
// method does this work). // method does this work).
func (p *pipeline) prepare(ctx context.Context, qm *queryModel) error { func (p *pipeline) prepare(ctx context.Context, qm *queryModel) error {
var ( var (
@ -25,11 +25,11 @@ func (p *pipeline) prepare(ctx context.Context, qm *queryModel) error {
} }
case len(qm.Joins) > 0: case len(qm.Joins) > 0:
jc := &joinClause{leftTbl: qm.Table, joins: qm.Joins} jc := &joinClause{leftTbl: qm.Table, joins: qm.Joins}
if frags.From, p.targetDB, err = p.prepareFromJoin(ctx, jc); err != nil { if frags.From, p.targetPool, err = p.prepareFromJoin(ctx, jc); err != nil {
return err return err
} }
default: default:
if frags.From, p.targetDB, err = p.prepareFromTable(ctx, qm.Table); err != nil { if frags.From, p.targetPool, err = p.prepareFromTable(ctx, qm.Table); err != nil {
return err return err
} }
} }

View File

@ -31,13 +31,13 @@ func TestQuery_no_source(t *testing.T) {
t.Logf("\nquery: %s\n want: %s", tc.in, tc.want) t.Logf("\nquery: %s\n want: %s", tc.in, tc.want)
th := testh.New(t) th := testh.New(t)
coll := th.NewCollection() coll := th.NewCollection()
dbases := th.Databases() pools := th.Pools()
qc := &libsq.QueryContext{ qc := &libsq.QueryContext{
Collection: coll, Collection: coll,
DBOpener: dbases, PoolOpener: pools,
JoinDBOpener: dbases, JoinPoolOpener: pools,
ScratchDBOpener: dbases, ScratchPoolOpener: pools,
} }
gotSQL, gotErr := libsq.SLQ2SQL(th.Context, qc, tc.in) gotSQL, gotErr := libsq.SLQ2SQL(th.Context, qc, tc.in)

View File

@ -168,13 +168,13 @@ func doExecQueryTestCase(t *testing.T, tc queryTestCase) {
require.NoError(t, err) require.NoError(t, err)
th := testh.New(t) th := testh.New(t)
dbases := th.Databases() pools := th.Pools()
qc := &libsq.QueryContext{ qc := &libsq.QueryContext{
Collection: coll, Collection: coll,
DBOpener: dbases, PoolOpener: pools,
JoinDBOpener: dbases, JoinPoolOpener: pools,
ScratchDBOpener: dbases, ScratchPoolOpener: pools,
Args: tc.args, Args: tc.args,
} }

View File

@ -105,7 +105,7 @@ type Helper struct {
registry *driver.Registry registry *driver.Registry
files *source.Files files *source.Files
databases *driver.Databases pools *driver.Pools
run *run.Run run *run.Run
initOnce sync.Once initOnce sync.Once
@ -145,20 +145,20 @@ func New(t testing.TB, opts ...Option) *Helper {
} }
// NewWith is a convenience wrapper for New that also returns // NewWith is a convenience wrapper for New that also returns
// a Source for handle, the driver.SQLDriver, driver.Database, // a Source for handle, the driver.SQLDriver, driver.Pool,
// and the *sql.DB. // and the *sql.DB.
// //
// The function will fail if handle is not the handle for a // The function will fail if handle is not the handle for a
// source whose driver implements driver.SQLDriver. // source whose driver implements driver.SQLDriver.
func NewWith(t testing.TB, handle string) (*Helper, *source.Source, driver.SQLDriver, driver.Database, *sql.DB) { func NewWith(t testing.TB, handle string) (*Helper, *source.Source, driver.SQLDriver, driver.Pool, *sql.DB) {
th := New(t) th := New(t)
src := th.Source(handle) src := th.Source(handle)
drvr := th.SQLDriverFor(src) drvr := th.SQLDriverFor(src)
dbase := th.Open(src) pool := th.Open(src)
db, err := dbase.DB(th.Context) db, err := pool.DB(th.Context)
require.NoError(t, err) require.NoError(t, err)
return th, src, drvr, dbase, db return th, src, drvr, pool, db
} }
func (h *Helper) init() { func (h *Helper) init() {
@ -178,20 +178,20 @@ func (h *Helper) init() {
h.files.AddDriverDetectors(source.DetectMagicNumber) h.files.AddDriverDetectors(source.DetectMagicNumber)
h.databases = driver.NewDatabases(log, h.registry, sqlite3.NewScratchSource) h.pools = driver.NewPools(log, h.registry, sqlite3.NewScratchSource)
h.Cleanup.AddC(h.databases) h.Cleanup.AddC(h.pools)
h.registry.AddProvider(sqlite3.Type, &sqlite3.Provider{Log: log}) h.registry.AddProvider(sqlite3.Type, &sqlite3.Provider{Log: log})
h.registry.AddProvider(postgres.Type, &postgres.Provider{Log: log}) h.registry.AddProvider(postgres.Type, &postgres.Provider{Log: log})
h.registry.AddProvider(sqlserver.Type, &sqlserver.Provider{Log: log}) h.registry.AddProvider(sqlserver.Type, &sqlserver.Provider{Log: log})
h.registry.AddProvider(mysql.Type, &mysql.Provider{Log: log}) h.registry.AddProvider(mysql.Type, &mysql.Provider{Log: log})
csvp := &csv.Provider{Log: log, Scratcher: h.databases, Files: h.files} csvp := &csv.Provider{Log: log, Scratcher: h.pools, Files: h.files}
h.registry.AddProvider(csv.TypeCSV, csvp) h.registry.AddProvider(csv.TypeCSV, csvp)
h.registry.AddProvider(csv.TypeTSV, csvp) h.registry.AddProvider(csv.TypeTSV, csvp)
h.files.AddDriverDetectors(csv.DetectCSV, csv.DetectTSV) h.files.AddDriverDetectors(csv.DetectCSV, csv.DetectTSV)
jsonp := &json.Provider{Log: log, Scratcher: h.databases, Files: h.files} jsonp := &json.Provider{Log: log, Scratcher: h.pools, Files: h.files}
h.registry.AddProvider(json.TypeJSON, jsonp) h.registry.AddProvider(json.TypeJSON, jsonp)
h.registry.AddProvider(json.TypeJSONA, jsonp) h.registry.AddProvider(json.TypeJSONA, jsonp)
h.registry.AddProvider(json.TypeJSONL, jsonp) h.registry.AddProvider(json.TypeJSONL, jsonp)
@ -201,7 +201,7 @@ func (h *Helper) init() {
json.DetectJSONL(driver.OptIngestSampleSize.Get(nil)), json.DetectJSONL(driver.OptIngestSampleSize.Get(nil)),
) )
h.registry.AddProvider(xlsx.Type, &xlsx.Provider{Log: log, Scratcher: h.databases, Files: h.files}) h.registry.AddProvider(xlsx.Type, &xlsx.Provider{Log: log, Scratcher: h.pools, Files: h.files})
h.files.AddDriverDetectors(xlsx.DetectXLSX) h.files.AddDriverDetectors(xlsx.DetectXLSX)
h.addUserDrivers() h.addUserDrivers()
@ -368,49 +368,49 @@ func (h *Helper) NewCollection(handles ...string) *source.Collection {
return coll return coll
} }
// Open opens a Database for src via h's internal Databases // Open opens a driver.Pool for src via h's internal Pools
// instance: thus subsequent calls to Open may return the // instance: thus subsequent calls to Open may return the
// same Database instance. The opened Database will be closed // same Pool instance. The opened driver.Pool will be closed
// during h.Close. // during h.Close.
func (h *Helper) Open(src *source.Source) driver.Database { func (h *Helper) Open(src *source.Source) driver.Pool {
ctx, cancelFn := context.WithTimeout(h.Context, h.dbOpenTimeout) ctx, cancelFn := context.WithTimeout(h.Context, h.dbOpenTimeout)
defer cancelFn() defer cancelFn()
dbase, err := h.Databases().Open(ctx, src) pool, err := h.Pools().Open(ctx, src)
require.NoError(h.T, err) require.NoError(h.T, err)
db, err := dbase.DB(ctx) db, err := pool.DB(ctx)
require.NoError(h.T, err) require.NoError(h.T, err)
require.NoError(h.T, db.PingContext(ctx)) require.NoError(h.T, db.PingContext(ctx))
return dbase return pool
} }
// OpenDB is a convenience method for getting the sql.DB for src. // OpenDB is a convenience method for getting the sql.DB for src.
// The returned sql.DB is closed during h.Close, via the closing // The returned sql.DB is closed during h.Close, via the closing
// of its parent driver.Database. // of its parent driver.Pool.
func (h *Helper) OpenDB(src *source.Source) *sql.DB { func (h *Helper) OpenDB(src *source.Source) *sql.DB {
dbase := h.Open(src) pool := h.Open(src)
db, err := dbase.DB(h.Context) db, err := pool.DB(h.Context)
require.NoError(h.T, err) require.NoError(h.T, err)
return db return db
} }
// openNew opens a new Database. It is the caller's responsibility // openNew opens a new driver.Pool. It is the caller's responsibility
// to close the returned Database. Unlike method Open, this method // to close the returned Pool. Unlike method Open, this method
// will always invoke the driver's Open method. // will always invoke the driver's Open method.
// //
// Some of Helper's methods (e.g. DropTable) need to use openNew rather // Some of Helper's methods (e.g. DropTable) need to use openNew rather
// than Open, as the Database returned by Open can be closed by test code, // than Open, as the Pool returned by Open can be closed by test code,
// potentially causing problems during Cleanup. // potentially causing problems during Cleanup.
func (h *Helper) openNew(src *source.Source) driver.Database { func (h *Helper) openNew(src *source.Source) driver.Pool {
h.Log.Debug("openNew", lga.Src, src) h.Log.Debug("openNew", lga.Src, src)
reg := h.Registry() reg := h.Registry()
drvr, err := reg.DriverFor(src.Type) drvr, err := reg.DriverFor(src.Type)
require.NoError(h.T, err) require.NoError(h.T, err)
dbase, err := drvr.Open(h.Context, src) pool, err := drvr.Open(h.Context, src)
require.NoError(h.T, err) require.NoError(h.T, err)
return dbase return pool
} }
// SQLDriverFor is a convenience method to get src's driver.SQLDriver. // SQLDriverFor is a convenience method to get src's driver.SQLDriver.
@ -436,12 +436,12 @@ func (h *Helper) DriverFor(src *source.Source) driver.Driver {
// RowCount returns the result of "SELECT COUNT(*) FROM tbl", // RowCount returns the result of "SELECT COUNT(*) FROM tbl",
// failing h's test on any error. // failing h's test on any error.
func (h *Helper) RowCount(src *source.Source, tbl string) int64 { func (h *Helper) RowCount(src *source.Source, tbl string) int64 {
dbase := h.openNew(src) pool := h.openNew(src)
defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, dbase) defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, pool)
query := "SELECT COUNT(*) FROM " + dbase.SQLDriver().Dialect().Enquote(tbl) query := "SELECT COUNT(*) FROM " + pool.SQLDriver().Dialect().Enquote(tbl)
var count int64 var count int64
db, err := dbase.DB(h.Context) db, err := pool.DB(h.Context)
require.NoError(h.T, err) require.NoError(h.T, err)
require.NoError(h.T, db.QueryRowContext(h.Context, query).Scan(&count)) require.NoError(h.T, db.QueryRowContext(h.Context, query).Scan(&count))
@ -454,13 +454,13 @@ func (h *Helper) RowCount(src *source.Source, tbl string) int64 {
func (h *Helper) CreateTable(dropAfter bool, src *source.Source, tblDef *sqlmodel.TableDef, func (h *Helper) CreateTable(dropAfter bool, src *source.Source, tblDef *sqlmodel.TableDef,
data ...[]any, data ...[]any,
) (affected int64) { ) (affected int64) {
dbase := h.openNew(src) pool := h.openNew(src)
defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, dbase) defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, pool)
db, err := dbase.DB(h.Context) db, err := pool.DB(h.Context)
require.NoError(h.T, err) require.NoError(h.T, err)
require.NoError(h.T, dbase.SQLDriver().CreateTable(h.Context, db, tblDef)) require.NoError(h.T, pool.SQLDriver().CreateTable(h.Context, db, tblDef))
h.T.Logf("Created table %s.%s", src.Handle, tblDef.Name) h.T.Logf("Created table %s.%s", src.Handle, tblDef.Name)
if dropAfter { if dropAfter {
@ -482,11 +482,11 @@ func (h *Helper) Insert(src *source.Source, tbl string, cols []string, records .
return 0 return 0
} }
dbase := h.openNew(src) pool := h.openNew(src)
defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, dbase) defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, pool)
drvr := dbase.SQLDriver() drvr := pool.SQLDriver()
db, err := dbase.DB(h.Context) db, err := pool.DB(h.Context)
require.NoError(h.T, err) require.NoError(h.T, err)
conn, err := db.Conn(h.Context) conn, err := db.Conn(h.Context)
@ -547,13 +547,13 @@ func (h *Helper) CopyTable(
toTable.Table = stringz.UniqTableName(fromTable.Table) toTable.Table = stringz.UniqTableName(fromTable.Table)
} }
dbase := h.openNew(src) pool := h.openNew(src)
defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, dbase) defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, pool)
db, err := dbase.DB(h.Context) db, err := pool.DB(h.Context)
require.NoError(h.T, err) require.NoError(h.T, err)
copied, err := dbase.SQLDriver().CopyTable( copied, err := pool.SQLDriver().CopyTable(
h.Context, h.Context,
db, db,
fromTable, fromTable,
@ -577,28 +577,28 @@ func (h *Helper) CopyTable(
// DropTable drops tbl from src. // DropTable drops tbl from src.
func (h *Helper) DropTable(src *source.Source, tbl tablefq.T) { func (h *Helper) DropTable(src *source.Source, tbl tablefq.T) {
dbase := h.openNew(src) pool := h.openNew(src)
defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, dbase) defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, pool)
db, err := dbase.DB(h.Context) db, err := pool.DB(h.Context)
require.NoError(h.T, err) require.NoError(h.T, err)
require.NoError(h.T, dbase.SQLDriver().DropTable(h.Context, db, tbl, true)) require.NoError(h.T, pool.SQLDriver().DropTable(h.Context, db, tbl, true))
h.Log.Debug("Dropped table", lga.Target, source.Target(src, tbl.Table)) h.Log.Debug("Dropped table", lga.Target, source.Target(src, tbl.Table))
} }
// QuerySQL uses libsq.QuerySQL to execute SQL query // QuerySQL uses libsq.QuerySQL to execute SQL query
// against src, returning a sink to which all records have // against src, returning a sink to which all records have
// been written. Typically the db arg is nil, and QuerySQL uses the // been written. Typically the db arg is nil, and QuerySQL uses the
// same driver.Database instance as returned by Helper.Open. If db // same driver.Pool instance as returned by Helper.Open. If db
// is non-nil, it is passed to libsq.QuerySQL (e.g. the query needs to // is non-nil, it is passed to libsq.QuerySQL (e.g. the query needs to
// execute against a sql.Tx), and the caller is responsible for closing db. // execute against a sql.Tx), and the caller is responsible for closing db.
func (h *Helper) QuerySQL(src *source.Source, db sqlz.DB, query string, args ...any) (*RecordSink, error) { func (h *Helper) QuerySQL(src *source.Source, db sqlz.DB, query string, args ...any) (*RecordSink, error) {
dbase := h.Open(src) pool := h.Open(src)
sink := &RecordSink{} sink := &RecordSink{}
recw := output.NewRecordWriterAdapter(h.Context, sink) recw := output.NewRecordWriterAdapter(h.Context, sink)
err := libsq.QuerySQL(h.Context, dbase, db, recw, query, args...) err := libsq.QuerySQL(h.Context, pool, db, recw, query, args...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -624,9 +624,9 @@ func (h *Helper) QuerySLQ(query string, args map[string]string) (*RecordSink, er
qc := &libsq.QueryContext{ qc := &libsq.QueryContext{
Collection: h.coll, Collection: h.coll,
DBOpener: h.databases, PoolOpener: h.pools,
JoinDBOpener: h.databases, JoinPoolOpener: h.pools,
ScratchDBOpener: h.databases, ScratchPoolOpener: h.pools,
Args: args, Args: args,
} }
@ -647,7 +647,7 @@ func (h *Helper) QuerySLQ(query string, args map[string]string) (*RecordSink, er
// ExecSQL is a convenience wrapper for sql.DB.Exec that returns the // ExecSQL is a convenience wrapper for sql.DB.Exec that returns the
// rows affected, failing on any error. Note that ExecSQL uses the // rows affected, failing on any error. Note that ExecSQL uses the
// same Database instance as returned by h.Open. // same Pool instance as returned by h.Open.
func (h *Helper) ExecSQL(src *source.Source, query string, args ...any) (affected int64) { func (h *Helper) ExecSQL(src *source.Source, query string, args ...any) (affected int64) {
db := h.OpenDB(src) db := h.OpenDB(src)
@ -688,8 +688,8 @@ func (h *Helper) InsertDefaultRow(src *source.Source, tbl string) {
// TruncateTable truncates tbl in src. // TruncateTable truncates tbl in src.
func (h *Helper) TruncateTable(src *source.Source, tbl string) (affected int64) { func (h *Helper) TruncateTable(src *source.Source, tbl string) (affected int64) {
dbase := h.openNew(src) pool := h.openNew(src)
defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, dbase) defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, pool)
affected, err := h.DriverFor(src).Truncate(h.Context, src, tbl, true) affected, err := h.DriverFor(src).Truncate(h.Context, src, tbl, true)
require.NoError(h.T, err) require.NoError(h.T, err)
@ -734,7 +734,7 @@ func (h *Helper) addUserDrivers() {
Log: h.Log, Log: h.Log,
DriverDef: userDriverDef, DriverDef: userDriverDef,
ImportFn: importFn, ImportFn: importFn,
Scratcher: h.databases, Scratcher: h.pools,
Files: h.files, Files: h.files,
} }
@ -748,10 +748,10 @@ func (h *Helper) IsMonotable(src *source.Source) bool {
return h.DriverFor(src).DriverMetadata().Monotable return h.DriverFor(src).DriverMetadata().Monotable
} }
// Databases returns the helper's Databases instance. // Pools returns the helper's driver.Pools instance.
func (h *Helper) Databases() *driver.Databases { func (h *Helper) Pools() *driver.Pools {
h.init() h.init()
return h.databases return h.pools
} }
// Files returns the helper's Files instance. // Files returns the helper's Files instance.
@ -762,22 +762,22 @@ func (h *Helper) Files() *source.Files {
// SourceMetadata returns metadata for src. // SourceMetadata returns metadata for src.
func (h *Helper) SourceMetadata(src *source.Source) (*source.Metadata, error) { func (h *Helper) SourceMetadata(src *source.Source) (*source.Metadata, error) {
dbases, err := h.Databases().Open(h.Context, src) pools, err := h.Pools().Open(h.Context, src)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return dbases.SourceMetadata(h.Context, false) return pools.SourceMetadata(h.Context, false)
} }
// TableMetadata returns metadata for src's table. // TableMetadata returns metadata for src's table.
func (h *Helper) TableMetadata(src *source.Source, tbl string) (*source.TableMetadata, error) { func (h *Helper) TableMetadata(src *source.Source, tbl string) (*source.TableMetadata, error) {
dbases, err := h.Databases().Open(h.Context, src) pools, err := h.Pools().Open(h.Context, src)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return dbases.TableMetadata(h.Context, tbl) return pools.TableMetadata(h.Context, tbl)
} }
// DiffDB fails the test if src's metadata is substantially different // DiffDB fails the test if src's metadata is substantially different