mirror of
https://github.com/wader/fq.git
synced 2024-12-19 11:21:47 +03:00
e9d9f8aef9
Rename s/interface{}/any/g. Preparation for using generics in decode API and native jq functions etc. Remove some unused linter ignores as linter has been fixed.
162 lines
4.2 KiB
Go
162 lines
4.2 KiB
Go
package avro
|
|
|
|
import (
|
|
"bytes"
|
|
"compress/flate"
|
|
"embed"
|
|
"hash/crc32"
|
|
|
|
"github.com/golang/snappy"
|
|
"github.com/wader/fq/format"
|
|
"github.com/wader/fq/format/avro/decoders"
|
|
"github.com/wader/fq/format/avro/schema"
|
|
"github.com/wader/fq/format/registry"
|
|
"github.com/wader/fq/pkg/bitio"
|
|
"github.com/wader/fq/pkg/decode"
|
|
"github.com/wader/fq/pkg/scalar"
|
|
)
|
|
|
|
//go:embed avro_ocf.jq
|
|
var avroOcfFS embed.FS
|
|
|
|
func init() {
|
|
registry.MustRegister(decode.Format{
|
|
Name: format.AVRO_OCF,
|
|
Description: "Avro object container file",
|
|
Groups: []string{format.PROBE},
|
|
DecodeFn: decodeAvroOCF,
|
|
Functions: []string{"_help"},
|
|
Files: avroOcfFS,
|
|
})
|
|
}
|
|
|
|
type HeaderData struct {
|
|
Schema schema.SimplifiedSchema
|
|
Codec string
|
|
Sync []byte
|
|
}
|
|
|
|
// headerSchemaSpec is the Avro schema, written as JSON, used to decode the
// container file header: a record with a string-valued "meta" map and a
// 16 byte fixed "sync" field.
const headerSchemaSpec = `
{
  "type": "record",
  "name": "org.apache.avro.file.Header",
  "fields": [
    {"name": "meta", "type": {"type": "map", "values": "string"}},
    {"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}}
  ]
}`
|
|
|
|
func decodeHeader(d *decode.D) HeaderData {
|
|
d.FieldRawLen("magic", 4*8, d.AssertBitBuf([]byte{'O', 'b', 'j', 1}))
|
|
|
|
var headerData HeaderData
|
|
|
|
headerSchema, err := schema.FromSchemaString(headerSchemaSpec)
|
|
if err != nil {
|
|
d.Fatalf("Failed to parse header schema: %v", err)
|
|
}
|
|
decodeHeaderFn, err := decoders.DecodeFnForSchema(headerSchema)
|
|
if err != nil {
|
|
d.Fatalf("failed to parse header: %v", err)
|
|
}
|
|
|
|
header := decodeHeaderFn("header", d)
|
|
headerRecord, ok := header.(map[string]any)
|
|
if !ok {
|
|
d.Fatalf("header is not a map")
|
|
}
|
|
meta, ok := headerRecord["meta"].(map[string]any)
|
|
if !ok {
|
|
d.Fatalf("header.meta is not a map")
|
|
}
|
|
|
|
headerData.Schema, err = schema.FromSchemaString(meta["avro.schema"].(string))
|
|
if err != nil {
|
|
d.Fatalf("failed to parse schema: %v", err)
|
|
}
|
|
if codec, ok := meta["avro.codec"]; ok {
|
|
headerData.Codec, ok = codec.(string)
|
|
if !ok {
|
|
d.Fatalf("avro.codec is not a string")
|
|
}
|
|
} else {
|
|
headerData.Codec = "null"
|
|
}
|
|
|
|
headerData.Sync, ok = headerRecord["sync"].([]byte)
|
|
if !ok {
|
|
d.Fatalf("header.sync is not a byte array")
|
|
}
|
|
return headerData
|
|
}
|
|
|
|
func decodeBlockCodec(d *decode.D, dataSize int64, codec string) *bytes.Buffer {
|
|
bb := &bytes.Buffer{}
|
|
if codec == "deflate" {
|
|
br := d.FieldRawLen("compressed", dataSize*8)
|
|
d.MustCopy(bb, flate.NewReader(bitio.NewIOReader(br)))
|
|
} else if codec == "snappy" {
|
|
// Everything but last 4 bytes which are the checksum
|
|
n := dataSize - 4
|
|
br := d.FieldRawLen("compressed", n*8)
|
|
|
|
// This could be simplified to be similar to deflate, however snappy's reader only works for streaming frames,
|
|
// not block data. See https://github.com/google/snappy/blob/main/framing_format.txt for details.
|
|
compressed := make([]byte, n)
|
|
if _, err := bitio.ReadFull(br, compressed, n*8); err != nil {
|
|
d.Fatalf("failed reading compressed data %v", err)
|
|
}
|
|
decompressed, err := snappy.Decode(nil, compressed)
|
|
if err != nil {
|
|
d.Fatalf("failed decompressing data: %v", err)
|
|
}
|
|
d.MustCopy(bb, bytes.NewReader(decompressed))
|
|
|
|
// Check the checksum
|
|
crc32W := crc32.NewIEEE()
|
|
d.MustCopy(crc32W, bytes.NewReader(bb.Bytes()))
|
|
d.FieldU32("crc", d.ValidateUBytes(crc32W.Sum(nil)), scalar.ActualHex)
|
|
} else {
|
|
// Unknown codec, just dump the compressed data.
|
|
d.FieldRawLen("compressed", dataSize*8, scalar.Description(codec+" encoded"))
|
|
return nil
|
|
}
|
|
return bb
|
|
}
|
|
|
|
func decodeAvroOCF(d *decode.D, in any) any {
|
|
header := decodeHeader(d)
|
|
|
|
decodeFn, err := decoders.DecodeFnForSchema(header.Schema)
|
|
if err != nil {
|
|
d.Fatalf("unable to create codec: %v", err)
|
|
}
|
|
|
|
d.FieldStructArrayLoop("blocks", "block", func() bool { return d.NotEnd() }, func(d *decode.D) {
|
|
count := d.FieldSFn("count", decoders.VarZigZag)
|
|
if count <= 0 {
|
|
return
|
|
}
|
|
size := d.FieldSFn("size", decoders.VarZigZag)
|
|
i := int64(0)
|
|
|
|
if header.Codec != "null" {
|
|
if bb := decodeBlockCodec(d, size, header.Codec); bb != nil {
|
|
d.FieldArrayRootBitBufFn("data", bitio.NewBitReader(bb.Bytes(), -1), func(d *decode.D) {
|
|
for ; i < count; i++ {
|
|
decodeFn("data", d)
|
|
}
|
|
})
|
|
}
|
|
} else {
|
|
d.FieldArrayLoop("data", func() bool { return i < count }, func(d *decode.D) {
|
|
decodeFn("datum", d)
|
|
i++
|
|
})
|
|
}
|
|
d.FieldRawLen("sync", 16*8, d.AssertBitBuf(header.Sync))
|
|
})
|
|
|
|
return nil
|
|
}
|