1
1
mirror of https://github.com/wader/fq.git synced 2024-12-01 02:30:32 +03:00
fq/format/avro/avro_ocf.go

162 lines
4.2 KiB
Go
Raw Normal View History

2021-12-28 22:01:53 +03:00
package avro
import (
2022-02-08 08:03:50 +03:00
"bytes"
"compress/flate"
"embed"
2022-02-10 05:46:12 +03:00
"hash/crc32"
2022-02-08 08:03:50 +03:00
"github.com/golang/snappy"
2021-12-28 22:01:53 +03:00
"github.com/wader/fq/format"
2021-12-30 01:12:36 +03:00
"github.com/wader/fq/format/avro/decoders"
2021-12-28 22:05:10 +03:00
"github.com/wader/fq/format/avro/schema"
2021-12-28 22:01:53 +03:00
"github.com/wader/fq/format/registry"
2022-02-08 08:03:50 +03:00
"github.com/wader/fq/pkg/bitio"
2021-12-28 22:01:53 +03:00
"github.com/wader/fq/pkg/decode"
2021-12-28 22:05:10 +03:00
"github.com/wader/fq/pkg/scalar"
2021-12-28 22:01:53 +03:00
)
// avroOcfFS embeds avro_ocf.jq; init passes it as the format's Files.
//
//go:embed avro_ocf.jq
var avroOcfFS embed.FS
2021-12-28 22:01:53 +03:00
func init() {
registry.MustRegister(decode.Format{
Name: format.AVRO_OCF,
Description: "Avro object container file",
Groups: []string{format.PROBE},
2022-02-08 08:03:50 +03:00
DecodeFn: decodeAvroOCF,
Functions: []string{"_help"},
Files: avroOcfFS,
2021-12-28 22:01:53 +03:00
})
}
2021-12-28 22:05:10 +03:00
type HeaderData struct {
Schema schema.SimplifiedSchema
2021-12-28 22:05:10 +03:00
Codec string
Sync []byte
2021-12-28 22:01:53 +03:00
}
// headerSchemaSpec is the Avro schema (in JSON form) that decodeHeader uses to
// decode the record following the "Obj\x01" magic. The record has two fields:
// a "meta" map with string values and a 16-byte fixed "sync" marker.
// NOTE(review): the Avro spec declares the meta map values as "bytes". Using
// "string" here presumably works because both share the same length-prefixed
// binary encoding — confirm.
const headerSchemaSpec = `
{
"type": "record",
"name": "org.apache.avro.file.Header",
"fields": [
{"name": "meta", "type": {"type": "map", "values": "string"}},
{"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}}
]
}`
2021-12-28 22:05:10 +03:00
func decodeHeader(d *decode.D) HeaderData {
d.FieldRawLen("magic", 4*8, d.AssertBitBuf([]byte{'O', 'b', 'j', 1}))
2021-12-28 22:05:10 +03:00
var headerData HeaderData
headerSchema, err := schema.FromSchemaString(headerSchemaSpec)
if err != nil {
d.Fatalf("Failed to parse header schema: %v", err)
}
decodeHeaderFn, err := decoders.DecodeFnForSchema(headerSchema)
if err != nil {
d.Fatalf("failed to parse header: %v", err)
}
2021-12-28 22:01:53 +03:00
header := decodeHeaderFn("header", d)
headerRecord, ok := header.(map[string]interface{})
if !ok {
d.Fatalf("header is not a map")
}
meta, ok := headerRecord["meta"].(map[string]interface{})
if !ok {
d.Fatalf("header.meta is not a map")
2021-12-28 22:05:10 +03:00
}
headerData.Schema, err = schema.FromSchemaString(meta["avro.schema"].(string))
if err != nil {
d.Fatalf("failed to parse schema: %v", err)
}
2022-02-08 08:03:50 +03:00
if codec, ok := meta["avro.codec"]; ok {
headerData.Codec, ok = codec.(string)
if !ok {
d.Fatalf("avro.codec is not a string")
}
} else {
2022-02-08 08:03:50 +03:00
headerData.Codec = "null"
2021-12-28 22:05:10 +03:00
}
headerData.Sync, ok = headerRecord["sync"].([]byte)
if !ok {
d.Fatalf("header.sync is not a byte array")
2021-12-28 22:01:53 +03:00
}
2021-12-28 22:05:10 +03:00
return headerData
}
2022-02-10 05:39:31 +03:00
func decodeBlockCodec(d *decode.D, dataSize int64, codec string) *bytes.Buffer {
bb := &bytes.Buffer{}
if codec == "deflate" {
br := d.FieldRawLen("compressed", dataSize*8)
d.MustCopy(bb, flate.NewReader(bitio.NewIOReader(br)))
} else if codec == "snappy" {
// Everything but last 4 bytes which are the checksum
n := dataSize - 4
br := d.FieldRawLen("compressed", n*8)
2022-02-10 17:55:56 +03:00
// This could be simplified to be similar to deflate, however snappy's reader only works for streaming frames,
// not block data. See https://github.com/google/snappy/blob/main/framing_format.txt for details.
2022-02-10 05:39:31 +03:00
compressed := make([]byte, n)
if _, err := bitio.ReadFull(br, compressed, n*8); err != nil {
d.Fatalf("failed reading compressed data %v", err)
}
decompressed, err := snappy.Decode(nil, compressed)
if err != nil {
d.Fatalf("failed decompressing data: %v", err)
}
d.MustCopy(bb, bytes.NewReader(decompressed))
// Check the checksum
crc32W := crc32.NewIEEE()
d.MustCopy(crc32W, bytes.NewReader(bb.Bytes()))
d.FieldU32("crc", d.ValidateUBytes(crc32W.Sum(nil)), scalar.Hex)
} else {
// Unknown codec, just dump the compressed data.
d.FieldRawLen("compressed", dataSize*8, scalar.Description(codec+" encoded"))
return nil
}
return bb
}
2022-02-08 08:03:50 +03:00
func decodeAvroOCF(d *decode.D, in interface{}) interface{} {
2021-12-28 22:05:10 +03:00
header := decodeHeader(d)
decodeFn, err := decoders.DecodeFnForSchema(header.Schema)
2021-12-28 22:05:10 +03:00
if err != nil {
d.Fatalf("unable to create codec: %v", err)
}
2021-12-28 22:01:53 +03:00
d.FieldStructArrayLoop("blocks", "block", func() bool { return d.NotEnd() }, func(d *decode.D) {
2021-12-30 01:12:36 +03:00
count := d.FieldSFn("count", decoders.VarZigZag)
2021-12-28 22:05:10 +03:00
if count <= 0 {
return
}
2021-12-30 01:12:36 +03:00
size := d.FieldSFn("size", decoders.VarZigZag)
2022-02-08 08:03:50 +03:00
i := int64(0)
2022-02-10 05:39:31 +03:00
if header.Codec != "null" {
if bb := decodeBlockCodec(d, size, header.Codec); bb != nil {
d.FieldArrayRootBitBufFn("data", bitio.NewBitReader(bb.Bytes(), -1), func(d *decode.D) {
for ; i < count; i++ {
decodeFn("data", d)
}
})
2022-02-08 08:03:50 +03:00
}
2022-02-10 05:39:31 +03:00
} else {
2021-12-28 22:05:10 +03:00
d.FieldArrayLoop("data", func() bool { return i < count }, func(d *decode.D) {
2021-12-30 01:12:36 +03:00
decodeFn("datum", d)
2021-12-30 01:46:56 +03:00
i++
2021-12-28 22:05:10 +03:00
})
}
d.FieldRawLen("sync", 16*8, d.AssertBitBuf(header.Sync))
2021-12-28 22:01:53 +03:00
})
return nil
}