From 82727b389068da16f76f22ff5d4f40b5f6e39df6 Mon Sep 17 00:00:00 2001 From: Neil O'Toole Date: Sat, 18 Nov 2023 19:21:14 -0700 Subject: [PATCH] Refactor/rename database to pool (#328) * Renamed `driver.Database` to `driver.Pool` (and related things) * workflow: Update tparse version * workflow: Update golangci-lint version --- .github/workflows/main.yml | 5 +- cli/cmd_inspect.go | 10 +- cli/cmd_slq.go | 12 +- cli/cmd_sql.go | 14 +-- cli/cmd_tbl.go | 12 +- cli/complete.go | 10 +- cli/diff/source.go | 4 +- cli/diff/table.go | 4 +- cli/output/adapter_test.go | 4 +- cli/run.go | 12 +- cli/run/run.go | 14 +-- drivers/csv/csv.go | 70 +++++------ drivers/csv/ingest.go | 14 +-- drivers/csv/insert.go | 6 +- drivers/json/import.go | 16 +-- drivers/json/import_json.go | 4 +- drivers/json/import_jsona.go | 14 +-- drivers/json/import_jsonl.go | 4 +- drivers/json/import_test.go | 8 +- drivers/json/internal_test.go | 4 +- drivers/json/json.go | 74 +++++------ drivers/mysql/metadata.go | 4 +- drivers/mysql/metadata_test.go | 8 +- drivers/mysql/mysql.go | 48 ++++---- drivers/postgres/postgres.go | 50 ++++---- drivers/postgres/postgres_test.go | 10 +- drivers/sqlite3/metadata_test.go | 8 +- drivers/sqlite3/sqlite3.go | 68 +++++------ drivers/sqlserver/sqlserver.go | 36 +++--- drivers/userdriver/userdriver.go | 56 ++++----- drivers/userdriver/userdriver_test.go | 8 +- drivers/userdriver/xmlud/xmlimport.go | 24 ++-- drivers/userdriver/xmlud/xmlimport_test.go | 4 +- drivers/xlsx/database.go | 102 ++++++++-------- drivers/xlsx/ingest.go | 26 ++-- drivers/xlsx/xlsx.go | 26 ++-- drivers/xlsx/xlsx_test.go | 4 +- libsq/dbwriter.go | 34 +++--- libsq/driver/driver.go | 95 +++++++------- libsq/driver/driver_test.go | 30 ++--- libsq/libsq.go | 28 ++--- libsq/pipeline.go | 104 ++++++++-------- libsq/prepare.go | 8 +- libsq/query_no_src_test.go | 10 +- libsq/query_test.go | 12 +- testh/testh.go | 136 ++++++++++----------- 46 files changed, 626 insertions(+), 628 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 93f98359..1b0f799b 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -11,8 +11,8 @@ on: env: GO_VERSION: 1.21.0 GORELEASER_VERSION: 1.20.0 - GOLANGCI_LINT_VERSION: v1.54.1 - TPARSE_VERSION: v0.11.1 + GOLANGCI_LINT_VERSION: v1.55.2 + TPARSE_VERSION: v0.13.2 BUILD_TAGS: 'sqlite_vtable sqlite_stat4 sqlite_fts5 sqlite_introspect sqlite_json sqlite_math_functions' jobs: @@ -129,6 +129,7 @@ jobs: - name: golangci-lint uses: golangci/golangci-lint-action@v3 with: + skip-cache: true version: ${{ env.GOLANGCI_LINT_VERSION }} diff --git a/cli/cmd_inspect.go b/cli/cmd_inspect.go index 6d96c26b..713de4df 100644 --- a/cli/cmd_inspect.go +++ b/cli/cmd_inspect.go @@ -166,14 +166,14 @@ func execInspect(cmd *cobra.Command, args []string) error { return err } - dbase, err := ru.Databases.Open(ctx, src) + pool, err := ru.Pools.Open(ctx, src) if err != nil { return errz.Wrapf(err, "failed to inspect %s", src.Handle) } if table != "" { var tblMeta *source.TableMetadata - tblMeta, err = dbase.TableMetadata(ctx, table) + tblMeta, err = pool.TableMetadata(ctx, table) if err != nil { return err } @@ -183,11 +183,11 @@ func execInspect(cmd *cobra.Command, args []string) error { if cmdFlagIsSetTrue(cmd, flag.InspectDBProps) { var db *sql.DB - if db, err = dbase.DB(ctx); err != nil { + if db, err = pool.DB(ctx); err != nil { return err } var props map[string]any - sqlDrvr := dbase.SQLDriver() + sqlDrvr := pool.SQLDriver() if props, err = sqlDrvr.DBProperties(ctx, db); err != nil { return err } @@ -197,7 +197,7 @@ func execInspect(cmd *cobra.Command, args []string) error { overviewOnly := cmdFlagIsSetTrue(cmd, flag.InspectOverview) - srcMeta, err := dbase.SourceMetadata(ctx, overviewOnly) + srcMeta, err := pool.SourceMetadata(ctx, overviewOnly) if err != nil { return errz.Wrapf(err, "failed to read %s source metadata", src.Handle) } diff --git a/cli/cmd_slq.go b/cli/cmd_slq.go index 0f6e11b5..f6d24b83 100644 --- a/cli/cmd_slq.go +++ b/cli/cmd_slq.go @@ -141,7 +141,7 @@ func execSLQInsert(ctx context.Context, ru *run.Run, mArgs map[string]string, ctx, cancelFn := context.WithCancel(ctx) defer cancelFn() - destDB, err := ru.Databases.Open(ctx, destSrc) + destPool, err := ru.Pools.Open(ctx, destSrc) if err != nil { return err } @@ -152,7 +152,7 @@ func execSLQInsert(ctx context.Context, ru *run.Run, mArgs map[string]string, // stack. inserter := libsq.NewDBWriter( - destDB, + destPool, destTbl, driver.OptTuningRecChanSize.Get(destSrc.Options), libsq.DBWriterCreateTableIfNotExistsHook(destTbl), @@ -209,7 +209,7 @@ func execSLQPrint(ctx context.Context, ru *run.Run, mArgs map[string]string) err // // $ cat something.xlsx | sq @stdin.sheet1 func preprocessUserSLQ(ctx context.Context, ru *run.Run, args []string) (string, error) { - log, reg, dbases, coll := lg.FromContext(ctx), ru.DriverRegistry, ru.Databases, ru.Config.Collection + log, reg, pools, coll := lg.FromContext(ctx), ru.DriverRegistry, ru.Pools, ru.Config.Collection activeSrc := coll.Active() if len(args) == 0 { @@ -240,13 +240,13 @@ func preprocessUserSLQ(ctx context.Context, ru *run.Run, args []string) (string, // This isn't a monotable src, so we can't // just select @stdin.data. Instead we'll select // the first table name, as found in the source meta. - dbase, err := dbases.Open(ctx, activeSrc) + pool, err := pools.Open(ctx, activeSrc) if err != nil { return "", err } - defer lg.WarnIfCloseError(log, lgm.CloseDB, dbase) + defer lg.WarnIfCloseError(log, lgm.CloseDB, pool) - srcMeta, err := dbase.SourceMetadata(ctx, false) + srcMeta, err := pool.SourceMetadata(ctx, false) if err != nil { return "", err } diff --git a/cli/cmd_sql.go b/cli/cmd_sql.go index 8a15e8c4..d6554804 100644 --- a/cli/cmd_sql.go +++ b/cli/cmd_sql.go @@ -118,13 +118,13 @@ func execSQL(cmd *cobra.Command, args []string) error { // to the configured writer. func execSQLPrint(ctx context.Context, ru *run.Run, fromSrc *source.Source) error { args := ru.Args - dbase, err := ru.Databases.Open(ctx, fromSrc) + pool, err := ru.Pools.Open(ctx, fromSrc) if err != nil { return err } recw := output.NewRecordWriterAdapter(ctx, ru.Writers.Record) - err = libsq.QuerySQL(ctx, dbase, nil, recw, args[0]) + err = libsq.QuerySQL(ctx, pool, nil, recw, args[0]) if err != nil { return err } @@ -138,27 +138,27 @@ func execSQLInsert(ctx context.Context, ru *run.Run, fromSrc, destSrc *source.Source, destTbl string, ) error { args := ru.Args - dbases := ru.Databases + pools := ru.Pools ctx, cancelFn := context.WithCancel(ctx) defer cancelFn() - fromDB, err := dbases.Open(ctx, fromSrc) + fromDB, err := pools.Open(ctx, fromSrc) if err != nil { return err } - destDB, err := dbases.Open(ctx, destSrc) + destPool, err := pools.Open(ctx, destSrc) if err != nil { return err } // Note: We don't need to worry about closing fromDB and - // destDB because they are closed by dbases.Close, which + // destPool because they are closed by pools.Close, which // is invoked by ru.Close, and ru is closed further up the // stack. inserter := libsq.NewDBWriter( - destDB, + destPool, destTbl, driver.OptTuningRecChanSize.Get(destSrc.Options), libsq.DBWriterCreateTableIfNotExistsHook(destTbl), diff --git a/cli/cmd_tbl.go b/cli/cmd_tbl.go index 163e0fb3..62ce3fc6 100644 --- a/cli/cmd_tbl.go +++ b/cli/cmd_tbl.go @@ -124,13 +124,13 @@ func execTblCopy(cmd *cobra.Command, args []string) error { return err } - var dbase driver.Database - dbase, err = ru.Databases.Open(ctx, tblHandles[0].src) + var pool driver.Pool + pool, err = ru.Pools.Open(ctx, tblHandles[0].src) if err != nil { return err } - db, err := dbase.DB(ctx) + db, err := pool.DB(ctx) if err != nil { return err } @@ -257,13 +257,13 @@ func execTblDrop(cmd *cobra.Command, args []string) (err error) { return errz.Errorf("driver type {%s} (%s) doesn't support dropping tables", tblH.src.Type, tblH.src.Handle) } - var dbase driver.Database - if dbase, err = ru.Databases.Open(ctx, tblH.src); err != nil { + var pool driver.Pool + if pool, err = ru.Pools.Open(ctx, tblH.src); err != nil { return err } var db *sql.DB - if db, err = dbase.DB(ctx); err != nil { + if db, err = pool.DB(ctx); err != nil { return err } diff --git a/cli/complete.go b/cli/complete.go index 8f731278..463b209b 100644 --- a/cli/complete.go +++ b/cli/complete.go @@ -149,7 +149,7 @@ func completeSLQ(cmd *cobra.Command, args []string, toComplete string) ([]string // completeDriverType is a completionFunc that suggests drivers. func completeDriverType(cmd *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) { ru := getRun(cmd) - if ru.Databases == nil { + if ru.Pools == nil { if err := preRun(cmd, ru); err != nil { lg.Unexpected(logFrom(cmd), err) return nil, cobra.ShellCompDirectiveError @@ -392,13 +392,13 @@ func (c activeSchemaCompleter) complete(cmd *cobra.Command, args []string, toCom ctx, cancelFn := context.WithTimeout(cmd.Context(), OptShellCompletionTimeout.Get(ru.Config.Options)) defer cancelFn() - dbase, err := ru.Databases.Open(ctx, src) + pool, err := ru.Pools.Open(ctx, src) if err != nil { lg.Unexpected(log, err) return nil, cobra.ShellCompDirectiveError } - db, err := dbase.DB(ctx) + db, err := pool.DB(ctx) if err != nil { lg.Unexpected(log, err) return nil, cobra.ShellCompDirectiveError @@ -768,14 +768,14 @@ func getTableNamesForHandle(ctx context.Context, ru *run.Run, handle string) ([] return nil, err } - dbase, err := ru.Databases.Open(ctx, src) + pool, err := ru.Pools.Open(ctx, src) if err != nil { return nil, err } // TODO: We shouldn't have to load the full metadata just to get // the table names. driver.SQLDriver should have a method ListTables. - md, err := dbase.SourceMetadata(ctx, false) + md, err := pool.SourceMetadata(ctx, false) if err != nil { return nil, err } diff --git a/cli/diff/source.go b/cli/diff/source.go index 0cabb7d0..8e965e90 100644 --- a/cli/diff/source.go +++ b/cli/diff/source.go @@ -195,11 +195,11 @@ func fetchSourceMeta(ctx context.Context, ru *run.Run, handle string) (*source.S if err != nil { return nil, nil, err } - dbase, err := ru.Databases.Open(ctx, src) + pool, err := ru.Pools.Open(ctx, src) if err != nil { return nil, nil, err } - md, err := dbase.SourceMetadata(ctx, false) + md, err := pool.SourceMetadata(ctx, false) if err != nil { return nil, nil, err } diff --git a/cli/diff/table.go b/cli/diff/table.go index 0bb37a18..903dac42 100644 --- a/cli/diff/table.go +++ b/cli/diff/table.go @@ -113,11 +113,11 @@ func buildTableStructureDiff(cfg *Config, showRowCounts bool, td1, td2 *tableDat func fetchTableMeta(ctx context.Context, ru *run.Run, src *source.Source, table string) ( *source.TableMetadata, error, ) { - dbase, err := ru.Databases.Open(ctx, src) + pool, err := ru.Pools.Open(ctx, src) if err != nil { return nil, err } - md, err := dbase.TableMetadata(ctx, table) + md, err := pool.TableMetadata(ctx, table) if err != nil { if errz.IsErrNotExist(err) { return nil, nil //nolint:nilnil diff --git a/cli/output/adapter_test.go b/cli/output/adapter_test.go index 0ce595f1..61b6b0f0 100644 --- a/cli/output/adapter_test.go +++ b/cli/output/adapter_test.go @@ -51,11 +51,11 @@ func TestRecordWriterAdapter(t *testing.T) { th := testh.New(t) src := th.Source(tc.handle) - dbase := th.Open(src) + pool := th.Open(src) sink := &testh.RecordSink{} recw := output.NewRecordWriterAdapter(th.Context, sink) - err := libsq.QuerySQL(th.Context, dbase, nil, recw, tc.sqlQuery) + err := libsq.QuerySQL(th.Context, pool, nil, recw, tc.sqlQuery) require.NoError(t, err) written, err := recw.Wait() require.NoError(t, err) diff --git a/cli/run.go b/cli/run.go index 7104be12..d9a3241d 100644 --- a/cli/run.go +++ b/cli/run.go @@ -157,19 +157,19 @@ func FinishRunInit(ctx context.Context, ru *run.Run) error { ru.DriverRegistry = driver.NewRegistry(log) dr := ru.DriverRegistry - ru.Databases = driver.NewDatabases(log, dr, scratchSrcFunc) - ru.Cleanup.AddC(ru.Databases) + ru.Pools = driver.NewPools(log, dr, scratchSrcFunc) + ru.Cleanup.AddC(ru.Pools) dr.AddProvider(sqlite3.Type, &sqlite3.Provider{Log: log}) dr.AddProvider(postgres.Type, &postgres.Provider{Log: log}) dr.AddProvider(sqlserver.Type, &sqlserver.Provider{Log: log}) dr.AddProvider(mysql.Type, &mysql.Provider{Log: log}) - csvp := &csv.Provider{Log: log, Scratcher: ru.Databases, Files: ru.Files} + csvp := &csv.Provider{Log: log, Scratcher: ru.Pools, Files: ru.Files} dr.AddProvider(csv.TypeCSV, csvp) dr.AddProvider(csv.TypeTSV, csvp) ru.Files.AddDriverDetectors(csv.DetectCSV, csv.DetectTSV) - jsonp := &json.Provider{Log: log, Scratcher: ru.Databases, Files: ru.Files} + jsonp := &json.Provider{Log: log, Scratcher: ru.Pools, Files: ru.Files} dr.AddProvider(json.TypeJSON, jsonp) dr.AddProvider(json.TypeJSONA, jsonp) dr.AddProvider(json.TypeJSONL, jsonp) @@ -180,7 +180,7 @@ func FinishRunInit(ctx context.Context, ru *run.Run) error { json.DetectJSONL(sampleSize), ) - dr.AddProvider(xlsx.Type, &xlsx.Provider{Log: log, Scratcher: ru.Databases, Files: ru.Files}) + dr.AddProvider(xlsx.Type, &xlsx.Provider{Log: log, Scratcher: ru.Pools, Files: ru.Files}) ru.Files.AddDriverDetectors(xlsx.DetectXLSX) // One day we may have more supported user driver genres. userDriverImporters := map[string]userdriver.ImportFunc{ @@ -210,7 +210,7 @@ func FinishRunInit(ctx context.Context, ru *run.Run) error { Log: log, DriverDef: userDriverDef, ImportFn: importFn, - Scratcher: ru.Databases, + Scratcher: ru.Pools, Files: ru.Files, } diff --git a/cli/run/run.go b/cli/run/run.go index 9c6eb4bc..3823fd6f 100644 --- a/cli/run/run.go +++ b/cli/run/run.go @@ -75,8 +75,8 @@ type Run struct { // Files manages file access. Files *source.Files - // Databases mediates access to databases. - Databases *driver.Databases + // Pools mediates access to db pools. + Pools *driver.Pools // Writers holds the various writer types that // the CLI uses to print output. @@ -101,10 +101,10 @@ func (ru *Run) Close() error { // NewQueryContext returns a *libsq.QueryContext constructed from ru. func NewQueryContext(ru *Run, args map[string]string) *libsq.QueryContext { return &libsq.QueryContext{ - Collection: ru.Config.Collection, - DBOpener: ru.Databases, - JoinDBOpener: ru.Databases, - ScratchDBOpener: ru.Databases, - Args: args, + Collection: ru.Config.Collection, + PoolOpener: ru.Pools, + JoinPoolOpener: ru.Pools, + ScratchPoolOpener: ru.Pools, + Args: args, } } diff --git a/drivers/csv/csv.go b/drivers/csv/csv.go index d0d23439..61b2dcfd 100644 --- a/drivers/csv/csv.go +++ b/drivers/csv/csv.go @@ -28,7 +28,7 @@ const ( // Provider implements driver.Provider. type Provider struct { Log *slog.Logger - Scratcher driver.ScratchDatabaseOpener + Scratcher driver.ScratchPoolOpener Files *source.Files } @@ -48,7 +48,7 @@ func (d *Provider) DriverFor(typ source.DriverType) (driver.Driver, error) { type driveri struct { log *slog.Logger typ source.DriverType - scratcher driver.ScratchDatabaseOpener + scratcher driver.ScratchPoolOpener files *source.Files } @@ -65,27 +65,27 @@ func (d *driveri) DriverMetadata() driver.Metadata { return md } -// Open implements driver.DatabaseOpener. -func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database, error) { +// Open implements driver.PoolOpener. +func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Pool, error) { lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) - dbase := &database{ + pool := &pool{ log: d.log, src: src, files: d.files, } var err error - dbase.impl, err = d.scratcher.OpenScratch(ctx, src.Handle) + pool.impl, err = d.scratcher.OpenScratch(ctx, src.Handle) if err != nil { return nil, err } - if err = ingestCSV(ctx, src, d.files.OpenFunc(src), dbase.impl); err != nil { + if err = ingestCSV(ctx, src, d.files.OpenFunc(src), pool.impl); err != nil { return nil, err } - return dbase, nil + return pool, nil } // Truncate implements driver.Driver. @@ -113,37 +113,37 @@ func (d *driveri) Ping(_ context.Context, src *source.Source) error { return nil } -// database implements driver.Database. -type database struct { +// pool implements driver.Pool. +type pool struct { log *slog.Logger src *source.Source - impl driver.Database + impl driver.Pool files *source.Files } -// DB implements driver.Database. -func (d *database) DB(ctx context.Context) (*sql.DB, error) { - return d.impl.DB(ctx) +// DB implements driver.Pool. +func (p *pool) DB(ctx context.Context) (*sql.DB, error) { + return p.impl.DB(ctx) } -// SQLDriver implements driver.Database. -func (d *database) SQLDriver() driver.SQLDriver { - return d.impl.SQLDriver() +// SQLDriver implements driver.Pool. +func (p *pool) SQLDriver() driver.SQLDriver { + return p.impl.SQLDriver() } -// Source implements driver.Database. -func (d *database) Source() *source.Source { - return d.src +// Source implements driver.Pool. +func (p *pool) Source() *source.Source { + return p.src } -// TableMetadata implements driver.Database. -func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { +// TableMetadata implements driver.Pool. +func (p *pool) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { if tblName != source.MonotableName { return nil, errz.Errorf("table name should be %s for CSV/TSV etc., but got: %s", source.MonotableName, tblName) } - srcMeta, err := d.SourceMetadata(ctx, false) + srcMeta, err := p.SourceMetadata(ctx, false) if err != nil { return nil, err } @@ -152,23 +152,23 @@ func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.T return srcMeta.Tables[0], nil } -// SourceMetadata implements driver.Database. -func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { - md, err := d.impl.SourceMetadata(ctx, noSchema) +// SourceMetadata implements driver.Pool. +func (p *pool) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { + md, err := p.impl.SourceMetadata(ctx, noSchema) if err != nil { return nil, err } - md.Handle = d.src.Handle - md.Location = d.src.Location - md.Driver = d.src.Type + md.Handle = p.src.Handle + md.Location = p.src.Location + md.Driver = p.src.Type - md.Name, err = source.LocationFileName(d.src) + md.Name, err = source.LocationFileName(p.src) if err != nil { return nil, err } - md.Size, err = d.files.Size(d.src) + md.Size, err = p.files.Size(p.src) if err != nil { return nil, err } @@ -177,9 +177,9 @@ func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.M return md, nil } -// Close implements driver.Database. -func (d *database) Close() error { - d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle) +// Close implements driver.Pool. +func (p *pool) Close() error { + p.log.Debug(lgm.CloseDB, lga.Handle, p.src.Handle) - return errz.Err(d.impl.Close()) + return errz.Err(p.impl.Close()) } diff --git a/drivers/csv/ingest.go b/drivers/csv/ingest.go index 193e0653..2d8b094f 100644 --- a/drivers/csv/ingest.go +++ b/drivers/csv/ingest.go @@ -54,7 +54,7 @@ Possible values are: comma, space, pipe, tab, colon, semi, period.`, ) // ingestCSV loads the src CSV data into scratchDB. -func ingestCSV(ctx context.Context, src *source.Source, openFn source.FileOpenFunc, scratchDB driver.Database) error { +func ingestCSV(ctx context.Context, src *source.Source, openFn source.FileOpenFunc, scratchPool driver.Pool) error { log := lg.FromContext(ctx) var err error @@ -109,17 +109,17 @@ func ingestCSV(ctx context.Context, src *source.Source, openFn source.FileOpenFu // And now we need to create the dest table in scratchDB tblDef := createTblDef(source.MonotableName, header, kinds) - db, err := scratchDB.DB(ctx) + db, err := scratchPool.DB(ctx) if err != nil { return err } - err = scratchDB.SQLDriver().CreateTable(ctx, db, tblDef) + err = scratchPool.SQLDriver().CreateTable(ctx, db, tblDef) if err != nil { return errz.Wrap(err, "csv: failed to create dest scratch table") } - recMeta, err := getIngestRecMeta(ctx, scratchDB, tblDef) + recMeta, err := getIngestRecMeta(ctx, scratchPool, tblDef) if err != nil { return err } @@ -129,9 +129,9 @@ func ingestCSV(ctx context.Context, src *source.Source, openFn source.FileOpenFu } insertWriter := libsq.NewDBWriter( - scratchDB, + scratchPool, tblDef.Name, - driver.OptTuningRecChanSize.Get(scratchDB.Source().Options), + driver.OptTuningRecChanSize.Get(scratchPool.Source().Options), ) err = execInsert(ctx, insertWriter, recMeta, mungers, recs, cr) if err != nil { @@ -145,7 +145,7 @@ func ingestCSV(ctx context.Context, src *source.Source, openFn source.FileOpenFu log.Debug("Inserted rows", lga.Count, inserted, - lga.Target, source.Target(scratchDB.Source(), tblDef.Name), + lga.Target, source.Target(scratchPool.Source(), tblDef.Name), ) return nil } diff --git a/drivers/csv/insert.go b/drivers/csv/insert.go index b7c8e1e5..0791e1e1 100644 --- a/drivers/csv/insert.go +++ b/drivers/csv/insert.go @@ -122,13 +122,13 @@ func createTblDef(tblName string, colNames []string, kinds []kind.Kind) *sqlmode } // getIngestRecMeta returns record.Meta to use with RecordWriter.Open. -func getIngestRecMeta(ctx context.Context, scratchDB driver.Database, tblDef *sqlmodel.TableDef) (record.Meta, error) { - db, err := scratchDB.DB(ctx) +func getIngestRecMeta(ctx context.Context, scratchPool driver.Pool, tblDef *sqlmodel.TableDef) (record.Meta, error) { + db, err := scratchPool.DB(ctx) if err != nil { return nil, err } - drvr := scratchDB.SQLDriver() + drvr := scratchPool.SQLDriver() colTypes, err := drvr.TableColumnTypes(ctx, db, tblDef.Name, tblDef.ColNames()) if err != nil { diff --git a/drivers/json/import.go b/drivers/json/import.go index 05a6af5e..e5723e3b 100644 --- a/drivers/json/import.go +++ b/drivers/json/import.go @@ -30,11 +30,11 @@ import ( // importJob describes a single import job, where the JSON // at fromSrc is read via openFn and the resulting records -// are written to destDB. +// are written to destPool. type importJob struct { - fromSrc *source.Source - openFn source.FileOpenFunc - destDB driver.Database + fromSrc *source.Source + openFn source.FileOpenFunc + destPool driver.Pool // sampleSize is the maximum number of values to // sample to determine the kind of an element. @@ -57,18 +57,18 @@ var ( ) // getRecMeta returns record.Meta to use with RecordWriter.Open. -func getRecMeta(ctx context.Context, scratchDB driver.Database, tblDef *sqlmodel.TableDef) (record.Meta, error) { - db, err := scratchDB.DB(ctx) +func getRecMeta(ctx context.Context, scratchPool driver.Pool, tblDef *sqlmodel.TableDef) (record.Meta, error) { + db, err := scratchPool.DB(ctx) if err != nil { return nil, err } - colTypes, err := scratchDB.SQLDriver().TableColumnTypes(ctx, db, tblDef.Name, tblDef.ColNames()) + colTypes, err := scratchPool.SQLDriver().TableColumnTypes(ctx, db, tblDef.Name, tblDef.ColNames()) if err != nil { return nil, err } - destMeta, _, err := scratchDB.SQLDriver().RecordMeta(ctx, colTypes) + destMeta, _, err := scratchPool.SQLDriver().RecordMeta(ctx, colTypes) if err != nil { return nil, err } diff --git a/drivers/json/import_json.go b/drivers/json/import_json.go index 9a59111a..f75c0bf6 100644 --- a/drivers/json/import_json.go +++ b/drivers/json/import_json.go @@ -143,9 +143,9 @@ func importJSON(ctx context.Context, job importJob) error { } defer lg.WarnIfCloseError(log, lgm.CloseFileReader, r) - drvr := job.destDB.SQLDriver() + drvr := job.destPool.SQLDriver() - db, err := job.destDB.DB(ctx) + db, err := job.destPool.DB(ctx) if err != nil { return err } diff --git a/drivers/json/import_jsona.go b/drivers/json/import_jsona.go index 2dc8ffbc..eb343ef9 100644 --- a/drivers/json/import_jsona.go +++ b/drivers/json/import_jsona.go @@ -122,19 +122,19 @@ func importJSONA(ctx context.Context, job importJob) error { colNames[i] = stringz.GenerateAlphaColName(i, true) } - // And now we need to create the dest table in destDB + // And now we need to create the dest table in destPool tblDef := sqlmodel.NewTableDef(source.MonotableName, colNames, colKinds) - db, err := job.destDB.DB(ctx) + db, err := job.destPool.DB(ctx) if err != nil { return err } - err = job.destDB.SQLDriver().CreateTable(ctx, db, tblDef) + err = job.destPool.SQLDriver().CreateTable(ctx, db, tblDef) if err != nil { return errz.Wrapf(err, "import %s: failed to create dest scratch table", TypeJSONA) } - recMeta, err := getRecMeta(ctx, job.destDB, tblDef) + recMeta, err := getRecMeta(ctx, job.destPool, tblDef) if err != nil { return err } @@ -146,9 +146,9 @@ func importJSONA(ctx context.Context, job importJob) error { defer lg.WarnIfCloseError(log, lgm.CloseFileReader, r) insertWriter := libsq.NewDBWriter( - job.destDB, + job.destPool, tblDef.Name, - driver.OptTuningRecChanSize.Get(job.destDB.Source().Options), + driver.OptTuningRecChanSize.Get(job.destPool.Source().Options), ) var cancelFn context.CancelFunc @@ -174,7 +174,7 @@ func importJSONA(ctx context.Context, job importJob) error { log.Debug("Inserted rows", lga.Count, inserted, - lga.Target, source.Target(job.destDB.Source(), tblDef.Name), + lga.Target, source.Target(job.destPool.Source(), tblDef.Name), ) return nil } diff --git a/drivers/json/import_jsonl.go b/drivers/json/import_jsonl.go index 97b5dad5..d01c6825 100644 --- a/drivers/json/import_jsonl.go +++ b/drivers/json/import_jsonl.go @@ -94,8 +94,8 @@ func importJSONL(ctx context.Context, job importJob) error { //nolint:gocognit } defer lg.WarnIfCloseError(log, lgm.CloseFileReader, r) - drvr := job.destDB.SQLDriver() - db, err := job.destDB.DB(ctx) + drvr := job.destPool.SQLDriver() + db, err := job.destPool.DB(ctx) if err != nil { return err } diff --git a/drivers/json/import_test.go b/drivers/json/import_test.go index 5bdb6f01..9a3f5ee6 100644 --- a/drivers/json/import_test.go +++ b/drivers/json/import_test.go @@ -85,8 +85,8 @@ func TestImportJSONL_Flat(t *testing.T) { } } - th, src, _, dbase, _ := testh.NewWith(t, testsrc.EmptyDB) - job := json.NewImportJob(src, openFn, dbase, 0, true) + th, src, _, pool, _ := testh.NewWith(t, testsrc.EmptyDB) + job := json.NewImportJob(src, openFn, pool, 0, true) err := json.ImportJSONL(th.Context, job) if tc.wantErr { @@ -110,8 +110,8 @@ func TestImportJSON_Flat(t *testing.T) { return os.Open("testdata/actor.json") } - th, src, _, dbase, _ := testh.NewWith(t, testsrc.EmptyDB) - job := json.NewImportJob(src, openFn, dbase, 0, true) + th, src, _, pool, _ := testh.NewWith(t, testsrc.EmptyDB) + job := json.NewImportJob(src, openFn, pool, 0, true) err := json.ImportJSON(th.Context, job) require.NoError(t, err) diff --git a/drivers/json/internal_test.go b/drivers/json/internal_test.go index eea4edaf..71b028e2 100644 --- a/drivers/json/internal_test.go +++ b/drivers/json/internal_test.go @@ -26,7 +26,7 @@ var ( // newImportJob is a constructor for the unexported importJob type. // If sampleSize <= 0, a default value is used. -func newImportJob(fromSrc *source.Source, openFn source.FileOpenFunc, destDB driver.Database, sampleSize int, +func newImportJob(fromSrc *source.Source, openFn source.FileOpenFunc, destPool driver.Pool, sampleSize int, flatten bool, ) importJob { if sampleSize <= 0 { @@ -36,7 +36,7 @@ func newImportJob(fromSrc *source.Source, openFn source.FileOpenFunc, destDB dri return importJob{ fromSrc: fromSrc, openFn: openFn, - destDB: destDB, + destPool: destPool, sampleSize: sampleSize, flatten: flatten, } diff --git a/drivers/json/json.go b/drivers/json/json.go index 22138319..32fee686 100644 --- a/drivers/json/json.go +++ b/drivers/json/json.go @@ -36,7 +36,7 @@ const ( // Provider implements driver.Provider. type Provider struct { Log *slog.Logger - Scratcher driver.ScratchDatabaseOpener + Scratcher driver.ScratchPoolOpener Files *source.Files } @@ -69,7 +69,7 @@ type driveri struct { log *slog.Logger typ source.DriverType importFn importFunc - scratcher driver.ScratchDatabaseOpener + scratcher driver.ScratchPoolOpener files *source.Files } @@ -92,28 +92,28 @@ func (d *driveri) DriverMetadata() driver.Metadata { return md } -// Open implements driver.DatabaseOpener. -func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database, error) { +// Open implements driver.PoolOpener. +func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Pool, error) { lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) - dbase := &database{log: d.log, src: src, clnup: cleanup.New(), files: d.files} + p := &pool{log: d.log, src: src, clnup: cleanup.New(), files: d.files} r, err := d.files.Open(src) if err != nil { return nil, err } - dbase.impl, err = d.scratcher.OpenScratch(ctx, src.Handle) + p.impl, err = d.scratcher.OpenScratch(ctx, src.Handle) if err != nil { lg.WarnIfCloseError(d.log, lgm.CloseFileReader, r) - lg.WarnIfFuncError(d.log, lgm.CloseDB, dbase.clnup.Run) + lg.WarnIfFuncError(d.log, lgm.CloseDB, p.clnup.Run) return nil, err } job := importJob{ fromSrc: src, openFn: d.files.OpenFunc(src), - destDB: dbase.impl, + destPool: p.impl, sampleSize: driver.OptIngestSampleSize.Get(src.Options), flatten: true, } @@ -121,7 +121,7 @@ func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database err = d.importFn(ctx, job) if err != nil { lg.WarnIfCloseError(d.log, lgm.CloseFileReader, r) - lg.WarnIfFuncError(d.log, lgm.CloseDB, dbase.clnup.Run) + lg.WarnIfFuncError(d.log, lgm.CloseDB, p.clnup.Run) return nil, err } @@ -130,7 +130,7 @@ func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database return nil, err } - return dbase, nil + return p, nil } // Truncate implements driver.Driver. @@ -160,38 +160,38 @@ func (d *driveri) Ping(_ context.Context, src *source.Source) error { return nil } -// database implements driver.Database. -type database struct { +// pool implements driver.Pool. +type pool struct { log *slog.Logger src *source.Source - impl driver.Database + impl driver.Pool clnup *cleanup.Cleanup files *source.Files } -// DB implements driver.Database. -func (d *database) DB(ctx context.Context) (*sql.DB, error) { - return d.impl.DB(ctx) +// DB implements driver.Pool. +func (p *pool) DB(ctx context.Context) (*sql.DB, error) { + return p.impl.DB(ctx) } -// SQLDriver implements driver.Database. -func (d *database) SQLDriver() driver.SQLDriver { - return d.impl.SQLDriver() +// SQLDriver implements driver.Pool. +func (p *pool) SQLDriver() driver.SQLDriver { + return p.impl.SQLDriver() } -// Source implements driver.Database. -func (d *database) Source() *source.Source { - return d.src +// Source implements driver.Pool. +func (p *pool) Source() *source.Source { + return p.src } -// TableMetadata implements driver.Database. -func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { +// TableMetadata implements driver.Pool. +func (p *pool) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { if tblName != source.MonotableName { return nil, errz.Errorf("table name should be %s for CSV/TSV etc., but got: %s", source.MonotableName, tblName) } - srcMeta, err := d.SourceMetadata(ctx, false) + srcMeta, err := p.SourceMetadata(ctx, false) if err != nil { return nil, err } @@ -200,23 +200,23 @@ func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.T return srcMeta.Tables[0], nil } -// SourceMetadata implements driver.Database. -func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { - md, err := d.impl.SourceMetadata(ctx, noSchema) +// SourceMetadata implements driver.Pool. +func (p *pool) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { + md, err := p.impl.SourceMetadata(ctx, noSchema) if err != nil { return nil, err } - md.Handle = d.src.Handle - md.Location = d.src.Location - md.Driver = d.src.Type + md.Handle = p.src.Handle + md.Location = p.src.Location + md.Driver = p.src.Type - md.Name, err = source.LocationFileName(d.src) + md.Name, err = source.LocationFileName(p.src) if err != nil { return nil, err } - md.Size, err = d.files.Size(d.src) + md.Size, err = p.files.Size(p.src) if err != nil { return nil, err } @@ -225,9 +225,9 @@ func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.M return md, nil } -// Close implements driver.Database. -func (d *database) Close() error { - d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle) +// Close implements driver.Pool. +func (p *pool) Close() error { + p.log.Debug(lgm.CloseDB, lga.Handle, p.src.Handle) - return errz.Combine(d.impl.Close(), d.clnup.Run()) + return errz.Combine(p.impl.Close(), p.clnup.Run()) } diff --git a/drivers/mysql/metadata.go b/drivers/mysql/metadata.go index 2b32f4f7..e922dc49 100644 --- a/drivers/mysql/metadata.go +++ b/drivers/mysql/metadata.go @@ -165,7 +165,7 @@ func getNewRecordFunc(rowMeta record.Meta) driver.NewRecordFunc { } // getTableMetadata gets the metadata for a single table. It is the -// implementation of driver.Database.TableMetadata. +// implementation of driver.Pool.TableMetadata. func getTableMetadata(ctx context.Context, db sqlz.DB, tblName string) (*source.TableMetadata, error) { query := `SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, TABLE_COMMENT, (DATA_LENGTH + INDEX_LENGTH) AS table_size, (SELECT COUNT(*) FROM ` + "`" + tblName + "`" + `) AS row_count @@ -243,7 +243,7 @@ ORDER BY cols.ordinal_position ASC` return cols, errw(rows.Err()) } -// getSourceMetadata is the implementation of driver.Database.SourceMetadata. +// getSourceMetadata is the implementation of driver.Pool.SourceMetadata. // // Multiple queries are required to build the SourceMetadata, and this // impl makes use of errgroup to make concurrent queries. In the initial diff --git a/drivers/mysql/metadata_test.go b/drivers/mysql/metadata_test.go index b690c177..fb55d0b6 100644 --- a/drivers/mysql/metadata_test.go +++ b/drivers/mysql/metadata_test.go @@ -79,8 +79,8 @@ func TestDatabase_SourceMetadata_MySQL(t *testing.T) { t.Run(handle, func(t *testing.T) { t.Parallel() - th, _, _, dbase, _ := testh.NewWith(t, handle) - md, err := dbase.SourceMetadata(th.Context, false) + th, _, _, pool, _ := testh.NewWith(t, handle) + md, err := pool.SourceMetadata(th.Context, false) require.NoError(t, err) require.Equal(t, "sakila", md.Name) require.Equal(t, handle, md.Handle) @@ -102,8 +102,8 @@ func TestDatabase_TableMetadata(t *testing.T) { t.Run(handle, func(t *testing.T) { t.Parallel() - th, _, _, dbase, _ := testh.NewWith(t, handle) - md, err := dbase.TableMetadata(th.Context, sakila.TblActor) + th, _, _, pool, _ := testh.NewWith(t, handle) + md, err := pool.TableMetadata(th.Context, sakila.TblActor) require.NoError(t, err) require.Equal(t, sakila.TblActor, md.Name) }) diff --git a/drivers/mysql/mysql.go b/drivers/mysql/mysql.go index 6fc16ac0..df4fef44 100644 --- a/drivers/mysql/mysql.go +++ b/drivers/mysql/mysql.go @@ -410,8 +410,8 @@ func (d *driveri) getTableRecordMeta(ctx context.Context, db sqlz.DB, tblName st return destCols, nil } -// Open implements driver.DatabaseOpener. -func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database, error) { +// Open implements driver.PoolOpener. +func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Pool, error) { lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) db, err := d.doOpen(ctx, src) @@ -423,7 +423,7 @@ func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database return nil, err } - return &database{log: d.log, db: db, src: src, drvr: d}, nil + return &pool{log: d.log, db: db, src: src, drvr: d}, nil } func (d *driveri) doOpen(ctx context.Context, src *source.Source) (*sql.DB, error) { @@ -519,43 +519,43 @@ func (d *driveri) Truncate(ctx context.Context, src *source.Source, tbl string, return beforeCount, errw(tx.Commit()) } -// database implements driver.Database. -type database struct { +// pool implements driver.Pool. +type pool struct { log *slog.Logger db *sql.DB src *source.Source drvr *driveri } -// DB implements driver.Database. -func (d *database) DB(context.Context) (*sql.DB, error) { - return d.db, nil +// DB implements driver.Pool. +func (p *pool) DB(context.Context) (*sql.DB, error) { + return p.db, nil } -// SQLDriver implements driver.Database. -func (d *database) SQLDriver() driver.SQLDriver { - return d.drvr +// SQLDriver implements driver.Pool. +func (p *pool) SQLDriver() driver.SQLDriver { + return p.drvr } -// Source implements driver.Database. -func (d *database) Source() *source.Source { - return d.src +// Source implements driver.Pool. +func (p *pool) Source() *source.Source { + return p.src } -// TableMetadata implements driver.Database. -func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { - return getTableMetadata(ctx, d.db, tblName) +// TableMetadata implements driver.Pool. +func (p *pool) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { + return getTableMetadata(ctx, p.db, tblName) } -// SourceMetadata implements driver.Database. -func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { - return getSourceMetadata(ctx, d.src, d.db, noSchema) +// SourceMetadata implements driver.Pool. +func (p *pool) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { + return getSourceMetadata(ctx, p.src, p.db, noSchema) } -// Close implements driver.Database. -func (d *database) Close() error { - d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle) - return errw(d.db.Close()) +// Close implements driver.Pool. +func (p *pool) Close() error { + p.log.Debug(lgm.CloseDB, lga.Handle, p.src.Handle) + return errw(p.db.Close()) } // dsnFromLocation builds the mysql driver DSN from src.Location. diff --git a/drivers/postgres/postgres.go b/drivers/postgres/postgres.go index 9bdedd22..1a3958d8 100644 --- a/drivers/postgres/postgres.go +++ b/drivers/postgres/postgres.go @@ -144,8 +144,8 @@ func (d *driveri) Renderer() *render.Renderer { return r } -// Open implements driver.DatabaseOpener. -func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database, error) { +// Open implements driver.PoolOpener. +func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Pool, error) { lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) db, err := d.doOpen(ctx, src) @@ -157,7 +157,7 @@ func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database return nil, err } - return &database{log: d.log, db: db, src: src, drvr: d}, nil + return &pool{log: d.log, db: db, src: src, drvr: d}, nil } func (d *driveri) doOpen(ctx context.Context, src *source.Source) (*sql.DB, error) { @@ -709,32 +709,32 @@ func (d *driveri) RecordMeta(ctx context.Context, colTypes []*sql.ColumnType) ( return recMeta, mungeFn, nil } -// database is the postgres implementation of driver.Database. -type database struct { +// pool is the postgres implementation of driver.Pool. +type pool struct { log *slog.Logger drvr *driveri db *sql.DB src *source.Source } -// DB implements driver.Database. -func (d *database) DB(context.Context) (*sql.DB, error) { - return d.db, nil +// DB implements driver.Pool. +func (p *pool) DB(context.Context) (*sql.DB, error) { + return p.db, nil } -// SQLDriver implements driver.Database. -func (d *database) SQLDriver() driver.SQLDriver { - return d.drvr +// SQLDriver implements driver.Pool. +func (p *pool) SQLDriver() driver.SQLDriver { + return p.drvr } -// Source implements driver.Database. -func (d *database) Source() *source.Source { - return d.src +// Source implements driver.Pool. +func (p *pool) Source() *source.Source { + return p.src } -// TableMetadata implements driver.Database. -func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { - db, err := d.DB(ctx) +// TableMetadata implements driver.Pool. +func (p *pool) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { + db, err := p.DB(ctx) if err != nil { return nil, err } @@ -742,20 +742,20 @@ func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.T return getTableMetadata(ctx, db, tblName) } -// SourceMetadata implements driver.Database. -func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { - db, err := d.DB(ctx) +// SourceMetadata implements driver.Pool. +func (p *pool) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { + db, err := p.DB(ctx) if err != nil { return nil, err } - return getSourceMetadata(ctx, d.src, db, noSchema) + return getSourceMetadata(ctx, p.src, db, noSchema) } -// Close implements driver.Database. -func (d *database) Close() error { - d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle) +// Close implements driver.Pool. +func (p *pool) Close() error { + p.log.Debug(lgm.CloseDB, lga.Handle, p.src.Handle) - err := d.db.Close() + err := p.db.Close() if err != nil { return errw(err) } diff --git a/drivers/postgres/postgres_test.go b/drivers/postgres/postgres_test.go index faec44b6..d57291b2 100644 --- a/drivers/postgres/postgres_test.go +++ b/drivers/postgres/postgres_test.go @@ -222,12 +222,12 @@ func TestAlternateSchema(t *testing.T) { src2 := src.Clone() src2.Handle += "2" src2.Location += "?search_path=" + schemaName - dbase2 := th.Open(src2) - md2, err := dbase2.SourceMetadata(ctx, false) + pool2 := th.Open(src2) + md2, err := pool2.SourceMetadata(ctx, false) require.NoError(t, err) require.Equal(t, schemaName, md2.Schema) - tblMeta2, err := dbase2.TableMetadata(ctx, tblName) + tblMeta2, err := pool2.TableMetadata(ctx, tblName) require.NoError(t, err) require.Equal(t, int64(wantRowCount), tblMeta2.RowCount) } @@ -279,10 +279,10 @@ func BenchmarkDatabase_SourceMetadata(b *testing.B) { b.Run(handle, func(b *testing.B) { th := testh.New(b) th.Log = lg.Discard() - dbase := th.Open(th.Source(handle)) + pool := th.Open(th.Source(handle)) b.ResetTimer() - md, err := dbase.SourceMetadata(th.Context, false) + md, err := pool.SourceMetadata(th.Context, false) require.NoError(b, err) require.Equal(b, "sakila", md.Name) }) diff --git a/drivers/sqlite3/metadata_test.go b/drivers/sqlite3/metadata_test.go index bce3d379..6b99f8eb 100644 --- a/drivers/sqlite3/metadata_test.go +++ b/drivers/sqlite3/metadata_test.go @@ -204,7 +204,7 @@ func TestRecordMetadata(t *testing.T) { t.Run(tc.tbl, func(t *testing.T) { t.Parallel() - th, _, drvr, dbase, db := testh.NewWith(t, sakila.SL3) + th, _, drvr, pool, db := testh.NewWith(t, sakila.SL3) query := fmt.Sprintf("SELECT %s FROM %s", strings.Join(tc.colNames, ", "), tc.tbl) rows, err := db.QueryContext(th.Context, query) //nolint:rowserrcheck @@ -235,7 +235,7 @@ func TestRecordMetadata(t *testing.T) { } // Now check our table metadata - gotTblMeta, err := dbase.TableMetadata(th.Context, tc.tbl) + gotTblMeta, err := pool.TableMetadata(th.Context, tc.tbl) require.NoError(t, err) require.Equal(t, tc.tbl, gotTblMeta.Name) require.Equal(t, tc.rowCount, gotTblMeta.RowCount) @@ -285,12 +285,12 @@ func TestAggregateFuncsQuery(t *testing.T) { func BenchmarkDatabase_SourceMetadata(b *testing.B) { const numTables = 1000 - th, src, drvr, dbase, db := testh.NewWith(b, testsrc.MiscDB) + th, src, drvr, pool, db := testh.NewWith(b, testsrc.MiscDB) tblNames := createTypeTestTbls(th, src, numTables, true) b.ResetTimer() for n := 0; n < b.N; n++ { - srcMeta, err := dbase.SourceMetadata(th.Context, false) + srcMeta, err := pool.SourceMetadata(th.Context, false) require.NoError(b, err) require.True(b, len(srcMeta.Tables) > len(tblNames)) } diff --git a/drivers/sqlite3/sqlite3.go b/drivers/sqlite3/sqlite3.go index a36e7eff..bab89bb1 100644 --- a/drivers/sqlite3/sqlite3.go +++ b/drivers/sqlite3/sqlite3.go @@ -130,8 +130,8 @@ func (d *driveri) DriverMetadata() driver.Metadata { } } -// Open implements driver.DatabaseOpener. -func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database, error) { +// Open implements driver.PoolOpener. +func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Pool, error) { lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) db, err := d.doOpen(ctx, src) @@ -143,7 +143,7 @@ func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database return nil, err } - return &database{log: d.log, db: db, src: src, drvr: d}, nil + return &pool{log: d.log, db: db, src: src, drvr: d}, nil } func (d *driveri) doOpen(ctx context.Context, src *source.Source) (*sql.DB, error) { @@ -896,8 +896,8 @@ func (d *driveri) getTableRecordMeta(ctx context.Context, db sqlz.DB, tblName st return destCols, nil } -// database implements driver.Database. -type database struct { +// pool implements driver.Pool. +type pool struct { log *slog.Logger db *sql.DB src *source.Source @@ -908,24 +908,24 @@ type database struct { closed bool } -// DB implements driver.Database. -func (d *database) DB(context.Context) (*sql.DB, error) { - return d.db, nil +// DB implements driver.Pool. +func (p *pool) DB(context.Context) (*sql.DB, error) { + return p.db, nil } -// SQLDriver implements driver.Database. -func (d *database) SQLDriver() driver.SQLDriver { - return d.drvr +// SQLDriver implements driver.Pool. +func (p *pool) SQLDriver() driver.SQLDriver { + return p.drvr } -// Source implements driver.Database. -func (d *database) Source() *source.Source { - return d.src +// Source implements driver.Pool. +func (p *pool) Source() *source.Source { + return p.src } -// TableMetadata implements driver.Database. -func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { - db, err := d.DB(ctx) +// TableMetadata implements driver.Pool. +func (p *pool) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { + db, err := p.DB(ctx) if err != nil { return nil, err } @@ -933,20 +933,20 @@ func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.T return getTableMetadata(ctx, db, tblName) } -// SourceMetadata implements driver.Database. -func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { +// SourceMetadata implements driver.Pool. +func (p *pool) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { // https://stackoverflow.com/questions/9646353/how-to-find-sqlite-database-file-version - md := &source.Metadata{Handle: d.src.Handle, Driver: Type, DBDriver: dbDrvr} + md := &source.Metadata{Handle: p.src.Handle, Driver: Type, DBDriver: dbDrvr} - dsn, err := PathFromLocation(d.src) + dsn, err := PathFromLocation(p.src) if err != nil { return nil, err } const q = "SELECT sqlite_version(), (SELECT name FROM pragma_database_list ORDER BY seq limit 1);" - err = d.db.QueryRowContext(ctx, q).Scan(&md.DBVersion, &md.Schema) + err = p.db.QueryRowContext(ctx, q).Scan(&md.DBVersion, &md.Schema) if err != nil { return nil, errw(err) } @@ -961,9 +961,9 @@ func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.M md.Size = fi.Size() md.Name = fi.Name() md.FQName = fi.Name() + "/" + md.Schema - md.Location = d.src.Location + md.Location = p.src.Location - md.DBProperties, err = getDBProperties(ctx, d.db) + md.DBProperties, err = getDBProperties(ctx, p.db) if err != nil { return nil, err } @@ -972,7 +972,7 @@ func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.M return md, nil } - md.Tables, err = getAllTableMetadata(ctx, d.db, md.Schema) + md.Tables, err = getAllTableMetadata(ctx, p.db, md.Schema) if err != nil { return nil, err } @@ -988,19 +988,19 @@ func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.M return md, nil } -// Close implements driver.Database. -func (d *database) Close() error { - d.closeMu.Lock() - defer d.closeMu.Unlock() +// Close implements driver.Pool. +func (p *pool) Close() error { + p.closeMu.Lock() + defer p.closeMu.Unlock() - if d.closed { - d.log.Warn("SQLite DB already closed", lga.Src, d.src) + if p.closed { + p.log.Warn("SQLite DB already closed", lga.Src, p.src) return nil } - d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle) - err := errw(d.db.Close()) - d.closed = true + p.log.Debug(lgm.CloseDB, lga.Handle, p.src.Handle) + err := errw(p.db.Close()) + p.closed = true return err } diff --git a/drivers/sqlserver/sqlserver.go b/drivers/sqlserver/sqlserver.go index b428864e..18090482 100644 --- a/drivers/sqlserver/sqlserver.go +++ b/drivers/sqlserver/sqlserver.go @@ -162,8 +162,8 @@ func (d *driveri) Renderer() *render.Renderer { return r } -// Open implements driver.DatabaseOpener. -func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database, error) { +// Open implements driver.PoolOpener. +func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Pool, error) { lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) db, err := d.doOpen(ctx, src) @@ -175,7 +175,7 @@ func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Database return nil, err } - return &database{log: d.log, db: db, src: src, drvr: d}, nil + return &pool{log: d.log, db: db, src: src, drvr: d}, nil } func (d *driveri) doOpen(ctx context.Context, src *source.Source) (*sql.DB, error) { @@ -638,33 +638,33 @@ func (d *driveri) getTableColsMeta(ctx context.Context, db sqlz.DB, tblName stri return destCols, nil } -// database implements driver.Database. -type database struct { +// pool implements driver.Pool. +type pool struct { log *slog.Logger drvr *driveri db *sql.DB src *source.Source } -var _ driver.Database = (*database)(nil) +var _ driver.Pool = (*pool)(nil) -// DB implements driver.Database. -func (d *database) DB(context.Context) (*sql.DB, error) { +// DB implements driver.Pool. +func (d *pool) DB(context.Context) (*sql.DB, error) { return d.db, nil } -// SQLDriver implements driver.Database. -func (d *database) SQLDriver() driver.SQLDriver { +// SQLDriver implements driver.Pool. +func (d *pool) SQLDriver() driver.SQLDriver { return d.drvr } -// Source implements driver.Database. -func (d *database) Source() *source.Source { +// Source implements driver.Pool. +func (d *pool) Source() *source.Source { return d.src } -// TableMetadata implements driver.Database. -func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { +// TableMetadata implements driver.Pool. +func (d *pool) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { const query = `SELECT TABLE_CATALOG, TABLE_SCHEMA, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = @p1` @@ -680,13 +680,13 @@ WHERE TABLE_NAME = @p1` return getTableMetadata(ctx, d.db, catalog, schema, tblName, tblType) } -// SourceMetadata implements driver.Database. -func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { +// SourceMetadata implements driver.Pool. +func (d *pool) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { return getSourceMetadata(ctx, d.src, d.db, noSchema) } -// Close implements driver.Database. -func (d *database) Close() error { +// Close implements driver.Pool. +func (d *pool) Close() error { d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle) return errw(d.db.Close()) diff --git a/drivers/userdriver/userdriver.go b/drivers/userdriver/userdriver.go index d77e6a37..a8b277d2 100644 --- a/drivers/userdriver/userdriver.go +++ b/drivers/userdriver/userdriver.go @@ -24,15 +24,15 @@ import ( ) // ImportFunc is a function that can import -// data (as defined in def) to destDB. +// data (as defined in def) to destPool. type ImportFunc func(ctx context.Context, def *DriverDef, - data io.Reader, destDB driver.Database) error + data io.Reader, destPool driver.Pool) error // Provider implements driver.Provider for a DriverDef. type Provider struct { Log *slog.Logger DriverDef *DriverDef - Scratcher driver.ScratchDatabaseOpener + Scratcher driver.ScratchPoolOpener Files *source.Files ImportFn ImportFunc } @@ -43,7 +43,7 @@ func (p *Provider) DriverFor(typ source.DriverType) (driver.Driver, error) { return nil, errz.Errorf("unsupported driver type {%s}", typ) } - return &drvr{ + return &driveri{ log: p.Log, typ: typ, def: p.DriverDef, @@ -62,17 +62,17 @@ func (p *Provider) Detectors() []source.DriverDetectFunc { } // Driver implements driver.Driver. -type drvr struct { +type driveri struct { log *slog.Logger typ source.DriverType def *DriverDef files *source.Files - scratcher driver.ScratchDatabaseOpener + scratcher driver.ScratchPoolOpener importFn ImportFunc } // DriverMetadata implements driver.Driver. -func (d *drvr) DriverMetadata() driver.Metadata { +func (d *driveri) DriverMetadata() driver.Metadata { return driver.Metadata{ Type: source.DriverType(d.def.Name), Description: d.def.Title, @@ -81,8 +81,8 @@ func (d *drvr) DriverMetadata() driver.Metadata { } } -// Open implements driver.DatabaseOpener. -func (d *drvr) Open(ctx context.Context, src *source.Source) (driver.Database, error) { +// Open implements driver.PoolOpener. +func (d *driveri) Open(ctx context.Context, src *source.Source) (driver.Pool, error) { lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) clnup := cleanup.New() @@ -106,16 +106,16 @@ func (d *drvr) Open(ctx context.Context, src *source.Source) (driver.Database, e return nil, errz.Wrap(err, d.def.Name) } - return &database{log: d.log, src: src, impl: scratchDB, clnup: clnup}, nil + return &pool{log: d.log, src: src, impl: scratchDB, clnup: clnup}, nil } // Truncate implements driver.Driver. -func (d *drvr) Truncate(_ context.Context, _ *source.Source, _ string, _ bool) (int64, error) { +func (d *driveri) Truncate(_ context.Context, _ *source.Source, _ string, _ bool) (int64, error) { return 0, errz.Errorf("truncate not supported for %s", d.DriverMetadata().Type) } // ValidateSource implements driver.Driver. -func (d *drvr) ValidateSource(src *source.Source) (*source.Source, error) { +func (d *driveri) ValidateSource(src *source.Source) (*source.Source, error) { d.log.Debug("Validating source", lga.Src, src) if string(src.Type) != d.def.Name { return nil, errz.Errorf("expected driver type {%s} but got {%s}", d.def.Name, src.Type) @@ -124,7 +124,7 @@ func (d *drvr) ValidateSource(src *source.Source) (*source.Source, error) { } // Ping implements driver.Driver. -func (d *drvr) Ping(_ context.Context, src *source.Source) error { +func (d *driveri) Ping(_ context.Context, src *source.Source) error { d.log.Debug("Ping source", lga.Driver, d.typ, lga.Src, src, @@ -141,39 +141,39 @@ func (d *drvr) Ping(_ context.Context, src *source.Source) error { return r.Close() } -// database implements driver.Database. -type database struct { +// pool implements driver.Pool. +type pool struct { log *slog.Logger src *source.Source - impl driver.Database + impl driver.Pool // clnup will ultimately invoke impl.Close to dispose of // the scratch DB. clnup *cleanup.Cleanup } -// DB implements driver.Database. -func (d *database) DB(ctx context.Context) (*sql.DB, error) { +// DB implements driver.Pool. +func (d *pool) DB(ctx context.Context) (*sql.DB, error) { return d.impl.DB(ctx) } -// SQLDriver implements driver.Database. -func (d *database) SQLDriver() driver.SQLDriver { +// SQLDriver implements driver.Pool. +func (d *pool) SQLDriver() driver.SQLDriver { return d.impl.SQLDriver() } -// Source implements driver.Database. -func (d *database) Source() *source.Source { +// Source implements driver.Pool. +func (d *pool) Source() *source.Source { return d.src } -// TableMetadata implements driver.Database. -func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { +// TableMetadata implements driver.Pool. +func (d *pool) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { return d.impl.TableMetadata(ctx, tblName) } -// SourceMetadata implements driver.Database. -func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { +// SourceMetadata implements driver.Pool. +func (d *pool) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { meta, err := d.impl.SourceMetadata(ctx, noSchema) if err != nil { return nil, err @@ -190,8 +190,8 @@ func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.M return meta, nil } -// Close implements driver.Database. -func (d *database) Close() error { +// Close implements driver.Pool. +func (d *pool) Close() error { d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle) // We don't need to explicitly invoke c.impl.Close diff --git a/drivers/userdriver/userdriver_test.go b/drivers/userdriver/userdriver_test.go index 9801a5cd..71dfa0c6 100644 --- a/drivers/userdriver/userdriver_test.go +++ b/drivers/userdriver/userdriver_test.go @@ -40,15 +40,15 @@ func TestDriver(t *testing.T) { err := drvr.Ping(th.Context, src) require.NoError(t, err) - dbase, err := drvr.Open(th.Context, src) + pool, err := drvr.Open(th.Context, src) require.NoError(t, err) - t.Cleanup(func() { assert.NoError(t, dbase.Close()) }) + t.Cleanup(func() { assert.NoError(t, pool.Close()) }) - srcMeta, err := dbase.SourceMetadata(th.Context, false) + srcMeta, err := pool.SourceMetadata(th.Context, false) require.NoError(t, err) require.True(t, stringz.InSlice(srcMeta.TableNames(), tc.tbl)) - tblMeta, err := dbase.TableMetadata(th.Context, tc.tbl) + tblMeta, err := pool.TableMetadata(th.Context, tc.tbl) require.NoError(t, err) require.Equal(t, tc.tbl, tblMeta.Name) diff --git a/drivers/userdriver/xmlud/xmlimport.go b/drivers/userdriver/xmlud/xmlimport.go index f932306c..95964ff7 100644 --- a/drivers/userdriver/xmlud/xmlimport.go +++ b/drivers/userdriver/xmlud/xmlimport.go @@ -29,7 +29,7 @@ import ( const Genre = "xml" // Import implements userdriver.ImportFunc. -func Import(ctx context.Context, def *userdriver.DriverDef, data io.Reader, destDB driver.Database) error { +func Import(ctx context.Context, def *userdriver.DriverDef, data io.Reader, destPool driver.Pool) error { if def.Genre != Genre { return errz.Errorf("xmlud.Import does not support genre {%s}", def.Genre) } @@ -47,7 +47,7 @@ func Import(ctx context.Context, def *userdriver.DriverDef, data io.Reader, dest msgOnce: map[string]struct{}{}, } - err := im.execImport(ctx, data, destDB) + err := im.execImport(ctx, data, destPool) err2 := im.clnup.Run() if err != nil { return errz.Wrap(err, "xml import") @@ -61,7 +61,7 @@ type importer struct { log *slog.Logger def *userdriver.DriverDef data io.Reader - destDB driver.Database + destPool driver.Pool selStack *selStack rowStack *rowStack tblDefs map[string]*sqlmodel.TableDef @@ -88,8 +88,8 @@ type importer struct { msgOnce map[string]struct{} } -func (im *importer) execImport(ctx context.Context, r io.Reader, destDB driver.Database) error { //nolint:gocognit - im.data, im.destDB = r, destDB +func (im *importer) execImport(ctx context.Context, r io.Reader, destPool driver.Pool) error { //nolint:gocognit + im.data, im.destPool = r, destPool err := im.createTables(ctx) if err != nil { @@ -431,13 +431,13 @@ func (im *importer) dbInsert(ctx context.Context, row *rowState) error { execInsertFn, ok := im.execInsertFns[cacheKey] if !ok { - db, err := im.destDB.DB(ctx) + db, err := im.destPool.DB(ctx) if err != nil { return err } // Nothing cached, prepare the insert statement and insert munge func - stmtExecer, err := im.destDB.SQLDriver().PrepareInsertStmt(ctx, db, tblName, colNames, 1) + stmtExecer, err := im.destPool.SQLDriver().PrepareInsertStmt(ctx, db, tblName, colNames, 1) if err != nil { return err } @@ -471,7 +471,7 @@ func (im *importer) dbInsert(ctx context.Context, row *rowState) error { // dbUpdate updates row's table with row's dirty values, using row's // primary key cols as the args to the WHERE clause. func (im *importer) dbUpdate(ctx context.Context, row *rowState) error { - drvr := im.destDB.SQLDriver() + drvr := im.destPool.SQLDriver() tblName := row.tbl.Name pkColNames := row.tbl.PrimaryKey @@ -508,7 +508,7 @@ func (im *importer) dbUpdate(ctx context.Context, row *rowState) error { cacheKey := "##update_func__" + tblName + "__" + strings.Join(colNames, ",") + whereClause execUpdateFn, ok := im.execUpdateFns[cacheKey] if !ok { - db, err := im.destDB.DB(ctx) + db, err := im.destPool.DB(ctx) if err != nil { return err } @@ -578,16 +578,16 @@ func (im *importer) createTables(ctx context.Context) error { im.tblDefs[tblDef.Name] = tblDef - db, err := im.destDB.DB(ctx) + db, err := im.destPool.DB(ctx) if err != nil { return err } - err = im.destDB.SQLDriver().CreateTable(ctx, db, tblDef) + err = im.destPool.SQLDriver().CreateTable(ctx, db, tblDef) if err != nil { return err } - im.log.Debug("Created table", lga.Target, source.Target(im.destDB.Source(), tblDef.Name)) + im.log.Debug("Created table", lga.Target, source.Target(im.destPool.Source(), tblDef.Name)) } return nil diff --git a/drivers/userdriver/xmlud/xmlimport_test.go b/drivers/userdriver/xmlud/xmlimport_test.go index 4d9fc19d..fe86b071 100644 --- a/drivers/userdriver/xmlud/xmlimport_test.go +++ b/drivers/userdriver/xmlud/xmlimport_test.go @@ -33,7 +33,7 @@ func TestImport_Ppl(t *testing.T) { require.Equal(t, driverPpl, udDef.Name) require.Equal(t, xmlud.Genre, udDef.Genre) - scratchDB, err := th.Databases().OpenScratch(th.Context, "ppl") + scratchDB, err := th.Pools().OpenScratch(th.Context, "ppl") require.NoError(t, err) t.Cleanup(func() { assert.NoError(t, scratchDB.Close()) @@ -78,7 +78,7 @@ func TestImport_RSS(t *testing.T) { require.Equal(t, driverRSS, udDef.Name) require.Equal(t, xmlud.Genre, udDef.Genre) - scratchDB, err := th.Databases().OpenScratch(th.Context, "rss") + scratchDB, err := th.Pools().OpenScratch(th.Context, "rss") require.NoError(t, err) t.Cleanup(func() { assert.NoError(t, scratchDB.Close()) diff --git a/drivers/xlsx/database.go b/drivers/xlsx/database.go index 0b426317..b5c76010 100644 --- a/drivers/xlsx/database.go +++ b/drivers/xlsx/database.go @@ -18,16 +18,16 @@ import ( "github.com/neilotoole/sq/libsq/source" ) -// database implements driver.Database. It implements a deferred ingest +// pool implements driver.Pool. It implements a deferred ingest // of the Excel data. -type database struct { - // REVISIT: do we need database.log, or can we use lg.FromContext? +type pool struct { + // REVISIT: do we need pool.log, or can we use lg.FromContext? log *slog.Logger - src *source.Source - files *source.Files - scratchDB driver.Database - clnup *cleanup.Cleanup + src *source.Source + files *source.Files + scratchPool driver.Pool + clnup *cleanup.Cleanup mu sync.Mutex ingestOnce sync.Once @@ -40,28 +40,28 @@ type database struct { } // checkIngest performs data ingestion if not already done. -func (d *database) checkIngest(ctx context.Context) error { - d.ingestOnce.Do(func() { - d.ingestErr = d.doIngest(ctx, d.ingestSheetNames) +func (p *pool) checkIngest(ctx context.Context) error { + p.ingestOnce.Do(func() { + p.ingestErr = p.doIngest(ctx, p.ingestSheetNames) }) - return d.ingestErr + return p.ingestErr } // doIngest performs data ingest. It must only be invoked from checkIngest. -func (d *database) doIngest(ctx context.Context, includeSheetNames []string) error { +func (p *pool) doIngest(ctx context.Context, includeSheetNames []string) error { log := lg.FromContext(ctx) // Because of the deferred ingest mechanism, we need to ensure that // the context being passed down the stack (in particular to ingestXLSX) // has the source's options on it. - ctx = options.NewContext(ctx, options.Merge(options.FromContext(ctx), d.src.Options)) + ctx = options.NewContext(ctx, options.Merge(options.FromContext(ctx), p.src.Options)) - r, err := d.files.Open(d.src) + r, err := p.files.Open(p.src) if err != nil { return err } - defer lg.WarnIfCloseError(d.log, lgm.CloseFileReader, r) + defer lg.WarnIfCloseError(p.log, lgm.CloseFileReader, r) xfile, err := excelize.OpenReader(r, excelize.Options{RawCellValue: false}) if err != nil { @@ -70,83 +70,83 @@ func (d *database) doIngest(ctx context.Context, includeSheetNames []string) err defer lg.WarnIfCloseError(log, lgm.CloseFileReader, xfile) - err = ingestXLSX(ctx, d.src, d.scratchDB, xfile, includeSheetNames) + err = ingestXLSX(ctx, p.src, p.scratchPool, xfile, includeSheetNames) if err != nil { - lg.WarnIfError(d.log, lgm.CloseDB, d.clnup.Run()) + lg.WarnIfError(p.log, lgm.CloseDB, p.clnup.Run()) return err } return err } -// DB implements driver.Database. -func (d *database) DB(ctx context.Context) (*sql.DB, error) { - d.mu.Lock() - defer d.mu.Unlock() +// DB implements driver.Pool. +func (p *pool) DB(ctx context.Context) (*sql.DB, error) { + p.mu.Lock() + defer p.mu.Unlock() - if err := d.checkIngest(ctx); err != nil { + if err := p.checkIngest(ctx); err != nil { return nil, err } - return d.scratchDB.DB(ctx) + return p.scratchPool.DB(ctx) } -// SQLDriver implements driver.Database. -func (d *database) SQLDriver() driver.SQLDriver { - return d.scratchDB.SQLDriver() +// SQLDriver implements driver.Pool. +func (p *pool) SQLDriver() driver.SQLDriver { + return p.scratchPool.SQLDriver() } -// Source implements driver.Database. -func (d *database) Source() *source.Source { - return d.src +// Source implements driver.Pool. +func (p *pool) Source() *source.Source { + return p.src } -// SourceMetadata implements driver.Database. -func (d *database) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { - d.mu.Lock() - defer d.mu.Unlock() +// SourceMetadata implements driver.Pool. +func (p *pool) SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) { + p.mu.Lock() + defer p.mu.Unlock() - if err := d.checkIngest(ctx); err != nil { + if err := p.checkIngest(ctx); err != nil { return nil, err } - md, err := d.scratchDB.SourceMetadata(ctx, noSchema) + md, err := p.scratchPool.SourceMetadata(ctx, noSchema) if err != nil { return nil, err } - md.Handle = d.src.Handle + md.Handle = p.src.Handle md.Driver = Type - md.Location = d.src.Location - if md.Name, err = source.LocationFileName(d.src); err != nil { + md.Location = p.src.Location + if md.Name, err = source.LocationFileName(p.src); err != nil { return nil, err } md.FQName = md.Name - if md.Size, err = d.files.Size(d.src); err != nil { + if md.Size, err = p.files.Size(p.src); err != nil { return nil, err } return md, nil } -// TableMetadata implements driver.Database. -func (d *database) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { - d.mu.Lock() - defer d.mu.Unlock() +// TableMetadata implements driver.Pool. +func (p *pool) TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) { + p.mu.Lock() + defer p.mu.Unlock() - d.ingestSheetNames = []string{tblName} - if err := d.checkIngest(ctx); err != nil { + p.ingestSheetNames = []string{tblName} + if err := p.checkIngest(ctx); err != nil { return nil, err } - return d.scratchDB.TableMetadata(ctx, tblName) + return p.scratchPool.TableMetadata(ctx, tblName) } -// Close implements driver.Database. -func (d *database) Close() error { - d.log.Debug(lgm.CloseDB, lga.Handle, d.src.Handle) +// Close implements driver.Pool. +func (p *pool) Close() error { + p.log.Debug(lgm.CloseDB, lga.Handle, p.src.Handle) - // No need to explicitly invoke c.scratchDB.Close because + // No need to explicitly invoke c.scratchPool.Close because // that's already added to c.clnup - return d.clnup.Run() + return p.clnup.Run() } diff --git a/drivers/xlsx/ingest.go b/drivers/xlsx/ingest.go index 25859351..c90c77bf 100644 --- a/drivers/xlsx/ingest.go +++ b/drivers/xlsx/ingest.go @@ -92,16 +92,16 @@ func (xs *xSheet) loadSampleRows(ctx context.Context, sampleSize int) error { return nil } -// ingestXLSX loads the data in xfile into scratchDB. +// ingestXLSX loads the data in xfile into scratchPool. // If includeSheetNames is non-empty, only the named sheets are ingested. -func ingestXLSX(ctx context.Context, src *source.Source, scratchDB driver.Database, +func ingestXLSX(ctx context.Context, src *source.Source, scratchPool driver.Pool, xfile *excelize.File, includeSheetNames []string, ) error { log := lg.FromContext(ctx) start := time.Now() log.Debug("Beginning import from XLSX", lga.Src, src, - lga.Target, scratchDB.Source()) + lga.Target, scratchPool.Source()) var sheets []*xSheet if len(includeSheetNames) > 0 { @@ -132,18 +132,18 @@ func ingestXLSX(ctx context.Context, src *source.Source, scratchDB driver.Databa } var db *sql.DB - if db, err = scratchDB.DB(ctx); err != nil { + if db, err = scratchPool.DB(ctx); err != nil { return err } - if err = scratchDB.SQLDriver().CreateTable(ctx, db, sheetTbl.def); err != nil { + if err = scratchPool.SQLDriver().CreateTable(ctx, db, sheetTbl.def); err != nil { return err } } log.Debug("Tables created (but not yet populated)", lga.Count, len(sheetTbls), - lga.Target, scratchDB.Source(), + lga.Target, scratchPool.Source(), lga.Elapsed, time.Since(start)) var imported, skipped int @@ -154,7 +154,7 @@ func ingestXLSX(ctx context.Context, src *source.Source, scratchDB driver.Databa continue } - if err = ingestSheetToTable(ctx, scratchDB, sheetTbls[i]); err != nil { + if err = ingestSheetToTable(ctx, scratchPool, sheetTbls[i]); err != nil { return err } imported++ @@ -164,7 +164,7 @@ func ingestXLSX(ctx context.Context, src *source.Source, scratchDB driver.Databa lga.Count, imported, "skipped", skipped, lga.From, src, - lga.To, scratchDB.Source(), + lga.To, scratchPool.Source(), lga.Elapsed, time.Since(start), ) @@ -172,8 +172,8 @@ func ingestXLSX(ctx context.Context, src *source.Source, scratchDB driver.Databa } // ingestSheetToTable imports the sheet data into the appropriate table -// in scratchDB. The scratch table must already exist. -func ingestSheetToTable(ctx context.Context, scratchDB driver.Database, sheetTbl *sheetTable) error { +// in scratchPool. The scratch table must already exist. +func ingestSheetToTable(ctx context.Context, scratchPool driver.Pool, sheetTbl *sheetTable) error { var ( log = lg.FromContext(ctx) startTime = time.Now() @@ -183,7 +183,7 @@ func ingestSheetToTable(ctx context.Context, scratchDB driver.Database, sheetTbl destColKinds = tblDef.ColKinds() ) - db, err := scratchDB.DB(ctx) + db, err := scratchPool.DB(ctx) if err != nil { return err } @@ -194,7 +194,7 @@ func ingestSheetToTable(ctx context.Context, scratchDB driver.Database, sheetTbl } defer lg.WarnIfCloseError(log, lgm.CloseDB, conn) - drvr := scratchDB.SQLDriver() + drvr := scratchPool.SQLDriver() batchSize := driver.MaxBatchRows(drvr, len(destColKinds)) bi, err := driver.NewBatchInsert(ctx, drvr, conn, tblDef.Name, tblDef.ColNames(), batchSize) @@ -264,7 +264,7 @@ func ingestSheetToTable(ctx context.Context, scratchDB driver.Database, sheetTbl log.Debug("Inserted rows from sheet into table", lga.Count, bi.Written(), laSheet, sheet.name, - lga.Target, source.Target(scratchDB.Source(), tblDef.Name), + lga.Target, source.Target(scratchPool.Source(), tblDef.Name), lga.Elapsed, time.Since(startTime)) return nil diff --git a/drivers/xlsx/xlsx.go b/drivers/xlsx/xlsx.go index fecc6feb..ae05d31f 100644 --- a/drivers/xlsx/xlsx.go +++ b/drivers/xlsx/xlsx.go @@ -31,7 +31,7 @@ const ( type Provider struct { Log *slog.Logger Files *source.Files - Scratcher driver.ScratchDatabaseOpener + Scratcher driver.ScratchPoolOpener } // DriverFor implements driver.Provider. @@ -46,7 +46,7 @@ func (p *Provider) DriverFor(typ source.DriverType) (driver.Driver, error) { // Driver implements driver.Driver. type Driver struct { log *slog.Logger - scratcher driver.ScratchDatabaseOpener + scratcher driver.ScratchPoolOpener files *source.Files } @@ -59,27 +59,27 @@ func (d *Driver) DriverMetadata() driver.Metadata { } } -// Open implements driver.DatabaseOpener. -func (d *Driver) Open(ctx context.Context, src *source.Source) (driver.Database, error) { +// Open implements driver.PoolOpener. +func (d *Driver) Open(ctx context.Context, src *source.Source) (driver.Pool, error) { lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) - scratchDB, err := d.scratcher.OpenScratch(ctx, src.Handle) + scratchPool, err := d.scratcher.OpenScratch(ctx, src.Handle) if err != nil { return nil, err } clnup := cleanup.New() - clnup.AddE(scratchDB.Close) + clnup.AddE(scratchPool.Close) - dbase := &database{ - log: d.log, - src: src, - scratchDB: scratchDB, - files: d.files, - clnup: clnup, + p := &pool{ + log: d.log, + src: src, + scratchPool: scratchPool, + files: d.files, + clnup: clnup, } - return dbase, nil + return p, nil } // Truncate implements driver.Driver. diff --git a/drivers/xlsx/xlsx_test.go b/drivers/xlsx/xlsx_test.go index bbb8e41d..5f652cfe 100644 --- a/drivers/xlsx/xlsx_test.go +++ b/drivers/xlsx/xlsx_test.go @@ -161,9 +161,9 @@ func TestOpenFileFormats(t *testing.T) { Location: filepath.Join("testdata", "file_formats", tc.filename), }) - dbase, err := th.Databases().Open(th.Context, src) + pool, err := th.Pools().Open(th.Context, src) require.NoError(t, err) - db, err := dbase.DB(th.Context) + db, err := pool.DB(th.Context) if tc.wantErr { require.Error(t, err) return diff --git a/libsq/dbwriter.go b/libsq/dbwriter.go index 752959ff..d22a0b37 100644 --- a/libsq/dbwriter.go +++ b/libsq/dbwriter.go @@ -26,7 +26,7 @@ import ( type DBWriter struct { wg *sync.WaitGroup cancelFn context.CancelFunc - destDB driver.Database + destPool driver.Pool destTbl string recordCh chan record.Record bi *driver.BatchInsert @@ -42,17 +42,17 @@ type DBWriter struct { // DBWriterPreWriteHook is a function that is invoked before DBWriter // begins writing. -type DBWriterPreWriteHook func(ctx context.Context, recMeta record.Meta, destDB driver.Database, tx sqlz.DB) error +type DBWriterPreWriteHook func(ctx context.Context, recMeta record.Meta, destPool driver.Pool, tx sqlz.DB) error // DBWriterCreateTableIfNotExistsHook returns a hook that // creates destTblName if it does not exist. func DBWriterCreateTableIfNotExistsHook(destTblName string) DBWriterPreWriteHook { - return func(ctx context.Context, recMeta record.Meta, destDB driver.Database, tx sqlz.DB) error { - db, err := destDB.DB(ctx) + return func(ctx context.Context, recMeta record.Meta, destPool driver.Pool, tx sqlz.DB) error { + db, err := destPool.DB(ctx) if err != nil { return err } - tblExists, err := destDB.SQLDriver().TableExists(ctx, db, destTblName) + tblExists, err := destPool.SQLDriver().TableExists(ctx, db, destTblName) if err != nil { return errz.Err(err) } @@ -65,9 +65,9 @@ func DBWriterCreateTableIfNotExistsHook(destTblName string) DBWriterPreWriteHook destColKinds := recMeta.Kinds() destTblDef := sqlmodel.NewTableDef(destTblName, destColNames, destColKinds) - err = destDB.SQLDriver().CreateTable(ctx, tx, destTblDef) + err = destPool.SQLDriver().CreateTable(ctx, tx, destTblDef) if err != nil { - return errz.Wrapf(err, "failed to create dest table %s.%s", destDB.Source().Handle, destTblName) + return errz.Wrapf(err, "failed to create dest table %s.%s", destPool.Source().Handle, destTblName) } return nil @@ -76,13 +76,13 @@ func DBWriterCreateTableIfNotExistsHook(destTblName string) DBWriterPreWriteHook // NewDBWriter returns a new writer than implements RecordWriter. // The writer writes records from recordCh to destTbl -// in destDB. The recChSize param controls the size of recordCh +// in destPool. The recChSize param controls the size of recordCh // returned by the writer's Open method. -func NewDBWriter(destDB driver.Database, destTbl string, recChSize int, +func NewDBWriter(destPool driver.Pool, destTbl string, recChSize int, preWriteHooks ...DBWriterPreWriteHook, ) *DBWriter { return &DBWriter{ - destDB: destDB, + destPool: destPool, destTbl: destTbl, recordCh: make(chan record.Record, recChSize), errCh: make(chan error, 3), @@ -102,7 +102,7 @@ func (w *DBWriter) Open(ctx context.Context, cancelFn context.CancelFunc, recMet ) { w.cancelFn = cancelFn - db, err := w.destDB.DB(ctx) + db, err := w.destPool.DB(ctx) if err != nil { return nil, nil, err } @@ -110,19 +110,19 @@ func (w *DBWriter) Open(ctx context.Context, cancelFn context.CancelFunc, recMet // REVISIT: tx could potentially be passed to NewDBWriter? tx, err := db.BeginTx(ctx, nil) if err != nil { - return nil, nil, errz.Wrapf(err, "failed to open tx for %s.%s", w.destDB.Source().Handle, w.destTbl) + return nil, nil, errz.Wrapf(err, "failed to open tx for %s.%s", w.destPool.Source().Handle, w.destTbl) } for _, hook := range w.preWriteHooks { - err = hook(ctx, recMeta, w.destDB, tx) + err = hook(ctx, recMeta, w.destPool, tx) if err != nil { w.rollback(ctx, tx, err) return nil, nil, err } } - batchSize := driver.MaxBatchRows(w.destDB.SQLDriver(), len(recMeta.Names())) - w.bi, err = driver.NewBatchInsert(ctx, w.destDB.SQLDriver(), tx, w.destTbl, recMeta.Names(), batchSize) + batchSize := driver.MaxBatchRows(w.destPool.SQLDriver(), len(recMeta.Names())) + w.bi, err = driver.NewBatchInsert(ctx, w.destPool.SQLDriver(), tx, w.destTbl, recMeta.Names(), batchSize) if err != nil { w.rollback(ctx, tx, err) return nil, nil, err @@ -169,7 +169,7 @@ func (w *DBWriter) Open(ctx context.Context, cancelFn context.CancelFunc, recMet w.addErrs(commitErr) } else { lg.FromContext(ctx).Debug("Tx commit success", - lga.Target, source.Target(w.destDB.Source(), w.destTbl)) + lga.Target, source.Target(w.destPool.Source(), w.destTbl)) } return @@ -224,7 +224,7 @@ func (w *DBWriter) addErrs(errs ...error) { func (w *DBWriter) rollback(ctx context.Context, tx *sql.Tx, causeErrs ...error) { // Guaranteed to be at least one causeErr lg.FromContext(ctx).Error("failed to insert data: tx will rollback", - lga.Target, w.destDB.Source().Handle+"."+w.destTbl, + lga.Target, w.destPool.Source().Handle+"."+w.destTbl, lga.Err, causeErrs[0]) rollbackErr := errz.Err(tx.Rollback()) diff --git a/libsq/driver/driver.go b/libsq/driver/driver.go index 0ea94a62..0b924005 100644 --- a/libsq/driver/driver.go +++ b/libsq/driver/driver.go @@ -166,31 +166,31 @@ type Provider interface { DriverFor(typ source.DriverType) (Driver, error) } -// DatabaseOpener opens a Database. -type DatabaseOpener interface { - // Open returns a Database instance for src. - Open(ctx context.Context, src *source.Source) (Database, error) +// PoolOpener opens a Pool. +type PoolOpener interface { + // Open returns a Pool instance for src. + Open(ctx context.Context, src *source.Source) (Pool, error) } -// JoinDatabaseOpener can open a join database. -type JoinDatabaseOpener interface { - // OpenJoin opens an appropriate Database for use as +// JoinPoolOpener can open a join database. +type JoinPoolOpener interface { + // OpenJoin opens an appropriate Pool for use as // a work DB for joining across sources. - OpenJoin(ctx context.Context, srcs ...*source.Source) (Database, error) + OpenJoin(ctx context.Context, srcs ...*source.Source) (Pool, error) } -// ScratchDatabaseOpener opens a scratch database. A scratch database is +// ScratchPoolOpener opens a scratch database pool. A scratch database is // typically a short-lived database used as a target for loading // non-SQL data (such as CSV). -type ScratchDatabaseOpener interface { - // OpenScratch returns a database for scratch use. - OpenScratch(ctx context.Context, name string) (Database, error) +type ScratchPoolOpener interface { + // OpenScratch returns a pool for scratch use. + OpenScratch(ctx context.Context, name string) (Pool, error) } // Driver is the core interface that must be implemented for each type // of data source. type Driver interface { - DatabaseOpener + PoolOpener // DriverMetadata returns driver metadata. DriverMetadata() Metadata @@ -341,13 +341,12 @@ type SQLDriver interface { DBProperties(ctx context.Context, db sqlz.DB) (map[string]any, error) } -// Database models a database handle. It is conceptually equivalent to +// Pool models a database handle representing a pool of underlying +// connections. It is conceptually equivalent to // stdlib sql.DB, and in fact encapsulates a sql.DB instance. The // realized sql.DB instance can be accessed via the DB method. -// -// REVISIT: maybe rename driver.Database to driver.Pool or such? -type Database interface { - // DB returns the sql.DB object for this Database. +type Pool interface { + // DB returns the sql.DB object for this Pool. // This operation can take a long time if opening the DB requires // an ingest of data. // For example, with file-based sources such as XLSX, invoking Open @@ -366,13 +365,13 @@ type Database interface { // If noSchema is true, schema details are not populated // on the returned source.Metadata. // - // TODO: SourceMetadata doesn't really belong on driver.Database? It + // TODO: SourceMetadata doesn't really belong on driver.Pool? It // should be moved to driver.Driver? SourceMetadata(ctx context.Context, noSchema bool) (*source.Metadata, error) // TableMetadata returns metadata for the specified table in the data source. // - // TODO: TableMetadata doesn't really belong on driver.Database? It + // TODO: TableMetadata doesn't really belong on driver.Pool? It // should be moved to driver.Driver? TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error) @@ -411,45 +410,45 @@ type Metadata struct { } var ( - _ DatabaseOpener = (*Databases)(nil) - _ JoinDatabaseOpener = (*Databases)(nil) + _ PoolOpener = (*Pools)(nil) + _ JoinPoolOpener = (*Pools)(nil) ) -// Databases provides a mechanism for getting Database instances. +// Pools provides a mechanism for getting Pool instances. // Note that at this time instances returned by Open are cached // and then closed by Close. This may be a bad approach. -type Databases struct { +type Pools struct { log *slog.Logger drvrs Provider mu sync.Mutex scratchSrcFn ScratchSrcFunc - dbases map[string]Database + pools map[string]Pool clnup *cleanup.Cleanup } -// NewDatabases returns a Databases instances. -func NewDatabases(log *slog.Logger, drvrs Provider, scratchSrcFn ScratchSrcFunc) *Databases { - return &Databases{ +// NewPools returns a Pools instances. +func NewPools(log *slog.Logger, drvrs Provider, scratchSrcFn ScratchSrcFunc) *Pools { + return &Pools{ log: log, drvrs: drvrs, mu: sync.Mutex{}, scratchSrcFn: scratchSrcFn, - dbases: map[string]Database{}, + pools: map[string]Pool{}, clnup: cleanup.New(), } } -// Open returns an opened Database for src. The returned Database +// Open returns an opened Pool for src. The returned Pool // may be cached and returned on future invocations for the // same source (where each source fields is identical). // Thus, the caller should typically not close -// the Database: it will be closed via d.Close. +// the Pool: it will be closed via d.Close. // // NOTE: This entire logic re caching/not-closing is a bit sketchy, // and needs to be revisited. // -// Open implements DatabaseOpener. -func (d *Databases) Open(ctx context.Context, src *source.Source) (Database, error) { +// Open implements PoolOpener. +func (d *Pools) Open(ctx context.Context, src *source.Source) (Pool, error) { lg.FromContext(ctx).Debug(lgm.OpenSrc, lga.Src, src) d.mu.Lock() @@ -457,9 +456,9 @@ func (d *Databases) Open(ctx context.Context, src *source.Source) (Database, err key := src.Handle + "_" + hashSource(src) - dbase, ok := d.dbases[key] + pool, ok := d.pools[key] if ok { - return dbase, nil + return pool, nil } drvr, err := d.drvrs.DriverFor(src.Type) @@ -471,23 +470,23 @@ func (d *Databases) Open(ctx context.Context, src *source.Source) (Database, err o := options.Merge(baseOptions, src.Options) ctx = options.NewContext(ctx, o) - dbase, err = drvr.Open(ctx, src) + pool, err = drvr.Open(ctx, src) if err != nil { return nil, err } - d.clnup.AddC(dbase) + d.clnup.AddC(pool) - d.dbases[key] = dbase - return dbase, nil + d.pools[key] = pool + return pool, nil } // OpenScratch returns a scratch database instance. It is not -// necessary for the caller to close the returned Database as +// necessary for the caller to close the returned Pool as // its Close method will be invoked by d.Close. // -// OpenScratch implements ScratchDatabaseOpener. -func (d *Databases) OpenScratch(ctx context.Context, name string) (Database, error) { +// OpenScratch implements ScratchPoolOpener. +func (d *Pools) OpenScratch(ctx context.Context, name string) (Pool, error) { const msgCloseScratch = "close scratch db" scratchSrc, cleanFn, err := d.scratchSrcFn(ctx, name) @@ -509,15 +508,15 @@ func (d *Databases) OpenScratch(ctx context.Context, name string) (Database, err return nil, errz.Errorf("driver for scratch source %s is not a SQLDriver but is %T", scratchSrc.Handle, drvr) } - var backingDB Database - backingDB, err = sqlDrvr.Open(ctx, scratchSrc) + var backingPool Pool + backingPool, err = sqlDrvr.Open(ctx, scratchSrc) if err != nil { lg.WarnIfFuncError(d.log, msgCloseScratch, cleanFn) return nil, err } d.clnup.AddE(cleanFn) - return backingDB, nil + return backingPool, nil } // OpenJoin opens an appropriate database for use as @@ -530,8 +529,8 @@ func (d *Databases) OpenScratch(ctx context.Context, name string) (Database, err // the join etc.). Currently the implementation simply delegates // to OpenScratch. // -// OpenJoin implements JoinDatabaseOpener. -func (d *Databases) OpenJoin(ctx context.Context, srcs ...*source.Source) (Database, error) { +// OpenJoin implements JoinPoolOpener. +func (d *Pools) OpenJoin(ctx context.Context, srcs ...*source.Source) (Pool, error) { var names []string for _, src := range srcs { names = append(names, src.Handle[1:]) @@ -542,7 +541,7 @@ func (d *Databases) OpenJoin(ctx context.Context, srcs ...*source.Source) (Datab } // Close closes d, invoking Close on any instances opened via d.Open. -func (d *Databases) Close() error { +func (d *Pools) Close() error { d.log.Debug("Closing databases(s)...", lga.Count, d.clnup.Len()) return d.clnup.Run() } diff --git a/libsq/driver/driver_test.go b/libsq/driver/driver_test.go index b4060ec2..0ea46a4f 100644 --- a/libsq/driver/driver_test.go +++ b/libsq/driver/driver_test.go @@ -272,12 +272,12 @@ func TestDriver_Open(t *testing.T) { th := testh.New(t) src := th.Source(handle) drvr := th.DriverFor(src) - dbase, err := drvr.Open(th.Context, src) + pool, err := drvr.Open(th.Context, src) require.NoError(t, err) - db, err := dbase.DB(th.Context) + db, err := pool.DB(th.Context) require.NoError(t, err) require.NoError(t, db.PingContext(th.Context)) - require.NoError(t, dbase.Close()) + require.NoError(t, pool.Close()) }) } } @@ -441,9 +441,9 @@ func TestDatabase_TableMetadata(t *testing.T) { //nolint:tparallel t.Run(handle, func(t *testing.T) { t.Parallel() - th, _, _, dbase, _ := testh.NewWith(t, handle) + th, _, _, pool, _ := testh.NewWith(t, handle) - tblMeta, err := dbase.TableMetadata(th.Context, sakila.TblActor) + tblMeta, err := pool.TableMetadata(th.Context, sakila.TblActor) require.NoError(t, err) require.Equal(t, sakila.TblActor, tblMeta.Name) require.Equal(t, int64(sakila.TblActorCount), tblMeta.RowCount) @@ -460,9 +460,9 @@ func TestDatabase_SourceMetadata(t *testing.T) { t.Run(handle, func(t *testing.T) { t.Parallel() - th, _, _, dbase, _ := testh.NewWith(t, handle) + th, _, _, pool, _ := testh.NewWith(t, handle) - md, err := dbase.SourceMetadata(th.Context, false) + md, err := pool.SourceMetadata(th.Context, false) require.NoError(t, err) require.Equal(t, sakila.TblActor, md.Tables[0].Name) require.Equal(t, int64(sakila.TblActorCount), md.Tables[0].RowCount) @@ -482,11 +482,11 @@ func TestDatabase_SourceMetadata_concurrent(t *testing.T) { //nolint:tparallel t.Run(handle, func(t *testing.T) { t.Parallel() - th, _, _, dbase, _ := testh.NewWith(t, handle) + th, _, _, pool, _ := testh.NewWith(t, handle) g, gCtx := errgroup.WithContext(th.Context) for i := 0; i < concurrency; i++ { g.Go(func() error { - md, err := dbase.SourceMetadata(gCtx, false) + md, err := pool.SourceMetadata(gCtx, false) require.NoError(t, err) require.NotNil(t, md) gotTbl := md.Table(sakila.TblActor) @@ -539,7 +539,7 @@ func TestSQLDriver_AlterTableRename(t *testing.T) { handle := handle t.Run(handle, func(t *testing.T) { - th, src, drvr, dbase, db := testh.NewWith(t, handle) + th, src, drvr, pool, db := testh.NewWith(t, handle) // Make a copy of the table to play with tbl := th.CopyTable(true, src, tablefq.From(sakila.TblActor), tablefq.T{}, true) @@ -550,7 +550,7 @@ func TestSQLDriver_AlterTableRename(t *testing.T) { require.NoError(t, err) defer th.DropTable(src, tablefq.From(newName)) - md, err := dbase.TableMetadata(th.Context, newName) + md, err := pool.TableMetadata(th.Context, newName) require.NoError(t, err) require.Equal(t, newName, md.Name) sink, err := th.QuerySQL(src, nil, "SELECT * FROM "+newName) @@ -567,7 +567,7 @@ func TestSQLDriver_AlterTableRenameColumn(t *testing.T) { handle := handle t.Run(handle, func(t *testing.T) { - th, src, drvr, dbase, db := testh.NewWith(t, handle) + th, src, drvr, pool, db := testh.NewWith(t, handle) // Make a copy of the table to play with tbl := th.CopyTable(true, src, tablefq.From(sakila.TblActor), tablefq.T{}, true) @@ -576,7 +576,7 @@ func TestSQLDriver_AlterTableRenameColumn(t *testing.T) { err := drvr.AlterTableRenameColumn(th.Context, db, tbl, "first_name", newName) require.NoError(t, err) - md, err := dbase.TableMetadata(th.Context, tbl) + md, err := pool.TableMetadata(th.Context, tbl) require.NoError(t, err) require.NotNil(t, md.Column(newName)) sink, err := th.QuerySQL(src, nil, fmt.Sprintf("SELECT %s FROM %s", newName, tbl)) @@ -623,13 +623,13 @@ func TestSQLDriver_CurrentSchema(t *testing.T) { tc := tc t.Run(tc.handle, func(t *testing.T) { - th, _, drvr, dbase, db := testh.NewWith(t, tc.handle) + th, _, drvr, pool, db := testh.NewWith(t, tc.handle) gotSchema, err := drvr.CurrentSchema(th.Context, db) require.NoError(t, err) require.Equal(t, tc.want, gotSchema) - md, err := dbase.SourceMetadata(th.Context, false) + md, err := pool.SourceMetadata(th.Context, false) require.NoError(t, err) require.NotNil(t, md) require.Equal(t, md.Schema, gotSchema) diff --git a/libsq/libsq.go b/libsq/libsq.go index bea706de..41fbc2b8 100644 --- a/libsq/libsq.go +++ b/libsq/libsq.go @@ -30,14 +30,14 @@ type QueryContext struct { // Collection is the set of sources. Collection *source.Collection - // DBOpener is used to open databases. - DBOpener driver.DatabaseOpener + // PoolOpener is used to open databases. + PoolOpener driver.PoolOpener - // JoinDBOpener is used to open the joindb. - JoinDBOpener driver.JoinDatabaseOpener + // JoinPoolOpener is used to open the joindb. + JoinPoolOpener driver.JoinPoolOpener - // ScratchDBOpener is used to open the scratchdb. - ScratchDBOpener driver.ScratchDatabaseOpener + // ScratchPoolOpener is used to open the scratchdb. + ScratchPoolOpener driver.ScratchPoolOpener // Args defines variables that are substituted into the query. // May be nil or empty. @@ -118,26 +118,26 @@ func SLQ2SQL(ctx context.Context, qc *QueryContext, query string) (targetSQL str // QuerySQL executes the SQL query, writing the results to recw. If db is // non-nil, the query is executed against it. Otherwise, the connection is -// obtained from dbase. +// obtained from pool. // Note that QuerySQL may return before recw has finished writing, thus the // caller may wish to wait for recw to complete. -// The caller is responsible for closing dbase (and db, if non-nil). -func QuerySQL(ctx context.Context, dbase driver.Database, db sqlz.DB, +// The caller is responsible for closing pool (and db, if non-nil). +func QuerySQL(ctx context.Context, pool driver.Pool, db sqlz.DB, recw RecordWriter, query string, args ...any, ) error { log := lg.FromContext(ctx) - errw := dbase.SQLDriver().ErrWrapFunc() + errw := pool.SQLDriver().ErrWrapFunc() if db == nil { var err error - if db, err = dbase.DB(ctx); err != nil { + if db, err = pool.DB(ctx); err != nil { return err } } rows, err := db.QueryContext(ctx, query, args...) if err != nil { - return errz.Wrapf(errw(err), `SQL query against %s failed: %s`, dbase.Source().Handle, query) + return errz.Wrapf(errw(err), `SQL query against %s failed: %s`, pool.Source().Handle, query) } defer lg.WarnIfCloseError(log, lgm.CloseDBRows, rows) @@ -183,7 +183,7 @@ func QuerySQL(ctx context.Context, dbase driver.Database, db sqlz.DB, } } - drvr := dbase.SQLDriver() + drvr := pool.SQLDriver() recMeta, recFromScanRowFn, err := drvr.RecordMeta(ctx, colTypes) if err != nil { return errw(err) @@ -213,7 +213,7 @@ func QuerySQL(ctx context.Context, dbase driver.Database, db sqlz.DB, err = rows.Scan(scanRow...) if err != nil { cancelFn() - return errz.Wrapf(errw(err), "query against %s", dbase.Source().Handle) + return errz.Wrapf(errw(err), "query against %s", pool.Source().Handle) } // recFromScanRowFn returns a new Record with appropriate diff --git a/libsq/pipeline.go b/libsq/pipeline.go index b4d8a376..ca934494 100644 --- a/libsq/pipeline.go +++ b/libsq/pipeline.go @@ -44,16 +44,16 @@ type pipeline struct { rc *render.Context // tasks contains tasks that must be completed before targetSQL - // is executed against targetDB. Typically tasks is used to + // is executed against targetPool. Typically tasks is used to // set up the joindb before it is queried. tasks []tasker - // targetSQL is the ultimate SQL query to be executed against targetDB. + // targetSQL is the ultimate SQL query to be executed against targetPool. targetSQL string - // targetDB is the destination for the ultimate SQL query to + // targetPool is the destination for the ultimate SQL query to // be executed against. - targetDB driver.Database + targetPool driver.Pool } // newPipeline parses query, returning a pipeline prepared for @@ -87,7 +87,7 @@ func newPipeline(ctx context.Context, qc *QueryContext, query string) (*pipeline func (p *pipeline) execute(ctx context.Context, recw RecordWriter) error { lg.FromContext(ctx).Debug( "Execute SQL query", - lga.Src, p.targetDB.Source(), + lga.Src, p.targetPool.Source(), lga.SQL, p.targetSQL, ) @@ -95,7 +95,7 @@ func (p *pipeline) execute(ctx context.Context, recw RecordWriter) error { return err } - return QuerySQL(ctx, p.targetDB, nil, recw, p.targetSQL) + return QuerySQL(ctx, p.targetPool, nil, recw, p.targetSQL) } // executeTasks executes any tasks in pipeline.tasks. @@ -146,15 +146,15 @@ func (p *pipeline) prepareNoTable(ctx context.Context, qm *queryModel) error { if handle == "" { if src = p.qc.Collection.Active(); src == nil { log.Debug("No active source, will use scratchdb.") - p.targetDB, err = p.qc.ScratchDBOpener.OpenScratch(ctx, "scratch") + p.targetPool, err = p.qc.ScratchPoolOpener.OpenScratch(ctx, "scratch") if err != nil { return err } p.rc = &render.Context{ - Renderer: p.targetDB.SQLDriver().Renderer(), + Renderer: p.targetPool.SQLDriver().Renderer(), Args: p.qc.Args, - Dialect: p.targetDB.SQLDriver().Dialect(), + Dialect: p.targetPool.SQLDriver().Dialect(), } return nil } @@ -165,14 +165,14 @@ func (p *pipeline) prepareNoTable(ctx context.Context, qm *queryModel) error { } // At this point, src is non-nil. - if p.targetDB, err = p.qc.DBOpener.Open(ctx, src); err != nil { + if p.targetPool, err = p.qc.PoolOpener.Open(ctx, src); err != nil { return err } p.rc = &render.Context{ - Renderer: p.targetDB.SQLDriver().Renderer(), + Renderer: p.targetPool.SQLDriver().Renderer(), Args: p.qc.Args, - Dialect: p.targetDB.SQLDriver().Dialect(), + Dialect: p.targetPool.SQLDriver().Dialect(), } return nil @@ -182,7 +182,7 @@ func (p *pipeline) prepareNoTable(ctx context.Context, qm *queryModel) error { // // When this function returns, pipeline.rc will be set. func (p *pipeline) prepareFromTable(ctx context.Context, tblSel *ast.TblSelectorNode) (fromClause string, - fromConn driver.Database, err error, + fromPool driver.Pool, err error, ) { handle := tblSel.Handle() if handle == "" { @@ -197,16 +197,16 @@ func (p *pipeline) prepareFromTable(ctx context.Context, tblSel *ast.TblSelector return "", nil, err } - fromConn, err = p.qc.DBOpener.Open(ctx, src) + fromPool, err = p.qc.PoolOpener.Open(ctx, src) if err != nil { return "", nil, err } - rndr := fromConn.SQLDriver().Renderer() + rndr := fromPool.SQLDriver().Renderer() p.rc = &render.Context{ Renderer: rndr, Args: p.qc.Args, - Dialect: fromConn.SQLDriver().Dialect(), + Dialect: fromPool.SQLDriver().Dialect(), } fromClause, err = rndr.FromTable(p.rc, tblSel) @@ -214,7 +214,7 @@ func (p *pipeline) prepareFromTable(ctx context.Context, tblSel *ast.TblSelector return "", nil, err } - return fromClause, fromConn, nil + return fromClause, fromPool, nil } // joinClause models the SQL "JOIN" construct. @@ -270,7 +270,7 @@ func (jc *joinClause) isSingleSource() bool { // // When this function returns, pipeline.rc will be set. func (p *pipeline) prepareFromJoin(ctx context.Context, jc *joinClause) (fromClause string, - fromConn driver.Database, err error, + fromConn driver.Pool, err error, ) { if jc.isSingleSource() { return p.joinSingleSource(ctx, jc) @@ -283,23 +283,23 @@ func (p *pipeline) prepareFromJoin(ctx context.Context, jc *joinClause) (fromCla // // On return, pipeline.rc will be set. func (p *pipeline) joinSingleSource(ctx context.Context, jc *joinClause) (fromClause string, - fromDB driver.Database, err error, + fromPool driver.Pool, err error, ) { src, err := p.qc.Collection.Get(jc.leftTbl.Handle()) if err != nil { return "", nil, err } - fromDB, err = p.qc.DBOpener.Open(ctx, src) + fromPool, err = p.qc.PoolOpener.Open(ctx, src) if err != nil { return "", nil, err } - rndr := fromDB.SQLDriver().Renderer() + rndr := fromPool.SQLDriver().Renderer() p.rc = &render.Context{ Renderer: rndr, Args: p.qc.Args, - Dialect: fromDB.SQLDriver().Dialect(), + Dialect: fromPool.SQLDriver().Dialect(), } fromClause, err = rndr.Join(p.rc, jc.leftTbl, jc.joins) @@ -307,7 +307,7 @@ func (p *pipeline) joinSingleSource(ctx context.Context, jc *joinClause) (fromCl return "", nil, err } - return fromClause, fromDB, nil + return fromClause, fromPool, nil } // joinCrossSource returns a FROM clause that forms part of @@ -315,10 +315,8 @@ func (p *pipeline) joinSingleSource(ctx context.Context, jc *joinClause) (fromCl // // On return, pipeline.rc will be set. func (p *pipeline) joinCrossSource(ctx context.Context, jc *joinClause) (fromClause string, - fromDB driver.Database, err error, + fromDB driver.Pool, err error, ) { - // FIXME: finish tidying up - handles := jc.handles() srcs := make([]*source.Source, 0, len(handles)) for _, handle := range handles { @@ -330,16 +328,16 @@ func (p *pipeline) joinCrossSource(ctx context.Context, jc *joinClause) (fromCla } // Open the join db - joinDB, err := p.qc.JoinDBOpener.OpenJoin(ctx, srcs...) + joinPool, err := p.qc.JoinPoolOpener.OpenJoin(ctx, srcs...) if err != nil { return "", nil, err } - rndr := joinDB.SQLDriver().Renderer() + rndr := joinPool.SQLDriver().Renderer() p.rc = &render.Context{ Renderer: rndr, Args: p.qc.Args, - Dialect: joinDB.SQLDriver().Dialect(), + Dialect: joinPool.SQLDriver().Dialect(), } leftHandle := jc.leftTbl.Handle() @@ -356,16 +354,16 @@ func (p *pipeline) joinCrossSource(ctx context.Context, jc *joinClause) (fromCla if src, err = p.qc.Collection.Get(handle); err != nil { return "", nil, err } - var db driver.Database - if db, err = p.qc.DBOpener.Open(ctx, src); err != nil { + var db driver.Pool + if db, err = p.qc.PoolOpener.Open(ctx, src); err != nil { return "", nil, err } task := &joinCopyTask{ - fromDB: db, - fromTbl: tbl.Table(), - toDB: joinDB, - toTbl: tbl.TblAliasOrName(), + fromPool: db, + fromTbl: tbl.Table(), + toPool: joinPool, + toTbl: tbl.TblAliasOrName(), } tbl.SyncTblNameAlias() @@ -378,7 +376,7 @@ func (p *pipeline) joinCrossSource(ctx context.Context, jc *joinClause) (fromCla return "", nil, err } - return fromClause, joinDB, nil + return fromClause, joinPool, nil } // tasker is the interface for executing a DB task. @@ -389,59 +387,59 @@ type tasker interface { // joinCopyTask is a specification of a table data copy task to be performed // for a cross-source join. That is, the data in fromDB.fromTblName will -// be copied to a table in toDB. If colNames is +// be copied to a table in toPool. If colNames is // empty, all cols in fromTbl are to be copied. type joinCopyTask struct { - fromDB driver.Database - fromTbl tablefq.T - toDB driver.Database - toTbl tablefq.T + fromPool driver.Pool + fromTbl tablefq.T + toPool driver.Pool + toTbl tablefq.T } func (jt *joinCopyTask) executeTask(ctx context.Context) error { - return execCopyTable(ctx, jt.fromDB, jt.fromTbl, jt.toDB, jt.toTbl) + return execCopyTable(ctx, jt.fromPool, jt.fromTbl, jt.toPool, jt.toTbl) } -// execCopyTable performs the work of copying fromDB.fromTbl to destDB.destTbl. -func execCopyTable(ctx context.Context, fromDB driver.Database, fromTbl tablefq.T, - destDB driver.Database, destTbl tablefq.T, +// execCopyTable performs the work of copying fromDB.fromTbl to destPool.destTbl. +func execCopyTable(ctx context.Context, fromDB driver.Pool, fromTbl tablefq.T, + destPool driver.Pool, destTbl tablefq.T, ) error { log := lg.FromContext(ctx) - createTblHook := func(ctx context.Context, originRecMeta record.Meta, destDB driver.Database, + createTblHook := func(ctx context.Context, originRecMeta record.Meta, destPool driver.Pool, tx sqlz.DB, ) error { destColNames := originRecMeta.Names() destColKinds := originRecMeta.Kinds() destTblDef := sqlmodel.NewTableDef(destTbl.Table, destColNames, destColKinds) - err := destDB.SQLDriver().CreateTable(ctx, tx, destTblDef) + err := destPool.SQLDriver().CreateTable(ctx, tx, destTblDef) if err != nil { - return errz.Wrapf(err, "failed to create dest table %s.%s", destDB.Source().Handle, destTbl) + return errz.Wrapf(err, "failed to create dest table %s.%s", destPool.Source().Handle, destTbl) } return nil } inserter := NewDBWriter( - destDB, + destPool, destTbl.Table, - driver.OptTuningRecChanSize.Get(destDB.Source().Options), + driver.OptTuningRecChanSize.Get(destPool.Source().Options), createTblHook, ) query := "SELECT * FROM " + fromTbl.Render(fromDB.SQLDriver().Dialect().Enquote) err := QuerySQL(ctx, fromDB, nil, inserter, query) if err != nil { - return errz.Wrapf(err, "insert %s.%s failed", destDB.Source().Handle, destTbl) + return errz.Wrapf(err, "insert %s.%s failed", destPool.Source().Handle, destTbl) } affected, err := inserter.Wait() // Wait for the writer to finish processing if err != nil { - return errz.Wrapf(err, "insert %s.%s failed", destDB.Source().Handle, destTbl) + return errz.Wrapf(err, "insert %s.%s failed", destPool.Source().Handle, destTbl) } log.Debug("Copied rows to dest", lga.Count, affected, lga.From, fmt.Sprintf("%s.%s", fromDB.Source().Handle, fromTbl), - lga.To, fmt.Sprintf("%s.%s", destDB.Source().Handle, destTbl)) + lga.To, fmt.Sprintf("%s.%s", destPool.Source().Handle, destTbl)) return nil } diff --git a/libsq/prepare.go b/libsq/prepare.go index b55b094c..3b612f3b 100644 --- a/libsq/prepare.go +++ b/libsq/prepare.go @@ -7,9 +7,9 @@ import ( ) // prepare prepares the pipeline to execute queryModel. -// When this method returns, targetDB and targetSQL will be set, +// When this method returns, targetPool and targetSQL will be set, // as will any tasks (which may be empty). The tasks must be executed -// against targetDB before targetSQL is executed (the pipeline.execute +// against targetPool before targetSQL is executed (the pipeline.execute // method does this work). func (p *pipeline) prepare(ctx context.Context, qm *queryModel) error { var ( @@ -25,11 +25,11 @@ func (p *pipeline) prepare(ctx context.Context, qm *queryModel) error { } case len(qm.Joins) > 0: jc := &joinClause{leftTbl: qm.Table, joins: qm.Joins} - if frags.From, p.targetDB, err = p.prepareFromJoin(ctx, jc); err != nil { + if frags.From, p.targetPool, err = p.prepareFromJoin(ctx, jc); err != nil { return err } default: - if frags.From, p.targetDB, err = p.prepareFromTable(ctx, qm.Table); err != nil { + if frags.From, p.targetPool, err = p.prepareFromTable(ctx, qm.Table); err != nil { return err } } diff --git a/libsq/query_no_src_test.go b/libsq/query_no_src_test.go index 519730bc..323e952e 100644 --- a/libsq/query_no_src_test.go +++ b/libsq/query_no_src_test.go @@ -31,13 +31,13 @@ func TestQuery_no_source(t *testing.T) { t.Logf("\nquery: %s\n want: %s", tc.in, tc.want) th := testh.New(t) coll := th.NewCollection() - dbases := th.Databases() + pools := th.Pools() qc := &libsq.QueryContext{ - Collection: coll, - DBOpener: dbases, - JoinDBOpener: dbases, - ScratchDBOpener: dbases, + Collection: coll, + PoolOpener: pools, + JoinPoolOpener: pools, + ScratchPoolOpener: pools, } gotSQL, gotErr := libsq.SLQ2SQL(th.Context, qc, tc.in) diff --git a/libsq/query_test.go b/libsq/query_test.go index ad927d1c..9e3fac9a 100644 --- a/libsq/query_test.go +++ b/libsq/query_test.go @@ -168,14 +168,14 @@ func doExecQueryTestCase(t *testing.T, tc queryTestCase) { require.NoError(t, err) th := testh.New(t) - dbases := th.Databases() + pools := th.Pools() qc := &libsq.QueryContext{ - Collection: coll, - DBOpener: dbases, - JoinDBOpener: dbases, - ScratchDBOpener: dbases, - Args: tc.args, + Collection: coll, + PoolOpener: pools, + JoinPoolOpener: pools, + ScratchPoolOpener: pools, + Args: tc.args, } if tc.beforeRun != nil { diff --git a/testh/testh.go b/testh/testh.go index b7d54dbe..e55e050f 100644 --- a/testh/testh.go +++ b/testh/testh.go @@ -103,10 +103,10 @@ type Helper struct { T testing.TB Log *slog.Logger - registry *driver.Registry - files *source.Files - databases *driver.Databases - run *run.Run + registry *driver.Registry + files *source.Files + pools *driver.Pools + run *run.Run initOnce sync.Once @@ -145,20 +145,20 @@ func New(t testing.TB, opts ...Option) *Helper { } // NewWith is a convenience wrapper for New that also returns -// a Source for handle, the driver.SQLDriver, driver.Database, +// a Source for handle, the driver.SQLDriver, driver.Pool, // and the *sql.DB. // // The function will fail if handle is not the handle for a // source whose driver implements driver.SQLDriver. -func NewWith(t testing.TB, handle string) (*Helper, *source.Source, driver.SQLDriver, driver.Database, *sql.DB) { +func NewWith(t testing.TB, handle string) (*Helper, *source.Source, driver.SQLDriver, driver.Pool, *sql.DB) { th := New(t) src := th.Source(handle) drvr := th.SQLDriverFor(src) - dbase := th.Open(src) - db, err := dbase.DB(th.Context) + pool := th.Open(src) + db, err := pool.DB(th.Context) require.NoError(t, err) - return th, src, drvr, dbase, db + return th, src, drvr, pool, db } func (h *Helper) init() { @@ -178,20 +178,20 @@ func (h *Helper) init() { h.files.AddDriverDetectors(source.DetectMagicNumber) - h.databases = driver.NewDatabases(log, h.registry, sqlite3.NewScratchSource) - h.Cleanup.AddC(h.databases) + h.pools = driver.NewPools(log, h.registry, sqlite3.NewScratchSource) + h.Cleanup.AddC(h.pools) h.registry.AddProvider(sqlite3.Type, &sqlite3.Provider{Log: log}) h.registry.AddProvider(postgres.Type, &postgres.Provider{Log: log}) h.registry.AddProvider(sqlserver.Type, &sqlserver.Provider{Log: log}) h.registry.AddProvider(mysql.Type, &mysql.Provider{Log: log}) - csvp := &csv.Provider{Log: log, Scratcher: h.databases, Files: h.files} + csvp := &csv.Provider{Log: log, Scratcher: h.pools, Files: h.files} h.registry.AddProvider(csv.TypeCSV, csvp) h.registry.AddProvider(csv.TypeTSV, csvp) h.files.AddDriverDetectors(csv.DetectCSV, csv.DetectTSV) - jsonp := &json.Provider{Log: log, Scratcher: h.databases, Files: h.files} + jsonp := &json.Provider{Log: log, Scratcher: h.pools, Files: h.files} h.registry.AddProvider(json.TypeJSON, jsonp) h.registry.AddProvider(json.TypeJSONA, jsonp) h.registry.AddProvider(json.TypeJSONL, jsonp) @@ -201,7 +201,7 @@ func (h *Helper) init() { json.DetectJSONL(driver.OptIngestSampleSize.Get(nil)), ) - h.registry.AddProvider(xlsx.Type, &xlsx.Provider{Log: log, Scratcher: h.databases, Files: h.files}) + h.registry.AddProvider(xlsx.Type, &xlsx.Provider{Log: log, Scratcher: h.pools, Files: h.files}) h.files.AddDriverDetectors(xlsx.DetectXLSX) h.addUserDrivers() @@ -368,49 +368,49 @@ func (h *Helper) NewCollection(handles ...string) *source.Collection { return coll } -// Open opens a Database for src via h's internal Databases +// Open opens a driver.Pool for src via h's internal Pools // instance: thus subsequent calls to Open may return the -// same Database instance. The opened Database will be closed +// same Pool instance. The opened driver.Pool will be closed // during h.Close. -func (h *Helper) Open(src *source.Source) driver.Database { +func (h *Helper) Open(src *source.Source) driver.Pool { ctx, cancelFn := context.WithTimeout(h.Context, h.dbOpenTimeout) defer cancelFn() - dbase, err := h.Databases().Open(ctx, src) + pool, err := h.Pools().Open(ctx, src) require.NoError(h.T, err) - db, err := dbase.DB(ctx) + db, err := pool.DB(ctx) require.NoError(h.T, err) require.NoError(h.T, db.PingContext(ctx)) - return dbase + return pool } // OpenDB is a convenience method for getting the sql.DB for src. // The returned sql.DB is closed during h.Close, via the closing -// of its parent driver.Database. +// of its parent driver.Pool. func (h *Helper) OpenDB(src *source.Source) *sql.DB { - dbase := h.Open(src) - db, err := dbase.DB(h.Context) + pool := h.Open(src) + db, err := pool.DB(h.Context) require.NoError(h.T, err) return db } -// openNew opens a new Database. It is the caller's responsibility -// to close the returned Database. Unlike method Open, this method +// openNew opens a new driver.Pool. It is the caller's responsibility +// to close the returned Pool. Unlike method Open, this method // will always invoke the driver's Open method. // // Some of Helper's methods (e.g. DropTable) need to use openNew rather -// than Open, as the Database returned by Open can be closed by test code, +// than Open, as the Pool returned by Open can be closed by test code, // potentially causing problems during Cleanup. -func (h *Helper) openNew(src *source.Source) driver.Database { +func (h *Helper) openNew(src *source.Source) driver.Pool { h.Log.Debug("openNew", lga.Src, src) reg := h.Registry() drvr, err := reg.DriverFor(src.Type) require.NoError(h.T, err) - dbase, err := drvr.Open(h.Context, src) + pool, err := drvr.Open(h.Context, src) require.NoError(h.T, err) - return dbase + return pool } // SQLDriverFor is a convenience method to get src's driver.SQLDriver. @@ -436,12 +436,12 @@ func (h *Helper) DriverFor(src *source.Source) driver.Driver { // RowCount returns the result of "SELECT COUNT(*) FROM tbl", // failing h's test on any error. func (h *Helper) RowCount(src *source.Source, tbl string) int64 { - dbase := h.openNew(src) - defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, dbase) + pool := h.openNew(src) + defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, pool) - query := "SELECT COUNT(*) FROM " + dbase.SQLDriver().Dialect().Enquote(tbl) + query := "SELECT COUNT(*) FROM " + pool.SQLDriver().Dialect().Enquote(tbl) var count int64 - db, err := dbase.DB(h.Context) + db, err := pool.DB(h.Context) require.NoError(h.T, err) require.NoError(h.T, db.QueryRowContext(h.Context, query).Scan(&count)) @@ -454,13 +454,13 @@ func (h *Helper) RowCount(src *source.Source, tbl string) int64 { func (h *Helper) CreateTable(dropAfter bool, src *source.Source, tblDef *sqlmodel.TableDef, data ...[]any, ) (affected int64) { - dbase := h.openNew(src) - defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, dbase) + pool := h.openNew(src) + defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, pool) - db, err := dbase.DB(h.Context) + db, err := pool.DB(h.Context) require.NoError(h.T, err) - require.NoError(h.T, dbase.SQLDriver().CreateTable(h.Context, db, tblDef)) + require.NoError(h.T, pool.SQLDriver().CreateTable(h.Context, db, tblDef)) h.T.Logf("Created table %s.%s", src.Handle, tblDef.Name) if dropAfter { @@ -482,11 +482,11 @@ func (h *Helper) Insert(src *source.Source, tbl string, cols []string, records . return 0 } - dbase := h.openNew(src) - defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, dbase) + pool := h.openNew(src) + defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, pool) - drvr := dbase.SQLDriver() - db, err := dbase.DB(h.Context) + drvr := pool.SQLDriver() + db, err := pool.DB(h.Context) require.NoError(h.T, err) conn, err := db.Conn(h.Context) @@ -547,13 +547,13 @@ func (h *Helper) CopyTable( toTable.Table = stringz.UniqTableName(fromTable.Table) } - dbase := h.openNew(src) - defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, dbase) + pool := h.openNew(src) + defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, pool) - db, err := dbase.DB(h.Context) + db, err := pool.DB(h.Context) require.NoError(h.T, err) - copied, err := dbase.SQLDriver().CopyTable( + copied, err := pool.SQLDriver().CopyTable( h.Context, db, fromTable, @@ -577,28 +577,28 @@ func (h *Helper) CopyTable( // DropTable drops tbl from src. func (h *Helper) DropTable(src *source.Source, tbl tablefq.T) { - dbase := h.openNew(src) - defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, dbase) + pool := h.openNew(src) + defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, pool) - db, err := dbase.DB(h.Context) + db, err := pool.DB(h.Context) require.NoError(h.T, err) - require.NoError(h.T, dbase.SQLDriver().DropTable(h.Context, db, tbl, true)) + require.NoError(h.T, pool.SQLDriver().DropTable(h.Context, db, tbl, true)) h.Log.Debug("Dropped table", lga.Target, source.Target(src, tbl.Table)) } // QuerySQL uses libsq.QuerySQL to execute SQL query // against src, returning a sink to which all records have // been written. Typically the db arg is nil, and QuerySQL uses the -// same driver.Database instance as returned by Helper.Open. If db +// same driver.Pool instance as returned by Helper.Open. If db // is non-nil, it is passed to libsq.QuerySQL (e.g. the query needs to // execute against a sql.Tx), and the caller is responsible for closing db. func (h *Helper) QuerySQL(src *source.Source, db sqlz.DB, query string, args ...any) (*RecordSink, error) { - dbase := h.Open(src) + pool := h.Open(src) sink := &RecordSink{} recw := output.NewRecordWriterAdapter(h.Context, sink) - err := libsq.QuerySQL(h.Context, dbase, db, recw, query, args...) + err := libsq.QuerySQL(h.Context, pool, db, recw, query, args...) if err != nil { return nil, err } @@ -623,11 +623,11 @@ func (h *Helper) QuerySLQ(query string, args map[string]string) (*RecordSink, er } qc := &libsq.QueryContext{ - Collection: h.coll, - DBOpener: h.databases, - JoinDBOpener: h.databases, - ScratchDBOpener: h.databases, - Args: args, + Collection: h.coll, + PoolOpener: h.pools, + JoinPoolOpener: h.pools, + ScratchPoolOpener: h.pools, + Args: args, } sink := &RecordSink{} @@ -647,7 +647,7 @@ func (h *Helper) QuerySLQ(query string, args map[string]string) (*RecordSink, er // ExecSQL is a convenience wrapper for sql.DB.Exec that returns the // rows affected, failing on any error. Note that ExecSQL uses the -// same Database instance as returned by h.Open. +// same Pool instance as returned by h.Open. func (h *Helper) ExecSQL(src *source.Source, query string, args ...any) (affected int64) { db := h.OpenDB(src) @@ -688,8 +688,8 @@ func (h *Helper) InsertDefaultRow(src *source.Source, tbl string) { // TruncateTable truncates tbl in src. func (h *Helper) TruncateTable(src *source.Source, tbl string) (affected int64) { - dbase := h.openNew(src) - defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, dbase) + pool := h.openNew(src) + defer lg.WarnIfCloseError(h.Log, lgm.CloseDB, pool) affected, err := h.DriverFor(src).Truncate(h.Context, src, tbl, true) require.NoError(h.T, err) @@ -734,7 +734,7 @@ func (h *Helper) addUserDrivers() { Log: h.Log, DriverDef: userDriverDef, ImportFn: importFn, - Scratcher: h.databases, + Scratcher: h.pools, Files: h.files, } @@ -748,10 +748,10 @@ func (h *Helper) IsMonotable(src *source.Source) bool { return h.DriverFor(src).DriverMetadata().Monotable } -// Databases returns the helper's Databases instance. -func (h *Helper) Databases() *driver.Databases { +// Pools returns the helper's driver.Pools instance. +func (h *Helper) Pools() *driver.Pools { h.init() - return h.databases + return h.pools } // Files returns the helper's Files instance. @@ -762,22 +762,22 @@ func (h *Helper) Files() *source.Files { // SourceMetadata returns metadata for src. func (h *Helper) SourceMetadata(src *source.Source) (*source.Metadata, error) { - dbases, err := h.Databases().Open(h.Context, src) + pools, err := h.Pools().Open(h.Context, src) if err != nil { return nil, err } - return dbases.SourceMetadata(h.Context, false) + return pools.SourceMetadata(h.Context, false) } // TableMetadata returns metadata for src's table. func (h *Helper) TableMetadata(src *source.Source, tbl string) (*source.TableMetadata, error) { - dbases, err := h.Databases().Open(h.Context, src) + pools, err := h.Pools().Open(h.Context, src) if err != nil { return nil, err } - return dbases.TableMetadata(h.Context, tbl) + return pools.TableMetadata(h.Context, tbl) } // DiffDB fails the test if src's metadata is substantially different