mirror of
https://github.com/wader/fq.git
synced 2024-11-28 03:02:55 +03:00
156 lines
4.1 KiB
Go
156 lines
4.1 KiB
Go
package avro
|
|
|
|
import (
|
|
"bytes"
|
|
"compress/flate"
|
|
"hash/crc32"
|
|
|
|
"github.com/golang/snappy"
|
|
"github.com/wader/fq/format"
|
|
"github.com/wader/fq/format/avro/decoders"
|
|
"github.com/wader/fq/format/avro/schema"
|
|
"github.com/wader/fq/format/registry"
|
|
"github.com/wader/fq/pkg/bitio"
|
|
"github.com/wader/fq/pkg/decode"
|
|
"github.com/wader/fq/pkg/scalar"
|
|
)
|
|
|
|
func init() {
|
|
registry.MustRegister(decode.Format{
|
|
Name: format.AVRO_OCF,
|
|
Description: "Avro object container file",
|
|
Groups: []string{format.PROBE},
|
|
DecodeFn: decodeAvroOCF,
|
|
})
|
|
}
|
|
|
|
type HeaderData struct {
|
|
Schema schema.SimplifiedSchema
|
|
Codec string
|
|
Sync []byte
|
|
}
|
|
|
|
const headerSchemaSpec = `
|
|
{
|
|
"type": "record",
|
|
"name": "org.apache.avro.file.Header",
|
|
"fields": [
|
|
{"name": "meta", "type": {"type": "map", "values": "string"}},
|
|
{"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}}
|
|
]
|
|
}`
|
|
|
|
func decodeHeader(d *decode.D) HeaderData {
|
|
d.FieldRawLen("magic", 4*8, d.AssertBitBuf([]byte{'O', 'b', 'j', 1}))
|
|
|
|
var headerData HeaderData
|
|
|
|
headerSchema, err := schema.FromSchemaString(headerSchemaSpec)
|
|
if err != nil {
|
|
d.Fatalf("Failed to parse header schema: %v", err)
|
|
}
|
|
decodeHeaderFn, err := decoders.DecodeFnForSchema(headerSchema)
|
|
if err != nil {
|
|
d.Fatalf("failed to parse header: %v", err)
|
|
}
|
|
|
|
header := decodeHeaderFn("header", d)
|
|
headerRecord, ok := header.(map[string]interface{})
|
|
if !ok {
|
|
d.Fatalf("header is not a map")
|
|
}
|
|
meta, ok := headerRecord["meta"].(map[string]interface{})
|
|
if !ok {
|
|
d.Fatalf("header.meta is not a map")
|
|
}
|
|
|
|
headerData.Schema, err = schema.FromSchemaString(meta["avro.schema"].(string))
|
|
if err != nil {
|
|
d.Fatalf("failed to parse schema: %v", err)
|
|
}
|
|
if codec, ok := meta["avro.codec"]; ok {
|
|
headerData.Codec, ok = codec.(string)
|
|
if !ok {
|
|
d.Fatalf("avro.codec is not a string")
|
|
}
|
|
} else {
|
|
headerData.Codec = "null"
|
|
}
|
|
|
|
headerData.Sync, ok = headerRecord["sync"].([]byte)
|
|
if !ok {
|
|
d.Fatalf("header.sync is not a byte array")
|
|
}
|
|
return headerData
|
|
}
|
|
|
|
func decodeBlockCodec(d *decode.D, dataSize int64, codec string) *bytes.Buffer {
|
|
bb := &bytes.Buffer{}
|
|
if codec == "deflate" {
|
|
br := d.FieldRawLen("compressed", dataSize*8)
|
|
d.MustCopy(bb, flate.NewReader(bitio.NewIOReader(br)))
|
|
} else if codec == "snappy" {
|
|
// Everything but last 4 bytes which are the checksum
|
|
n := dataSize - 4
|
|
br := d.FieldRawLen("compressed", n*8)
|
|
|
|
// This could be simplified to be similar to deflate, however snappy's reader only works for streaming frames,
|
|
// not block data. See https://github.com/google/snappy/blob/main/framing_format.txt for details.
|
|
compressed := make([]byte, n)
|
|
if _, err := bitio.ReadFull(br, compressed, n*8); err != nil {
|
|
d.Fatalf("failed reading compressed data %v", err)
|
|
}
|
|
decompressed, err := snappy.Decode(nil, compressed)
|
|
if err != nil {
|
|
d.Fatalf("failed decompressing data: %v", err)
|
|
}
|
|
d.MustCopy(bb, bytes.NewReader(decompressed))
|
|
|
|
// Check the checksum
|
|
crc32W := crc32.NewIEEE()
|
|
d.MustCopy(crc32W, bytes.NewReader(bb.Bytes()))
|
|
d.FieldU32("crc", d.ValidateUBytes(crc32W.Sum(nil)), scalar.Hex)
|
|
} else {
|
|
// Unknown codec, just dump the compressed data.
|
|
d.FieldRawLen("compressed", dataSize*8, scalar.Description(codec+" encoded"))
|
|
return nil
|
|
}
|
|
return bb
|
|
}
|
|
|
|
func decodeAvroOCF(d *decode.D, in interface{}) interface{} {
|
|
header := decodeHeader(d)
|
|
|
|
decodeFn, err := decoders.DecodeFnForSchema(header.Schema)
|
|
if err != nil {
|
|
d.Fatalf("unable to create codec: %v", err)
|
|
}
|
|
|
|
d.FieldStructArrayLoop("blocks", "block", func() bool { return d.NotEnd() }, func(d *decode.D) {
|
|
count := d.FieldSFn("count", decoders.VarZigZag)
|
|
if count <= 0 {
|
|
return
|
|
}
|
|
size := d.FieldSFn("size", decoders.VarZigZag)
|
|
i := int64(0)
|
|
|
|
if header.Codec != "null" {
|
|
if bb := decodeBlockCodec(d, size, header.Codec); bb != nil {
|
|
d.FieldArrayRootBitBufFn("data", bitio.NewBitReader(bb.Bytes(), -1), func(d *decode.D) {
|
|
for ; i < count; i++ {
|
|
decodeFn("data", d)
|
|
}
|
|
})
|
|
}
|
|
} else {
|
|
d.FieldArrayLoop("data", func() bool { return i < count }, func(d *decode.D) {
|
|
decodeFn("datum", d)
|
|
i++
|
|
})
|
|
}
|
|
d.FieldRawLen("sync", 16*8, d.AssertBitBuf(header.Sync))
|
|
})
|
|
|
|
return nil
|
|
}
|