1
1
mirror of https://github.com/wader/fq.git synced 2024-11-24 11:16:09 +03:00

Change avro codec to funcs

This commit is contained in:
Xentripetal 2021-12-29 16:12:36 -06:00
parent 7345b8c7c2
commit 4b809a73f6
32 changed files with 504 additions and 412 deletions

View File

@ -1,45 +0,0 @@
package codecs
import (
"errors"
"fmt"
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type ArrayCodec struct {
valueCodec Codec
}
func (l ArrayCodec) Decode(name string, d *decode.D) {
d.FieldArray(name, func(d *decode.D) {
count := int64(-1)
for count != 0 {
d.FieldStruct(name, func(d *decode.D) {
count = d.FieldSFn("count", VarZigZag)
if count < 0 {
d.FieldSFn("size", VarZigZag)
count *= -1
}
d.FieldArray("entries", func(d *decode.D) {
for i := int64(0); i < count; i++ {
l.valueCodec.Decode("entry", d)
}
})
})
}
})
}
func BuildArrayCodec(schema schema.SimplifiedSchema) (Codec, error) {
if schema.Items == nil {
return nil, errors.New("array schema must have items")
}
valueCodec, err := BuildCodec(*schema.Items)
if err != nil {
return nil, fmt.Errorf("ArrayCodec: %s", err)
}
return &ArrayCodec{valueCodec: valueCodec}, nil
}

View File

@ -1,18 +0,0 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type BoolCodec struct{}
func (l BoolCodec) Decode(name string, d *decode.D) {
d.FieldBoolFn(name, func(d *decode.D) bool {
return d.U8() >= 1
})
}
func BuildBoolCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &BoolCodec{}, nil
}

View File

@ -1,19 +0,0 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type BytesCodec struct{}
func (l BytesCodec) Decode(name string, d *decode.D) {
// What if its a record with a field called name_len?
// using a struct is probably a better idea. But it makes it less usable
length := d.FieldSFn(name+"_len", VarZigZag)
d.FieldRawLen("name", length*8)
}
func BuildBytesCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &BytesCodec{}, nil
}

View File

@ -1,16 +0,0 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type DoubleCodec struct{}
func (l DoubleCodec) Decode(name string, d *decode.D) {
d.FieldF64(name)
}
func BuildDoubleCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &DoubleCodec{}, nil
}

View File

@ -1,24 +0,0 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type EnumCodec struct {
symbols []string
}
func (l EnumCodec) Decode(name string, d *decode.D) {
d.FieldScalarStrFn(name, func(d *decode.D) string {
value := int(VarZigZag(d))
if value >= len(l.symbols) {
d.Fatalf("invalid enum value: %d", value)
}
return l.symbols[value]
})
}
func BuildEnumCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &EnumCodec{symbols: schema.Symbols}, nil
}

View File

@ -1,18 +0,0 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type FixedCodec struct {
size int64
}
func (l FixedCodec) Decode(name string, d *decode.D) {
d.FieldRawLen(name, l.size*8)
}
func BuildFixedCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &FixedCodec{size: int64(schema.Size)}, nil
}

View File

@ -1,16 +0,0 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type FloatCodec struct{}
func (l FloatCodec) Decode(name string, d *decode.D) {
d.FieldF32(name)
}
func BuildFloatCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &FloatCodec{}, nil
}

View File

@ -1,17 +0,0 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type IntCodec struct{}
func (l IntCodec) Decode(name string, d *decode.D) {
// a boolean is written as a single byte whose value is either 0 (false) or 1 (true).
d.FieldSFn(name, VarZigZag)
}
func BuildIntCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &IntCodec{}, nil
}

View File

@ -1,45 +0,0 @@
package codecs
import (
"errors"
"fmt"
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type MapCodec struct {
subCodec Codec
}
func (l MapCodec) Decode(name string, d *decode.D) {
l.subCodec.Decode(name, d)
}
func BuildMapCodec(s schema.SimplifiedSchema) (Codec, error) {
if s.Values == nil {
return nil, errors.New("map schema must have values")
}
subSchema := schema.SimplifiedSchema{
Type: schema.ARRAY,
Items: &schema.SimplifiedSchema{
Type: schema.RECORD,
Fields: []schema.Field{
{
Name: "key",
Type: schema.SimplifiedSchema{Type: schema.STRING},
},
{
Name: "value",
Type: *s.Values,
},
},
},
}
subCodec, err := BuildCodec(subSchema)
if err != nil {
return nil, fmt.Errorf("MapCodec: %v", err)
}
return &MapCodec{subCodec: subCodec}, nil
}

View File

@ -1,17 +0,0 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type NullCodec struct{}
func (l NullCodec) Decode(name string, d *decode.D) {
// null is written as zero bytes.
d.FieldRawLen(name, 0)
}
func BuildNullCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &NullCodec{}, nil
}

View File

@ -1,40 +0,0 @@
package codecs
import (
"fmt"
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type RecordCodec struct {
fields []string
codecs []Codec
name string
}
func (l RecordCodec) Decode(name string, d *decode.D) {
d.FieldStruct(name, func(d *decode.D) {
for i, f := range l.fields {
c := l.codecs[i]
c.Decode(f, d)
}
})
}
func BuildRecordCodec(schema schema.SimplifiedSchema) (Codec, error) {
var c RecordCodec
if schema.Fields == nil {
return c, fmt.Errorf("RecordCodec: no fields")
}
c.name = schema.Name
for _, f := range schema.Fields {
c.fields = append(c.fields, f.Name)
fc, err := BuildCodec(f.Type)
if err != nil {
return c, fmt.Errorf("RecordCodec: %v", err)
}
c.codecs = append(c.codecs, fc)
}
return &c, nil
}

View File

@ -1,17 +0,0 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type StringCodec struct{}
func (c StringCodec) Decode(name string, d *decode.D) {
length := d.FieldSFn(name+"_len", VarZigZag)
d.FieldUTF8(name, int(length))
}
func BuildStringCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &StringCodec{}, nil
}

View File

@ -1,40 +0,0 @@
package codecs
import (
"errors"
"fmt"
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type UnionCodec struct {
codecs []Codec
}
func (l UnionCodec) Decode(name string, d *decode.D) {
// A union is encoded by first writing an int value indicating the zero-based position within the union of the
// schema of its value. The value is then encoded per the indicated schema within the union.
d.FieldStruct(name, func(d *decode.D) {
v := int(d.FieldSFn("type", VarZigZag))
if v >= len(l.codecs) {
d.Fatalf("invalid union value: %d", v)
}
l.codecs[v].Decode("value", d)
})
}
func BuildUnionCodec(schema schema.SimplifiedSchema) (Codec, error) {
var c UnionCodec
if schema.UnionTypes == nil {
return nil, errors.New("UnionCodec: no union types")
}
for _, t := range schema.UnionTypes {
tc, err := BuildCodec(t)
if err != nil {
return nil, fmt.Errorf("UnionCodec: %v", err)
}
c.codecs = append(c.codecs, tc)
}
return &c, nil
}

View File

@ -0,0 +1,50 @@
package decoders
import (
"errors"
"fmt"
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
func decodeArrayFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
if schema.Items == nil {
return nil, errors.New("array schema must have items")
}
valueD, err := DecodeFnForSchema(*schema.Items)
if err != nil {
return nil, fmt.Errorf("ArrayCodec: %s", err)
}
//Arrays are encoded as a series of blocks. Each block consists of a long count value, followed by that many array
//items. A block with count zero indicates the end of the array. Each item is encoded per the array's item schema.
//If a block's count is negative, its absolute value is used, and the count is followed immediately by a long block
//size indicating the number of bytes in the block. This block size permits fast skipping through data, e.g., when
//projecting a record to a subset of its fields.
//For example, the array schema {"type": "array", "items": "long"}
//an array containing the items 3 and 27 could be encoded as the long value 2 (encoded as hex 04)
//followed by long values 3 and 27 (encoded as hex 06 36) terminated by zero:
//04 06 36 00
return func(name string, d *decode.D) {
d.FieldArray(name, func(d *decode.D) {
count := int64(-1)
for count != 0 {
d.FieldStruct("block", func(d *decode.D) {
count = d.FieldSFn("count", VarZigZag)
if count < 0 {
d.FieldSFn("size", VarZigZag)
count *= -1
}
d.FieldArray("data", func(d *decode.D) {
for i := int64(0); i < count; i++ {
valueD("entry", d)
}
})
})
}
})
}, nil
}

View File

@ -0,0 +1,15 @@
package decoders
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
func decodeBoolFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
//a boolean is written as a single byte whose value is either 0 (false) or 1 (true).
return func(name string, d *decode.D) {
d.FieldBoolFn(name, func(d *decode.D) bool {
return d.U8() >= 1
})
}, nil
}

View File

@ -0,0 +1,18 @@
package decoders
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type BytesCodec struct{}
func decodeBytesFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
//bytes are encoded as a long followed by that many bytes of data.
return func(name string, d *decode.D) {
// What if its a record with a field called name_len?
// using a struct is probably a better idea. But it makes it less usable
length := d.FieldSFn(name+"_len", VarZigZag)
d.FieldRawLen("name", length*8)
}, nil
}

View File

@ -1,4 +1,4 @@
package codecs
package decoders
import (
"fmt"
@ -6,39 +6,35 @@ import (
"github.com/wader/fq/pkg/decode"
)
type CodecType int
type Codec interface {
Decode(name string, d *decode.D)
}
func BuildCodec(s schema.SimplifiedSchema) (Codec, error) {
func DecodeFnForSchema(s schema.SimplifiedSchema) (func(string, *decode.D), error) {
// TODO support logical types. Right now we will just get the raw type.
switch s.Type {
case schema.BOOLEAN:
return BuildBoolCodec(s)
return decodeBoolFn(s)
case schema.BYTES:
return BuildBytesCodec(s)
return decodeBytesFn(s)
case schema.DOUBLE:
return BuildDoubleCodec(s)
return decodeDoubleFn(s)
case schema.ENUM:
return BuildEnumCodec(s)
return decodeEnumFn(s)
case schema.FIXED:
return decodeFixedFn(s)
case schema.FLOAT:
return BuildFloatCodec(s)
return decodeFloatFn(s)
case schema.INT:
return BuildIntCodec(s)
return decodeIntFn(s)
case schema.LONG:
return BuildLongCodec(s)
return decodeLongFn(s)
case schema.NULL:
return BuildNullCodec(s)
return decodeNullFn(s)
case schema.RECORD:
return BuildRecordCodec(s)
return decodeRecordFn(s)
case schema.STRING:
return BuildStringCodec(s)
return decodeStringFn(s)
case schema.UNION:
return BuildUnionCodec(s)
return decodeUnionFn(s)
case schema.MAP:
return BuildMapCodec(s)
return decodeMapFn(s)
default:
return nil, fmt.Errorf("unknown type: %s", s.Type)
}

View File

@ -0,0 +1,14 @@
package decoders
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
func decodeDoubleFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
//a double is written as 8 bytes. The double is converted into a 64-bit integer using a method equivalent to Java's
//doubleToLongBits and then encoded in little-endian format.
return func(name string, d *decode.D) {
d.FieldF64(name)
}, nil
}

View File

@ -0,0 +1,29 @@
package decoders
import (
"errors"
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
"github.com/wader/fq/pkg/scalar"
)
func decodeEnumFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
if len(schema.Symbols) == 0 {
return nil, errors.New("enum requires symbols")
}
//An enum is encoded by a int, representing the zero-based position of the symbol in the schema.
//For example, consider the enum:
// {"type": "enum", "name": "Foo", "symbols": ["A", "B", "C", "D"] }
//This would be encoded by an int between zero and three, with zero indicating "A", and 3 indicating "D".
return func(name string, d *decode.D) {
d.FieldSFn(name, VarZigZag, scalar.Fn(func(s scalar.S) (scalar.S, error) {
v := int(s.ActualS())
if v < 0 || v >= len(schema.Symbols) {
return s, errors.New("enum value of out range")
}
s.Sym = schema.Symbols[v]
return s, nil
}))
}, nil
}

View File

@ -0,0 +1,18 @@
package decoders
import (
"errors"
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
func decodeFixedFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
if schema.Size < 0 {
return nil, errors.New("fixed size must be greater than or equal to zero")
}
size := int64(schema.Size)
//Fixed instances are encoded using the number of bytes declared in the schema.
return func(name string, d *decode.D) {
d.FieldRawLen(name, size*8)
}, nil
}

View File

@ -0,0 +1,14 @@
package decoders
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
func decodeFloatFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
//a float is written as 4 bytes. The float is converted into a 32-bit integer using a method equivalent to Java's
//floatToIntBits and then encoded in little-endian format.
return func(name string, d *decode.D) {
d.FieldF32(name)
}, nil
}

View File

@ -0,0 +1,13 @@
package decoders
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
func decodeIntFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
// int and long values are written using variable-length zig-zag coding.
return func(name string, d *decode.D) {
d.FieldSFn(name, VarZigZag)
}, nil
}

View File

@ -1,4 +1,4 @@
package codecs
package decoders
import (
"github.com/wader/fq/format/avro/schema"
@ -8,7 +8,7 @@ import (
const intMask = byte(127)
const intFlag = byte(128)
// readLong reads a variable length zig zag long from the current position in decoder
// VarZigZag reads a variable length zigzag long from the current position in decoder
func VarZigZag(d *decode.D) int64 {
var value uint64
var shift uint
@ -23,12 +23,9 @@ func VarZigZag(d *decode.D) int64 {
panic("unexpected end of data")
}
type LongCodec struct{}
func (l LongCodec) Decode(name string, d *decode.D) {
d.FieldSFn(name, VarZigZag)
}
func BuildLongCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &LongCodec{}, nil
func decodeLongFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
// int and long values are written using variable-length zig-zag coding.
return func(name string, d *decode.D) {
d.FieldSFn(name, VarZigZag)
}, nil
}

View File

@ -0,0 +1,49 @@
package decoders
import (
"errors"
"fmt"
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
func decodeMapFn(s schema.SimplifiedSchema) (func(string, *decode.D), error) {
if s.Values == nil {
return nil, errors.New("map schema must have values")
}
//Maps are encoded as a series of blocks. Each block consists of a long count value, followed by that many
//key/value pairs. A block with count zero indicates the end of the map. Each item is encoded per the map's
//value schema.
//If a block's count is negative, its absolute value is used, and the count is followed immediately by a long
//block size indicating the number of bytes in the block. This block size permits fast skipping through data,
//e.g., when projecting a record to a subset of its fields.
//The blocked representation permits one to read and write maps larger than can be buffered in memory, since one
//can start writing items without knowing the full length of the map.
//This is the exact same as the array decoder, with the value being a KV record, so we just use the array decoder
subSchema := schema.SimplifiedSchema{
Type: schema.ARRAY,
Items: &schema.SimplifiedSchema{
Type: schema.RECORD,
Fields: []schema.Field{
{
Name: "key",
Type: schema.SimplifiedSchema{Type: schema.STRING},
},
{
Name: "value",
Type: *s.Values,
},
},
},
}
subFn, err := DecodeFnForSchema(subSchema)
if err != nil {
return nil, fmt.Errorf("decode map: %v", err)
}
return subFn, nil
}

View File

@ -0,0 +1,14 @@
package decoders
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
func decodeNullFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
// null is written as zero bytes.
return func(name string, d *decode.D) {
// Is this the best way to represent null in fq?
d.FieldRawLen(name, 0)
}, nil
}

View File

@ -0,0 +1,46 @@
package decoders
import (
"fmt"
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
func decodeRecordFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
if len(schema.Fields) == 0 {
return nil, fmt.Errorf("record must have fields")
}
var fieldNames []string
var fieldDecoders []func(string, *decode.D)
for _, f := range schema.Fields {
fieldNames = append(fieldNames, f.Name)
fc, err := DecodeFnForSchema(f.Type)
if err != nil {
return nil, fmt.Errorf("failed parsing record field %s: %v", f.Name, err)
}
fieldDecoders = append(fieldDecoders, fc)
}
//A record is encoded by encoding the values of its fields in the order that they are declared. In other words, a
//record is encoded as just the concatenation of the encodings of its fields. Field values are encoded per their
//schema. For example, the record schema
// { "type": "record",
// "name": "test",
// "fields" : [ {"name": "a", "type": "long"},
// {"name": "b", "type": "string"} ]
// }
//
//An instance of this record whose a field has value 27 (encoded as hex 36) and whose b field has value "foo"
//(encoded as hex bytes 06 66 6f 6f), would be encoded simply as the concatenation of these, namely
//the hex byte sequence:
//36 06 66 6f 6f
return func(name string, d *decode.D) {
d.FieldStruct(name, func(d *decode.D) {
for i, f := range fieldNames {
fieldDecoders[i](f, d)
}
})
}, nil
}

View File

@ -0,0 +1,17 @@
package decoders
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
func decodeStringFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
//string is encoded as a long followed by that many bytes of UTF-8 encoded character data.
//For example, the three-character string "foo" would be encoded as the long value 3 (encoded as hex 06) followed
//by the UTF-8 encoding of 'f', 'o', and 'o' (the hex bytes 66 6f 6f):
//06 66 6f 6f
return func(name string, d *decode.D) {
length := d.FieldSFn(name+"_len", VarZigZag)
d.FieldUTF8(name, int(length))
}, nil
}

View File

@ -0,0 +1,35 @@
package decoders
import (
"errors"
"fmt"
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
func decodeUnionFn(schema schema.SimplifiedSchema) (func(string, *decode.D), error) {
if len(schema.UnionTypes) == 0 {
return nil, errors.New("union must have types")
}
var decoders []func(string, *decode.D)
for i, t := range schema.UnionTypes {
decodeFn, err := DecodeFnForSchema(t)
if err != nil {
return nil, fmt.Errorf("failed getting decodeFn for union type %d: %v", i, err)
}
decoders = append(decoders, decodeFn)
}
// A union is encoded by first writing an int value indicating the zero-based position within the union of the
// schema of its value. The value is then encoded per the indicated schema within the union.
return func(name string, d *decode.D) {
d.FieldStruct(name, func(d *decode.D) {
v := int(d.FieldSFn("type", VarZigZag))
if v < 0 || v >= len(decoders) {
d.Fatalf("invalid union value: %d", v)
}
decoders[v]("value", d)
})
}, nil
}

View File

@ -2,7 +2,7 @@ package avro
import (
"github.com/wader/fq/format"
"github.com/wader/fq/format/avro/codecs"
"github.com/wader/fq/format/avro/decoders"
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/format/registry"
"github.com/wader/fq/pkg/decode"
@ -46,11 +46,11 @@ func decodeHeader(d *decode.D) HeaderData {
d.FieldStructArrayLoop("meta", "block",
func() bool { return blockCount != 0 },
func(d *decode.D) {
blockCount = d.FieldSFn("count", codecs.VarZigZag)
blockCount = d.FieldSFn("count", decoders.VarZigZag)
// If its negative, then theres another long representing byte size
if blockCount < 0 {
blockCount *= -1
d.FieldSFn("size", codecs.VarZigZag)
d.FieldSFn("size", decoders.VarZigZag)
}
if blockCount == 0 {
return
@ -58,9 +58,9 @@ func decodeHeader(d *decode.D) HeaderData {
var i int64 = 0
d.FieldStructArrayLoop("entries", "entry", func() bool { return i < blockCount }, func(d *decode.D) {
keyL := d.FieldSFn("key_len", codecs.VarZigZag)
keyL := d.FieldSFn("key_len", decoders.VarZigZag)
key := d.FieldUTF8("key", int(keyL))
valL := d.FieldSFn("value_len", codecs.VarZigZag)
valL := d.FieldSFn("value_len", decoders.VarZigZag)
if key == "avro.schema" {
v, _ := d.FieldFormatLen("value", valL*8, jsonGroup, nil)
s, err := schema.SchemaFromJson(v.V.(*scalar.S).Actual)
@ -96,24 +96,24 @@ func decodeHeader(d *decode.D) HeaderData {
func avroDecodeOCF(d *decode.D, in interface{}) interface{} {
header := decodeHeader(d)
c, err := codecs.BuildCodec(*header.Schema)
decodeFn, err := decoders.DecodeFnForSchema(*header.Schema)
if err != nil {
d.Fatalf("unable to create codec: %v", err)
}
d.FieldStructArrayLoop("blocks", "block", func() bool { return d.NotEnd() }, func(d *decode.D) {
count := d.FieldSFn("count", codecs.VarZigZag)
count := d.FieldSFn("count", decoders.VarZigZag)
if count <= 0 {
return
}
size := d.FieldSFn("size", codecs.VarZigZag)
size := d.FieldSFn("size", decoders.VarZigZag)
// Currently not supporting encodings.
if header.Codec != "" {
d.FieldRawLen("data", size*8, ScalarDescription(header.Codec+" encoded"))
} else {
i := int64(0)
d.FieldArrayLoop("data", func() bool { return i < count }, func(d *decode.D) {
c.Decode("datum", d)
decodeFn("datum", d)
i += 1
})
}

View File

@ -1,12 +1,21 @@
$ fq . firstBlockCountNotGreaterThanZero.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.{}: firstBlockCountNotGreaterThanZero.avro (avro_ocf)
0x00|4f 62 6a 01 |Obj. | magic: raw bits (valid)
0x00| 02 16 61 76 72 6f 2e 73 63 68 65 6d| ..avro.schem| meta[0:2]:
0x10|61 1e 7b 22 74 79 70 65 22 3a 22 6c 6f 6e 67 22|a.{"type":"long"|
0x20|7d 00 |}. |
0x20| 30 31 32 33 34 35 36 37 38 39 61 62 63 64| 0123456789abcd| sync: raw bits
$ fq verbose firstBlockCountNotGreaterThanZero.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.{}: firstBlockCountNotGreaterThanZero.avro (avro_ocf) 0x0-0x32.7 (51)
0x00|4f 62 6a 01 |Obj. | magic: raw bits (valid) 0x0-0x3.7 (4)
| | | meta[0:2]: 0x4-0x21.7 (30)
| | | [0]{}: block 0x4-0x20.7 (29)
0x00| 02 | . | count: 1 0x4-0x4.7 (1)
| | | entries[0:1]: 0x5-0x20.7 (28)
| | | [0]{}: entry 0x5-0x20.7 (28)
0x00| 16 | . | key_len: 11 0x5-0x5.7 (1)
0x00| 61 76 72 6f 2e 73 63 68 65 6d| avro.schem| key: "avro.schema" 0x6-0x10.7 (11)
0x10|61 |a |
0x10| 1e | . | value_len: 15 0x11-0x11.7 (1)
0x10| 7b 22 74 79 70 65 22 3a 22 6c 6f 6e 67 22| {"type":"long"| value: {} (json) 0x12-0x20.7 (15)
0x20|7d |} |
| | | [1]{}: block 0x21-0x21.7 (1)
0x20| 00 | . | count: 0 0x21-0x21.7 (1)
0x20| 30 31 32 33 34 35 36 37 38 39 61 62 63 64| 0123456789abcd| sync: raw bits 0x22-0x31.7 (16)
0x30|65 66 |ef |
0x30| 00| | .| | blocks[0:1]:
$ fq '.blocks[0]' firstBlockCountNotGreaterThanZero.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.blocks[0]{}:
0x30| 00| | .| | count: 0
| | | blocks[0:1]: 0x32-0x32.7 (1)
| | | [0]{}: block 0x32-0x32.7 (1)
0x30| 00| | .| | count: 0 0x32-0x32.7 (1)

View File

@ -1,20 +1,121 @@
$ fq . quickstop-deflate.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.{}: quickstop-deflate.avro (avro_ocf)
0x0000|4f 62 6a 01 |Obj. | magic: raw bits (valid)
0x0000| 04 14 61 76 72 6f 2e 63 6f 64 65 63| ..avro.codec| meta[0:2]:
0x0010|0e 64 65 66 6c 61 74 65 16 61 76 72 6f 2e 73 63|.deflate.avro.sc|
* |until 0x119.7 (278) | |
0x0110| 93 e7 87 9e 02 95| ......| sync: raw bits
$ fq verbose quickstop-deflate.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.{}: quickstop-deflate.avro (avro_ocf) 0x0-0x5835.7 (22582)
0x0000|4f 62 6a 01 |Obj. | magic: raw bits (valid) 0x0-0x3.7 (4)
| | | meta[0:2]: 0x4-0x119.7 (278)
| | | [0]{}: block 0x4-0x118.7 (277)
0x0000| 04 | . | count: 2 0x4-0x4.7 (1)
| | | entries[0:2]: 0x5-0x118.7 (276)
| | | [0]{}: entry 0x5-0x17.7 (19)
0x0000| 14 | . | key_len: 10 0x5-0x5.7 (1)
0x0000| 61 76 72 6f 2e 63 6f 64 65 63| avro.codec| key: "avro.codec" 0x6-0xf.7 (10)
0x0010|0e |. | value_len: 7 0x10-0x10.7 (1)
0x0010| 64 65 66 6c 61 74 65 | deflate | value: "deflate" 0x11-0x17.7 (7)
| | | [1]{}: entry 0x18-0x118.7 (257)
0x0010| 16 | . | key_len: 11 0x18-0x18.7 (1)
0x0010| 61 76 72 6f 2e 73 63| avro.sc| key: "avro.schema" 0x19-0x23.7 (11)
0x0020|68 65 6d 61 |hema |
0x0020| e6 03 | .. | value_len: 243 0x24-0x25.7 (2)
0x0020| 7b 22 74 79 70 65 22 3a 22 72| {"type":"r| value: {} (json) 0x26-0x118.7 (243)
0x0030|65 63 6f 72 64 22 2c 22 6e 61 6d 65 22 3a 22 50|ecord","name":"P|
* |until 0x118.7 (243) | |
| | | [1]{}: block 0x119-0x119.7 (1)
0x0110| 00 | . | count: 0 0x119-0x119.7 (1)
0x0110| 93 e7 87 9e 02 95| ......| sync: raw bits 0x11a-0x129.7 (16)
0x0120|d5 9e 4f 58 37 ad b2 a2 ce cd |..OX7..... |
0x0120| b4 09 be 22 8d db| ..."..| blocks[0:12]:
| | | blocks[0:12]: 0x12a-0x5835.7 (22284)
| | | [0]{}: block 0x12a-0x9dc.7 (2227)
0x0120| b4 09 | .. | count: 602 0x12a-0x12b.7 (2)
0x0120| be 22 | ." | size: 2207 0x12c-0x12d.7 (2)
0x0120| 8d db| ..| data: raw bits (deflate encoded) 0x12e-0x9cc.7 (2207)
0x0130|6f 64 ac f9 19 c6 71 f9 37 49 8e 63 1d 55 55 b5|od....q.7I.c.UU.|
* |until 0x5835.7 (end) (22284) | |
$ fq '.blocks[0]' quickstop-deflate.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.blocks[0]{}:
0x120| b4 09 | .. | count: 602
0x120| be 22 | ." | size: 2207
0x120| 8d db| ..| data: raw bits (deflate encoded)
0x130|6f 64 ac f9 19 c6 71 f9 37 49 8e 63 1d 55 55 b5|od....q.7I.c.UU.|
* |until 0x9cc.7 (2207) | |
0x9c0| 93 e7 87| ...| sync: raw bits (valid)
0x9d0|9e 02 95 d5 9e 4f 58 37 ad b2 a2 ce cd |.....OX7..... |
* |until 0x9cc.7 (2207) | |
0x09c0| 93 e7 87| ...| sync: raw bits (valid) 0x9cd-0x9dc.7 (16)
0x09d0|9e 02 95 d5 9e 4f 58 37 ad b2 a2 ce cd |.....OX7..... |
| | | [1]{}: block 0x9dd-0x1257.7 (2171)
0x09d0| a0 09 | .. | count: 592 0x9dd-0x9de.7 (2)
0x09d0| ce| .| size: 2151 0x9df-0x9e0.7 (2)
0x09e0|21 |! |
0x09e0| 8d d8 5f 44 fc 7b 1e c7 f1 bb 9f d3 ff 3f d7| .._D.{.......?.| data: raw bits (deflate encoded) 0x9e1-0x1247.7 (2151)
0x09f0|7b b1 97 e7 7b 71 f8 55 df fe fc d6 f2 1b e7 2c|{...{q.U.......,|
* |until 0x1247.7 (2151) | |
0x1240| 93 e7 87 9e 02 95 d5 9e| ........| sync: raw bits (valid) 0x1248-0x1257.7 (16)
0x1250|4f 58 37 ad b2 a2 ce cd |OX7..... |
| | | [2]{}: block 0x1258-0x1ad5.7 (2174)
0x1250| 9e 09 | .. | count: 591 0x1258-0x1259.7 (2)
0x1250| d4 21 | .! | size: 2154 0x125a-0x125b.7 (2)
0x1250| 8d d8 5f 44| .._D| data: raw bits (deflate encoded) 0x125c-0x1ac5.7 (2154)
0x1260|ec 0d 1e c7 f1 bb 63 1d c7 7a f4 ff cf 5e ec e5|......c..z...^..|
* |until 0x1ac5.7 (2154) | |
0x1ac0| 93 e7 87 9e 02 95 d5 9e 4f 58| ........OX| sync: raw bits (valid) 0x1ac6-0x1ad5.7 (16)
0x1ad0|37 ad b2 a2 ce cd |7..... |
| | | [3]{}: block 0x1ad6-0x235f.7 (2186)
0x1ad0| a0 09 | .. | count: 592 0x1ad6-0x1ad7.7 (2)
0x1ad0| ec 21 | .! | size: 2166 0x1ad8-0x1ad9.7 (2)
0x1ad0| 8d d8 7f 44 ec 7b| ...D.{| data: raw bits (deflate encoded) 0x1ada-0x234f.7 (2166)
0x1ae0|1e c7 f1 ff 8e 75 1c d7 fd bf df bf b8 df 3f 96|.....u........?.|
* |until 0x234f.7 (2166) | |
0x2350|93 e7 87 9e 02 95 d5 9e 4f 58 37 ad b2 a2 ce cd|........OX7.....| sync: raw bits (valid) 0x2350-0x235f.7 (16)
| | | [4]{}: block 0x2360-0x2bda.7 (2171)
0x2360|9e 09 |.. | count: 591 0x2360-0x2361.7 (2)
0x2360| ce 21 | .! | size: 2151 0x2362-0x2363.7 (2)
0x2360| 8d d8 df 47 ec fb 1e c7 f1 bb 65 5b| ...G......e[| data: raw bits (deflate encoded) 0x2364-0x2bca.7 (2151)
0x2370|96 6d ff 01 e7 e2 dc f4 e3 db 2f 6b d5 b7 5f c7|.m......../k.._.|
* |until 0x2bca.7 (2151) | |
0x2bc0| 93 e7 87 9e 02| .....| sync: raw bits (valid) 0x2bcb-0x2bda.7 (16)
0x2bd0|95 d5 9e 4f 58 37 ad b2 a2 ce cd |...OX7..... |
| | | [5]{}: block 0x2bdb-0x3453.7 (2169)
0x2bd0| a0 09 | .. | count: 592 0x2bdb-0x2bdc.7 (2)
0x2bd0| ca 21 | .! | size: 2149 0x2bdd-0x2bde.7 (2)
0x2bd0| 8d| .| data: raw bits (deflate encoded) 0x2bdf-0x3443.7 (2149)
0x2be0|d8 d1 47 fc 7b 1e c7 f1 bb 9f f5 f3 b3 ce 1f b0|..G.{...........|
* |until 0x3443.7 (2149) | |
0x3440| 93 e7 87 9e 02 95 d5 9e 4f 58 37 ad| ........OX7.| sync: raw bits (valid) 0x3444-0x3453.7 (16)
0x3450|b2 a2 ce cd |.... |
| | | [6]{}: block 0x3454-0x3ccd.7 (2170)
0x3450| a0 09 | .. | count: 592 0x3454-0x3455.7 (2)
0x3450| cc 21 | .! | size: 2150 0x3456-0x3457.7 (2)
0x3450| 8d d8 df 47 f4 7d 1e c7| ...G.}..| data: raw bits (deflate encoded) 0x3458-0x3cbd.7 (2150)
0x3460|f1 b3 cb ba 5c d6 fd 07 ec c1 1e de df 83 e5 ba|....\...........|
* |until 0x3cbd.7 (2150) | |
0x3cb0| 93 e7| ..| sync: raw bits (valid) 0x3cbe-0x3ccd.7 (16)
0x3cc0|87 9e 02 95 d5 9e 4f 58 37 ad b2 a2 ce cd |......OX7..... |
| | | [7]{}: block 0x3cce-0x454c.7 (2175)
0x3cc0| 9e 09| ..| count: 591 0x3cce-0x3ccf.7 (2)
0x3cd0|d6 21 |.! | size: 2155 0x3cd0-0x3cd1.7 (2)
0x3cd0| 8d d8 df 47 ec 0f 1e c7 f1 bb 63 1d c7 fa| ...G......c...| data: raw bits (deflate encoded) 0x3cd2-0x453c.7 (2155)
0x3ce0|fe 01 7b b1 97 df cf c5 72 4e 7d fa b5 96 33 fd|..{.....rN}...3.|
* |until 0x453c.7 (2155) | |
0x4530| 93 e7 87| ...| sync: raw bits (valid) 0x453d-0x454c.7 (16)
0x4540|9e 02 95 d5 9e 4f 58 37 ad b2 a2 ce cd |.....OX7..... |
| | | [8]{}: block 0x454d-0x4dcd.7 (2177)
0x4540| a0 09 | .. | count: 592 0x454d-0x454e.7 (2)
0x4540| da| .| size: 2157 0x454f-0x4550.7 (2)
0x4550|21 |! |
0x4550| 8d d8 ef 47 ec 7d 1e c7 f1 7b c7 3a 8e 75 fd| ...G.}...{.:.u.| data: raw bits (deflate encoded) 0x4551-0x4dbd.7 (2157)
0x4560|01 7b e3 ba 79 7d 6f 2c e7 d4 b7 1f 67 5d 9c 71|.{..y}o,....g].q|
* |until 0x4dbd.7 (2157) | |
0x4db0| 93 e7| ..| sync: raw bits (valid) 0x4dbe-0x4dcd.7 (16)
0x4dc0|87 9e 02 95 d5 9e 4f 58 37 ad b2 a2 ce cd |......OX7..... |
| | | [9]{}: block 0x4dce-0x564a.7 (2173)
0x4dc0| 9e 09| ..| count: 591 0x4dce-0x4dcf.7 (2)
0x4dd0|d2 21 |.! | size: 2153 0x4dd0-0x4dd1.7 (2)
0x4dd0| 8d d8 df 47 ec fb 1e c7 f1 bb 65 5b 96 63| ...G......e[.c| data: raw bits (deflate encoded) 0x4dd2-0x563a.7 (2153)
0x4de0|ff 01 e7 e2 5c ee ef c5 61 ad fa f6 6b db ac b1|....\...a...k...|
* |until 0x563a.7 (2153) | |
0x5630| 93 e7 87 9e 02| .....| sync: raw bits (valid) 0x563b-0x564a.7 (16)
0x5640|95 d5 9e 4f 58 37 ad b2 a2 ce cd |...OX7..... |
| | | [10]{}: block 0x564b-0x580d.7 (451)
0x5640| 94 01 | .. | count: 74 0x564b-0x564c.7 (2)
0x5640| de 06 | .. | size: 431 0x564d-0x564e.7 (2)
0x5640| 8d| .| data: raw bits (deflate encoded) 0x564f-0x57fd.7 (431)
0x5650|d5 c1 2b 04 71 14 c0 f1 db a6 6d 93 3f c0 c1 d1|..+.q.....m.?...|
* |until 0x57fd.7 (431) | |
0x57f0| 93 e7| ..| sync: raw bits (valid) 0x57fe-0x580d.7 (16)
0x5800|87 9e 02 95 d5 9e 4f 58 37 ad b2 a2 ce cd |......OX7..... |
| | | [11]{}: block 0x580e-0x5835.7 (40)
0x5800| 02 | . | count: 1 0x580e-0x580e.7 (1)
0x5800| 2c| ,| size: 22 0x580f-0x580f.7 (1)
0x5810|7b 14 cb 15 5c 5a 90 5a c4 e6 9b 98 c7 63 68 64|{...\Z.Z.....chd| data: raw bits (deflate encoded) 0x5810-0x5825.7 (22)
0x5820|6c 62 6a 66 07 00 |lbjf.. |
0x5820| 93 e7 87 9e 02 95 d5 9e 4f 58| ........OX| sync: raw bits (valid) 0x5826-0x5835.7 (16)
0x5830|37 ad b2 a2 ce cd| |7.....| |

View File

@ -1,15 +1,4 @@
$ fq . twitter.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.{}: twitter.avro (avro_ocf)
0x000|4f 62 6a 01 |Obj. | magic: raw bits (valid)
0x000| 04 16 61 76 72 6f 2e 73 63 68 65 6d| ..avro.schem| meta[0:2]:
0x010|61 e8 05 7b 22 74 79 70 65 22 3a 22 72 65 63 6f|a..{"type":"reco|
* |until 0x197.7 (404) | |
0x190| 67 c7 35 29 73 ef df 94| g.5)s...| sync: raw bits
0x1a0|ad d3 00 7e 9e eb ff ae |...~.... |
0x1a0| 04 c8 01 0c 6d 69 67 75| ....migu| blocks[0:1]:
0x1b0|6e 6f 46 52 6f 63 6b 3a 20 4e 65 72 66 20 70 61|noFRock: Nerf pa|
* |until 0x21e.7 (end) (119) | |
$ fq -c verbose twitter.avro
$ fq verbose twitter.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.{}: twitter.avro (avro_ocf) 0x0-0x21e.7 (543)
0x000|4f 62 6a 01 |Obj. | magic: raw bits (valid) 0x0-0x3.7 (4)
| | | meta[0:2]: 0x4-0x197.7 (404)