sq/drivers/csv/ingest.go

322 lines
7.3 KiB
Go
Raw Normal View History

2020-08-06 20:58:47 +03:00
package csv
import (
"context"
"encoding/csv"
"errors"
2020-08-06 20:58:47 +03:00
"io"
"time"
2020-08-06 20:58:47 +03:00
"unicode/utf8"
"github.com/neilotoole/sq/libsq"
"github.com/neilotoole/sq/libsq/core/errz"
"github.com/neilotoole/sq/libsq/core/kind"
"github.com/neilotoole/sq/libsq/core/lg"
"github.com/neilotoole/sq/libsq/core/lg/lga"
"github.com/neilotoole/sq/libsq/core/lg/lgm"
"github.com/neilotoole/sq/libsq/core/options"
"github.com/neilotoole/sq/libsq/core/record"
"github.com/neilotoole/sq/libsq/core/stringz"
"github.com/neilotoole/sq/libsq/core/tuning"
2020-08-06 20:58:47 +03:00
"github.com/neilotoole/sq/libsq/driver"
"github.com/neilotoole/sq/libsq/source"
"github.com/neilotoole/sq/libsq/source/drivertype"
2020-08-06 20:58:47 +03:00
)
// OptEmptyAsNull determines if an empty CSV field is treated as NULL
// or as the zero value for the kind of that field.
var OptEmptyAsNull = options.NewBool(
"driver.csv.empty-as-null",
nil,
true,
"Treat ingest empty CSV fields as NULL",
`When true, empty CSV fields are treated as NULL. When false,
the zero value for that type is used, e.g. empty string or 0.`,
options.TagSource,
options.TagIngestMutate,
"csv",
)
// OptDelim specifies the CSV delimiter to use.
var OptDelim = options.NewString(
"driver.csv.delim",
nil,
delimCommaKey,
nil,
"Delimiter for ingest CSV data",
`Delimiter to use for CSV files. Default is "comma".
Possible values are: comma, space, pipe, tab, colon, semi, period.`,
options.TagSource,
options.TagIngestMutate,
"csv",
)
2020-08-06 20:58:47 +03:00
// ingestCSV loads the src CSV data into destGrip.
func (d *driveri) ingestCSV(ctx context.Context, src *source.Source, destGrip driver.Grip) error {
log := lg.FromContext(ctx)
startUTC := time.Now().UTC()
2020-08-06 20:58:47 +03:00
rc, err := d.files.NewReader(ctx, src, true)
if err != nil {
return err
}
defer lg.WarnIfCloseError(log, lgm.CloseFileReader, rc)
delim, err := getDelimiter(src)
2020-08-06 20:58:47 +03:00
if err != nil {
return err
}
cr := newCSVReader(rc, delim)
recs, err := readRecords(cr, driver.OptIngestSampleSize.Get(src.Options))
if err != nil {
return err
}
2020-08-06 20:58:47 +03:00
headerPresent, err := hasHeaderRow(ctx, recs, src.Options)
2020-08-06 20:58:47 +03:00
if err != nil {
return err
}
var header []string
if headerPresent {
header = recs[0]
2020-08-06 20:58:47 +03:00
// We're done with the first row
recs = recs[1:]
2020-08-06 20:58:47 +03:00
} else {
// The CSV file does not have a header record. We will generate
// col names [A,B,C...].
header = make([]string, len(recs[0]))
for i := range recs[0] {
header[i] = stringz.GenerateAlphaColName(i, false)
2020-08-06 20:58:47 +03:00
}
}
if header, err = driver.MungeIngestColNames(ctx, header); err != nil {
return err
}
kinds, mungers, err := detectColKinds(recs)
if err != nil {
return err
}
2020-08-06 20:58:47 +03:00
// And now we need to create the dest table in scratchDB
tblDef := createTblDef(source.MonotableName, header, kinds)
2020-08-06 20:58:47 +03:00
db, err := destGrip.DB(ctx)
if err != nil {
return err
}
err = destGrip.SQLDriver().CreateTable(ctx, db, tblDef)
2020-08-06 20:58:47 +03:00
if err != nil {
return errz.Wrap(err, "csv: failed to create dest scratch table")
}
recMeta, err := getIngestRecMeta(ctx, destGrip, tblDef)
2020-08-06 20:58:47 +03:00
if err != nil {
return err
}
if OptEmptyAsNull.Get(src.Options) {
configureEmptyNullMunge(mungers, recMeta)
}
insertWriter := libsq.NewDBWriter(
libsq.MsgIngestRecords,
destGrip,
tblDef.Name,
tuning.OptRecBufSize.Get(destGrip.Source().Options),
)
err = execInsert(ctx, insertWriter, recMeta, mungers, recs, cr)
2020-08-06 20:58:47 +03:00
if err != nil {
return err
}
inserted, err := insertWriter.Wait()
if err != nil {
return err
}
log.Info("Ingested rows",
lga.Count, inserted,
lga.Elapsed, time.Since(startUTC).Round(time.Millisecond),
lga.Target, source.Target(destGrip.Source(), tblDef.Name),
)
2020-08-06 20:58:47 +03:00
return nil
}
// configureEmptyNullMunge configures mungers to that empty string is
// munged to nil.
func configureEmptyNullMunge(mungers []kind.MungeFunc, recMeta record.Meta) {
kinds := recMeta.Kinds()
for i := range mungers {
if kinds[i] == kind.Text {
if mungers[i] == nil {
mungers[i] = kind.MungeEmptyStringAsNil
continue
}
// There's already a munger: wrap it
existing := mungers[i]
mungers[i] = func(v any) (any, error) {
var err error
v, err = existing(v)
if err != nil {
return v, err
}
return kind.MungeEmptyStringAsNil(v)
}
}
}
}
const (
delimCommaKey = "comma"
delimComma = ','
delimSpaceKey = "space"
delimSpace = ' '
delimPipeKey = "pipe"
delimPipe = '|'
delimTabKey = "tab"
delimTab = '\t'
delimColonKey = "colon"
delimColon = ':'
delimSemiKey = "semi"
delimSemi = ';'
delimPeriodKey = "period"
delimPeriod = '.'
)
// NamedDelims returns the named delimiters, such as [comma, tab, pipe...].
func NamedDelims() []string {
return []string{
delimCommaKey,
delimTabKey,
delimSemiKey,
delimColonKey,
delimSpaceKey,
delimPipeKey,
delimPeriodKey,
}
}
2020-08-06 20:58:47 +03:00
// namedDelimiters is map of named delimiter strings to
// rune value. For example, "comma" maps to ',' and "pipe" maps to '|'.
var namedDelimiters = map[string]rune{
delimCommaKey: delimComma,
delimSpaceKey: delimSpace,
delimPipeKey: delimPipe,
delimTabKey: delimTab,
delimColonKey: delimColon,
delimSemiKey: delimSemi,
delimPeriodKey: delimPeriod,
2020-08-06 20:58:47 +03:00
}
// getDelimiter returns the delimiter for src. An explicit
// delimiter value may be set in src.Options; otherwise
// the default for the source is returned.
func getDelimiter(src *source.Source) (rune, error) {
delim, ok, err := getDelimFromOptions(src.Options)
if err != nil {
return 0, err
}
if ok {
return delim, nil
}
if src.Type == drivertype.TSV {
2020-08-06 20:58:47 +03:00
return '\t', nil
}
// default is comma
return ',', nil
}
// getDelimFromOptions returns ok as true and the delimiter rune if a
// valid value is provided in src.Options, returns ok as false if
// no valid value provided, and an error if the provided value is invalid.
func getDelimFromOptions(opts options.Options) (r rune, ok bool, err error) {
if len(opts) == 0 {
return 0, false, nil
}
if !OptDelim.IsSet(opts) {
return 0, false, nil
}
2020-08-06 20:58:47 +03:00
val := OptDelim.Get(opts)
2020-08-06 20:58:47 +03:00
if len(val) == 1 {
r, _ = utf8.DecodeRuneInString(val)
return r, true, nil
}
r, ok = namedDelimiters[val]
if !ok {
err = errz.Errorf("unknown delimiter constant {%s}", val)
2020-08-06 20:58:47 +03:00
return 0, false, err
}
return r, true, nil
}
func newCSVReader(r io.Reader, delim rune) *csv.Reader {
// We add the CR filter reader to deal with CSV files exported
// from Excel which can have the DOS-style \r EOL markers.
cr := csv.NewReader(&crFilterReader{r: r})
cr.Comma = delim
return cr
}
// crFilterReader is a reader whose Read method converts
// standalone carriage return '\r' bytes to newline '\n'.
// CRLF "\r\n" sequences are untouched.
// This is useful for reading from DOS format, e.g. a TSV
// file exported by Microsoft Excel.
type crFilterReader struct {
r io.Reader
}
func (r *crFilterReader) Read(p []byte) (n int, err error) {
n, err = r.r.Read(p)
for i := 0; i < n; i++ {
if p[i] == 13 {
if i+1 < n && p[i+1] == 10 {
continue // it's \r\n
}
// it's just \r by itself, replace
p[i] = 10
}
}
return n, err
}
// readRecords reads a maximum of n records from cr.
func readRecords(cr *csv.Reader, n int) ([][]string, error) {
recs := make([][]string, 0, n)
for i := 0; i < n; i++ {
rec, err := cr.Read()
if err != nil {
if errors.Is(err, io.EOF) {
return recs, nil
}
return nil, errz.Err(err)
}
recs = append(recs, rec)
}
return recs, nil
}