mirror of
https://github.com/neilotoole/sq.git
synced 2024-11-30 19:09:13 +03:00
3f6157c4c4
- Switch to slog logger.
491 lines
12 KiB
Go
491 lines
12 KiB
Go
package json
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
stdj "encoding/json"
|
|
"fmt"
|
|
"io"
|
|
|
|
"github.com/neilotoole/sq/libsq/core/lg/lga"
|
|
|
|
"github.com/neilotoole/sq/libsq/core/lg/lgm"
|
|
|
|
"github.com/neilotoole/sq/libsq/core/lg"
|
|
|
|
"github.com/neilotoole/sq/libsq/core/errz"
|
|
"github.com/neilotoole/sq/libsq/core/stringz"
|
|
"github.com/neilotoole/sq/libsq/driver"
|
|
"github.com/neilotoole/sq/libsq/source"
|
|
)
|
|
|
|
// DetectJSON implements source.TypeDetectFunc.
|
|
// The function returns TypeJSON for two varieties of input:.
|
|
func DetectJSON(ctx context.Context, openFn source.FileOpenFunc) (detected source.Type, score float32,
|
|
err error,
|
|
) {
|
|
log := lg.FromContext(ctx)
|
|
var r1, r2 io.ReadCloser
|
|
r1, err = openFn()
|
|
if err != nil {
|
|
return source.TypeNone, 0, errz.Err(err)
|
|
}
|
|
defer lg.WarnIfCloseError(log, lgm.CloseFileReader, r1)
|
|
|
|
dec := stdj.NewDecoder(r1)
|
|
var tok stdj.Token
|
|
tok, err = dec.Token()
|
|
if err != nil {
|
|
return source.TypeNone, 0, nil
|
|
}
|
|
|
|
delim, ok := tok.(stdj.Delim)
|
|
if !ok {
|
|
return source.TypeNone, 0, nil
|
|
}
|
|
|
|
switch delim {
|
|
default:
|
|
return source.TypeNone, 0, nil
|
|
case leftBrace:
|
|
// The input is a single JSON object
|
|
r2, err = openFn()
|
|
|
|
// buf gets a copy of what is read from r2
|
|
buf := &buffer{}
|
|
|
|
if err != nil {
|
|
return source.TypeNone, 0, errz.Err(err)
|
|
}
|
|
defer lg.WarnIfCloseError(log, lgm.CloseFileReader, r2)
|
|
|
|
dec = stdj.NewDecoder(io.TeeReader(r2, buf))
|
|
var m map[string]any
|
|
err = dec.Decode(&m)
|
|
if err != nil {
|
|
return source.TypeNone, 0, nil
|
|
}
|
|
|
|
if dec.More() {
|
|
// The input is supposed to be just a single object, so
|
|
// it shouldn't have more tokens
|
|
return source.TypeNone, 0, nil
|
|
}
|
|
|
|
// If the input is all on a single line, then it could be
|
|
// either JSON or JSONL. For single-line input, prefer JSONL.
|
|
lineCount := stringz.LineCount(bytes.NewReader(buf.b), true)
|
|
switch lineCount {
|
|
case -1:
|
|
// should never happen
|
|
return source.TypeNone, 0, errz.New("unknown problem reading JSON input")
|
|
case 0:
|
|
// should never happen
|
|
return source.TypeNone, 0, errz.New("JSON input is empty")
|
|
case 1:
|
|
// If the input is a JSON object on a single line, it could
|
|
// be TypeJSON or TypeJSONL. In deference to TypeJSONL, we
|
|
// return 0.9 instead of 1.0
|
|
return TypeJSON, 0.9, nil
|
|
default:
|
|
return TypeJSON, 1.0, nil
|
|
}
|
|
|
|
case leftBracket:
|
|
// The input is one or more JSON objects inside an array
|
|
}
|
|
|
|
r2, err = openFn()
|
|
if err != nil {
|
|
return source.TypeNone, 0, errz.Err(err)
|
|
}
|
|
defer lg.WarnIfCloseError(log, lgm.CloseFileReader, r2)
|
|
|
|
sc := newObjectInArrayScanner(r2)
|
|
var validObjCount int
|
|
var obj map[string]any
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return source.TypeNone, 0, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
obj, _, err = sc.next()
|
|
if err != nil {
|
|
return source.TypeNone, 0, ctx.Err()
|
|
}
|
|
|
|
if obj == nil { // end of input
|
|
break
|
|
}
|
|
|
|
validObjCount++
|
|
if validObjCount >= driver.Tuning.SampleSize {
|
|
break
|
|
}
|
|
}
|
|
|
|
if validObjCount > 0 {
|
|
return TypeJSON, 1.0, nil
|
|
}
|
|
|
|
return source.TypeNone, 0, nil
|
|
}
|
|
|
|
func importJSON(ctx context.Context, job importJob) error {
|
|
log := lg.FromContext(ctx)
|
|
|
|
r, err := job.openFn()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer lg.WarnIfCloseError(log, lgm.CloseFileReader, r)
|
|
|
|
drvr := job.destDB.SQLDriver()
|
|
db, err := job.destDB.DB().Conn(ctx)
|
|
if err != nil {
|
|
return errz.Err(err)
|
|
}
|
|
defer lg.WarnIfCloseError(log, lgm.CloseDB, db)
|
|
|
|
proc := newProcessor(job.flatten)
|
|
scan := newObjectInArrayScanner(r)
|
|
|
|
var (
|
|
obj map[string]any
|
|
chunk []byte
|
|
schemaModified bool
|
|
curSchema *importSchema
|
|
insertions []*insertion
|
|
hasMore bool
|
|
)
|
|
|
|
for {
|
|
obj, chunk, err = scan.next()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// obj is returned nil by scan.next when end of input.
|
|
hasMore = obj != nil
|
|
|
|
if schemaModified {
|
|
if !hasMore || scan.objCount >= job.sampleSize {
|
|
log.Debug("Time to (re)build the schema", lga.Line, scan.objCount)
|
|
if curSchema == nil {
|
|
log.Debug("First time building the schema")
|
|
}
|
|
|
|
var newSchema *importSchema
|
|
newSchema, err = proc.buildSchemaFlat()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = execSchemaDelta(ctx, drvr, db, curSchema, newSchema)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// The DB has been updated with the current schema,
|
|
// so we mark it as clean.
|
|
proc.markSchemaClean()
|
|
|
|
curSchema = newSchema
|
|
newSchema = nil
|
|
|
|
insertions, err = proc.buildInsertionsFlat(curSchema)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = execInsertions(ctx, drvr, db, insertions)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if !hasMore {
|
|
// We're done
|
|
break
|
|
}
|
|
}
|
|
|
|
schemaModified, err = proc.processObject(obj, chunk)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Initial schema has not been created: we're still in
|
|
// the sampling phase. So we loop.
|
|
if curSchema == nil {
|
|
continue
|
|
}
|
|
|
|
// If we got this far, the initial schema has already been created.
|
|
if schemaModified {
|
|
// But... the schema has been modified. We could still be in
|
|
// the sampling phase, so we loop.
|
|
continue
|
|
}
|
|
|
|
// The schema exists in the DB, and the current JSON chunk hasn't
|
|
// dirtied the schema, so it's safe to insert the recent rows.
|
|
insertions, err = proc.buildInsertionsFlat(curSchema)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = execInsertions(ctx, drvr, db, insertions)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if scan.objCount == 0 {
|
|
return errz.New("empty JSON input")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// objectsInArrayScanner scans JSON text that consists of an array of
|
|
// JSON objects, returning the decoded object and the chunk of JSON
|
|
// that it was scanned from. Example input: [{a:1},{a:2},{a:3}].
|
|
type objectsInArrayScanner struct {
|
|
// buf will get all the data that the JSON decoder reads.
|
|
// buf's role is to keep track of JSON text that has already been
|
|
// consumed by dec, so that we can return the raw JSON chunk
|
|
// when returning its object. Note that on each invocation
|
|
// of next, the buffer is trimmed, otherwise it would grow
|
|
// unbounded.
|
|
buf *buffer
|
|
|
|
// bufOffset is the offset of buf's byte slice relative to the
|
|
// entire input stream. This is necessary given that we trim
|
|
// buf to prevent unbounded growth.
|
|
bufOffset int
|
|
|
|
// dec is the stdlib json decoder.
|
|
dec *stdj.Decoder
|
|
|
|
// curDecPos is the current position of the decoder in the
|
|
// input stream.
|
|
curDecPos int
|
|
|
|
// prevDecPos holds the previous position of the decoder
|
|
// in the input stream.
|
|
prevDecPos int
|
|
|
|
// decBuf holds the value of dec.Buffered, which allows us
|
|
// to look ahead at the upcoming decoder values.
|
|
decBuf []byte
|
|
|
|
// objCount is the count of objects processed by method next.
|
|
objCount int
|
|
}
|
|
|
|
// newObjectInArrayScanner returns a new instance that
|
|
// reads from r.
|
|
func newObjectInArrayScanner(r io.Reader) *objectsInArrayScanner {
|
|
buf := &buffer{b: []byte{}}
|
|
// Everything that dec reads from r is written
|
|
// to buf via the TeeReader.
|
|
dec := stdj.NewDecoder(io.TeeReader(r, buf))
|
|
|
|
return &objectsInArrayScanner{buf: buf, dec: dec}
|
|
}
|
|
|
|
// next scans the next object from the reader. The returned chunk holds
|
|
// the raw JSON that the obj was decoded from. When no more objects,
|
|
// obj and chunk are nil.
|
|
func (s *objectsInArrayScanner) next() (obj map[string]any, chunk []byte, err error) {
|
|
var tok stdj.Token
|
|
|
|
if s.bufOffset == 0 {
|
|
// This is only invoked on the first call to next().
|
|
|
|
// The first token must be left-bracket '['
|
|
tok, err = requireDelimToken(s.dec, leftBracket)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
s.prevDecPos = int(s.dec.InputOffset())
|
|
|
|
// Sync s.buf with the position in the stream
|
|
s.buf.b = s.buf.b[s.prevDecPos:]
|
|
s.bufOffset = s.prevDecPos
|
|
}
|
|
|
|
more := s.dec.More()
|
|
if !more {
|
|
// We've reached the end of the stream.
|
|
|
|
// Make sure there's no trailing invalid stuff
|
|
s.decBuf, err = io.ReadAll(s.dec.Buffered())
|
|
if err != nil {
|
|
return nil, nil, errz.Err(err)
|
|
}
|
|
|
|
trimmed := bytes.TrimSpace(s.decBuf)
|
|
switch {
|
|
case len(trimmed) == 0:
|
|
case len(trimmed) == 1 && trimmed[0] == ']':
|
|
default:
|
|
return nil, nil, errz.Errorf("invalid JSON: non-whitespace trailing input: %s", string(s.decBuf))
|
|
}
|
|
|
|
return nil, nil, nil
|
|
}
|
|
|
|
// Decode the next json into obj.
|
|
err = s.dec.Decode(&obj)
|
|
if err != nil {
|
|
return nil, nil, errz.Err(err)
|
|
}
|
|
|
|
s.curDecPos = int(s.dec.InputOffset())
|
|
s.decBuf, err = io.ReadAll(s.dec.Buffered())
|
|
if err != nil {
|
|
return nil, nil, errz.Err(err)
|
|
}
|
|
|
|
more = s.dec.More()
|
|
|
|
// Peek ahead in the decoder buffer
|
|
delimIndex, delim := nextDelim(s.decBuf, 0, true)
|
|
if delimIndex == -1 {
|
|
return nil, nil, errz.New("invalid JSON: additional input expected")
|
|
}
|
|
|
|
// If end of input, delim should be right-bracket.
|
|
// If there's another object to come, delim should be comma.
|
|
// If delim not found, or some other delim, it's an error.
|
|
switch delim {
|
|
default:
|
|
// bad input
|
|
return nil, nil, errz.Errorf("invalid JSON: expected comma or right-bracket ']' token but got: %s",
|
|
formatToken(tok))
|
|
|
|
case ']':
|
|
// should be end of input
|
|
_, err = requireDelimToken(s.dec, rightBracket)
|
|
if err != nil {
|
|
return nil, nil, errz.Err(err)
|
|
}
|
|
|
|
if more {
|
|
return nil, nil, errz.New("unexpected additional JSON input after closing ']'")
|
|
}
|
|
|
|
// Make sure there's no invalid trailing stuff
|
|
s.decBuf, err = io.ReadAll(s.dec.Buffered())
|
|
if err != nil {
|
|
return nil, nil, errz.Err(err)
|
|
}
|
|
|
|
if len(bytes.TrimSpace(s.decBuf)) != 0 {
|
|
return nil, nil, errz.Errorf("invalid JSON: non-whitespace trailing input: %s", string(s.decBuf))
|
|
}
|
|
|
|
case ',':
|
|
// Expect more objects to come
|
|
if !more {
|
|
return nil, nil, errz.New("invalid JSON: expected additional tokens input after comma")
|
|
}
|
|
}
|
|
|
|
// Note that we re-use the vars delimIndex and delim here.
|
|
// Above us, these vars referred to s.decBuf, not s.buf as here.
|
|
delimIndex, delim = nextDelim(s.buf.b, s.prevDecPos-s.bufOffset, false)
|
|
if delimIndex == -1 {
|
|
return nil, nil, errz.Errorf("invalid JSON: expected delimiter token")
|
|
}
|
|
|
|
switch delim {
|
|
default:
|
|
return nil, nil,
|
|
errz.Errorf("invalid JSON: expected comma or left-brace '{' but got: %s", string(delim))
|
|
case '{':
|
|
}
|
|
|
|
chunk = make([]byte, s.curDecPos-s.prevDecPos-delimIndex)
|
|
copy(chunk, s.buf.b[s.prevDecPos-s.bufOffset+delimIndex:s.curDecPos-s.bufOffset])
|
|
|
|
if !stdj.Valid(chunk) {
|
|
// Should never happen; should be able to delete this check
|
|
return nil, nil, errz.Errorf("invalid JSON")
|
|
}
|
|
|
|
// If processing a large stream, s.buf.b will grow unbounded.
|
|
// So we snip it down to size and use s.bufOffset to track
|
|
// the nominal size.
|
|
s.buf.b = s.buf.b[s.curDecPos-s.bufOffset:]
|
|
s.bufOffset = s.curDecPos
|
|
s.prevDecPos = s.curDecPos
|
|
|
|
s.objCount++
|
|
return obj, chunk, nil
|
|
}
|
|
|
|
// buffer is a basic implementation of io.Writer.
|
|
type buffer struct {
|
|
b []byte
|
|
}
|
|
|
|
// Write implements io.Writer.
|
|
func (b *buffer) Write(p []byte) (n int, err error) {
|
|
b.b = append(b.b, p...)
|
|
return len(p), nil
|
|
}
|
|
|
|
// requireDelimToken invokes dec.Token, returning an error if the
|
|
// token is not delim.
|
|
func requireDelimToken(dec *stdj.Decoder, delim stdj.Delim) (stdj.Token, error) {
|
|
tok, err := dec.Token()
|
|
if err != nil {
|
|
return tok, err
|
|
}
|
|
|
|
if tok != delim {
|
|
return tok, errz.Errorf("expected next token to be delimiter {%s} but got: %s", string(delim), formatToken(tok))
|
|
}
|
|
|
|
return tok, nil
|
|
}
|
|
|
|
// formatToken returns a string representation of tok.
|
|
func formatToken(tok stdj.Token) string {
|
|
switch v := tok.(type) {
|
|
case string:
|
|
return v
|
|
case stdj.Delim:
|
|
return string(v)
|
|
default:
|
|
return fmt.Sprintf("%v", v)
|
|
}
|
|
}
|
|
|
|
// nextDelim returns the index in b of the first JSON
|
|
// delimiter (left or right bracket, left or right brace)
|
|
// occurring from index start onwards. If arg comma is true,
|
|
// comma is also considered a delimiter. If no delimiter
|
|
// found, (-1, 0) is returned.
|
|
func nextDelim(b []byte, start int, comma bool) (i int, delim byte) {
|
|
for i = start; i < len(b); i++ {
|
|
switch b[i] {
|
|
case '{', '}', '[', ']':
|
|
return i, b[i]
|
|
case ',':
|
|
if comma {
|
|
return i, b[i]
|
|
}
|
|
}
|
|
}
|
|
|
|
return -1, 0
|
|
}
|