1
1
mirror of https://github.com/wader/fq.git synced 2024-12-01 02:30:32 +03:00
fq/format/avro/ocf.go

125 lines
3.3 KiB
Go
Raw Normal View History

2021-12-28 22:01:53 +03:00
package avro
import (
"github.com/wader/fq/format"
2021-12-30 01:12:36 +03:00
"github.com/wader/fq/format/avro/decoders"
2021-12-28 22:05:10 +03:00
"github.com/wader/fq/format/avro/schema"
2021-12-28 22:01:53 +03:00
"github.com/wader/fq/format/registry"
"github.com/wader/fq/pkg/decode"
2021-12-28 22:05:10 +03:00
"github.com/wader/fq/pkg/scalar"
2021-12-28 22:01:53 +03:00
)
var jsonGroup decode.Group
func init() {
registry.MustRegister(decode.Format{
Name: format.AVRO_OCF,
Description: "Avro object container file",
Groups: []string{format.PROBE},
2021-12-28 22:05:10 +03:00
DecodeFn: avroDecodeOCF,
2021-12-28 22:01:53 +03:00
Dependencies: []decode.Dependency{
{Names: []string{format.JSON}, Group: &jsonGroup},
},
})
}
2021-12-28 22:05:10 +03:00
func ScalarDescription(description string) scalar.Mapper {
return scalar.Fn(func(s scalar.S) (scalar.S, error) {
s.Description = description
return s, nil
})
}
2021-12-28 22:01:53 +03:00
2021-12-28 22:05:10 +03:00
type HeaderData struct {
Schema *schema.SimplifiedSchema
Codec string
Sync []byte
2021-12-28 22:01:53 +03:00
}
2021-12-28 22:05:10 +03:00
func decodeHeader(d *decode.D) HeaderData {
var headerData HeaderData
// Header is encoded in avro so could use avro decoder, but doing it manually so we can
// keep asserts and treating schema as JSON
2021-12-28 22:01:53 +03:00
d.FieldRawLen("magic", 4*8, d.AssertBitBuf([]byte{'O', 'b', 'j', 1}))
var blockCount int64 = -1
d.FieldStructArrayLoop("meta", "block",
func() bool { return blockCount != 0 },
func(d *decode.D) {
2021-12-30 01:12:36 +03:00
blockCount = d.FieldSFn("count", decoders.VarZigZag)
2021-12-28 22:01:53 +03:00
// If its negative, then theres another long representing byte size
if blockCount < 0 {
blockCount *= -1
2021-12-30 01:12:36 +03:00
d.FieldSFn("size", decoders.VarZigZag)
2021-12-28 22:01:53 +03:00
}
if blockCount == 0 {
return
}
var i int64 = 0
d.FieldStructArrayLoop("entries", "entry", func() bool { return i < blockCount }, func(d *decode.D) {
2021-12-30 01:12:36 +03:00
keyL := d.FieldSFn("key_len", decoders.VarZigZag)
2021-12-28 22:01:53 +03:00
key := d.FieldUTF8("key", int(keyL))
2021-12-30 01:12:36 +03:00
valL := d.FieldSFn("value_len", decoders.VarZigZag)
2021-12-28 22:01:53 +03:00
if key == "avro.schema" {
2021-12-28 22:05:10 +03:00
v, _ := d.FieldFormatLen("value", valL*8, jsonGroup, nil)
s, err := schema.SchemaFromJson(v.V.(*scalar.S).Actual)
headerData.Schema = &s
if err != nil {
d.Fatalf("Failed to parse schema: %s", err)
}
} else if key == "avro.codec" {
headerData.Codec = d.FieldUTF8("value", int(valL))
2021-12-28 22:01:53 +03:00
} else {
d.FieldUTF8("value", int(valL))
}
i++
})
})
2021-12-28 22:05:10 +03:00
if headerData.Schema == nil {
d.Fatalf("No schema found in header")
}
if headerData.Codec == "null" {
headerData.Codec = ""
}
2021-12-28 22:01:53 +03:00
syncbb := d.FieldRawLen("sync", 16*8)
2021-12-28 22:05:10 +03:00
var err error
headerData.Sync, err = syncbb.BytesLen(16)
2021-12-28 22:01:53 +03:00
if err != nil {
d.Fatalf("unable to read sync bytes: %v", err)
}
2021-12-28 22:05:10 +03:00
return headerData
}
func avroDecodeOCF(d *decode.D, in interface{}) interface{} {
header := decodeHeader(d)
2021-12-30 01:12:36 +03:00
decodeFn, err := decoders.DecodeFnForSchema(*header.Schema)
2021-12-28 22:05:10 +03:00
if err != nil {
d.Fatalf("unable to create codec: %v", err)
}
2021-12-28 22:01:53 +03:00
d.FieldStructArrayLoop("blocks", "block", func() bool { return d.NotEnd() }, func(d *decode.D) {
2021-12-30 01:12:36 +03:00
count := d.FieldSFn("count", decoders.VarZigZag)
2021-12-28 22:05:10 +03:00
if count <= 0 {
return
}
2021-12-30 01:12:36 +03:00
size := d.FieldSFn("size", decoders.VarZigZag)
2021-12-28 22:05:10 +03:00
// Currently not supporting encodings.
if header.Codec != "" {
d.FieldRawLen("data", size*8, ScalarDescription(header.Codec+" encoded"))
} else {
i := int64(0)
d.FieldArrayLoop("data", func() bool { return i < count }, func(d *decode.D) {
2021-12-30 01:12:36 +03:00
decodeFn("datum", d)
2021-12-28 22:05:10 +03:00
i += 1
})
}
d.FieldRawLen("sync", 16*8, d.AssertBitBuf(header.Sync))
2021-12-28 22:01:53 +03:00
})
return nil
}