1
1
mirror of https://github.com/wader/fq.git synced 2024-12-24 22:05:31 +03:00

initial work for avro OCF files

This commit is contained in:
Xentripetal 2021-12-28 13:01:53 -06:00
parent eb4a6fdbd6
commit d6ca48182e
23 changed files with 551 additions and 0 deletions

View File

@ -0,0 +1,18 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type BoolCodec struct{}
func (l BoolCodec) Decode(name string, d *decode.D) {
d.FieldBoolFn(name, func(d *decode.D) bool {
return d.U8() >= 1
})
}
func BuildBoolCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &BoolCodec{}, nil
}

View File

@ -0,0 +1,22 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type NullCodec struct {}
func (l NullCodec) Decode(d *decode.D) interface{}{
// null is written as zero bytes.
return nil
}
func (l NullCodec) Type() CodecType {
return SCALAR
}
func BuildNullCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &NullCodec{}, nil
}

View File

@ -0,0 +1,20 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type FloatCodec struct{}
func (l FloatCodec) Decode(d *decode.D) interface{} {
return d.F32()
}
func (l FloatCodec) Type() CodecType {
return SCALAR
}
func BuildFloatCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &FloatCodec{}, nil
}

View File

@ -0,0 +1 @@
package codecs

View File

@ -0,0 +1,20 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type FloatCodec struct{}
func (l FloatCodec) Decode(d *decode.D) interface{} {
return d.F32()
}
func (l FloatCodec) Type() CodecType {
return SCALAR
}
func BuildFloatCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &FloatCodec{}, nil
}

View File

@ -0,0 +1,22 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type BytesCodec struct{}
func (l BytesCodec) Decode(d *decode.D) interface{} {
length := d.FieldSFn("length", VarZigZag)
d.FieldRawLen("value", length*8)
return nil
}
func (l BytesCodec) Type() CodecType {
return STRUCT
}
func BuildBytesCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &BytesCodec{}, nil
}

View File

@ -0,0 +1,19 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type BytesCodec struct{}
func (l BytesCodec) Decode(name string, d *decode.D) {
// What if its a record with a field called name_len?
// using a struct is probably a better idea. But it makes it less usable
length := d.FieldSFn(name+"_len", VarZigZag)
d.FieldRawLen("name", length*8)
}
func BuildBytesCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &BytesCodec{}, nil
}

View File

@ -0,0 +1,38 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
const intMask = byte(127)
const intFlag = byte(128)
// readLong reads a variable length zig zag long from the current position in decoder
func VarZigZag(d *decode.D) int64 {
var value uint64
var shift uint
for d.NotEnd() {
b := byte(d.U8())
value |= uint64(b&intMask) << shift
if b&intFlag == 0 {
return int64(value>>1) ^ -int64(value&1)
}
shift += 7
}
panic("unexpected end of data")
}
type LongCodec struct {}
func (l LongCodec) Decode(d *decode.D) interface{}{
return VarZigZag(d)
}
func (l LongCodec) Type() CodecType {
return SCALAR
}
func BuildLongCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &LongCodec{}, nil
}

22
format/avro/codecs/int.go Normal file
View File

@ -0,0 +1,22 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type BoolCodec struct {}
func (l BoolCodec) Decode(d *decode.D) interface{}{
// a boolean is written as a single byte whose value is either 0 (false) or 1 (true).
return d.U8() != 0
}
func (l BoolCodec) Type() CodecType {
return SCALAR
}
func BuildBoolCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &BoolCodec{}, nil
}

View File

@ -0,0 +1 @@
package codecs

45
format/avro/codecs/map.go Normal file
View File

@ -0,0 +1,45 @@
package codecs
import (
"errors"
"fmt"
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type ArrayCodec struct {
valueCodec Codec
}
func (l ArrayCodec) Decode(name string, d *decode.D) {
d.FieldArray(name, func(d *decode.D) {
count := int64(-1)
for count != 0 {
d.FieldStruct(name, func(d *decode.D) {
count = d.FieldSFn("count", VarZigZag)
if count < 0 {
d.FieldSFn("size", VarZigZag)
count *= -1
}
d.FieldArray("entries", func(d *decode.D) {
for i := int64(0); i < count; i++ {
l.valueCodec.Decode("entry", d)
}
})
})
}
})
}
func BuildArrayCodec(schema schema.SimplifiedSchema) (Codec, error) {
if schema.Items == nil {
return nil, errors.New("array schema must have items")
}
valueCodec, err := BuildCodec(*schema.Items)
if err != nil {
return nil, fmt.Errorf("ArrayCodec: %s", err)
}
return &ArrayCodec{valueCodec: valueCodec}, nil
}

View File

@ -0,0 +1,38 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
const intMask = byte(127)
const intFlag = byte(128)
// readLong reads a variable length zig zag long from the current position in decoder
func VarZigZag(d *decode.D) int64 {
var value uint64
var shift uint
for d.NotEnd() {
b := byte(d.U8())
value |= uint64(b&intMask) << shift
if b&intFlag == 0 {
return int64(value>>1) ^ -int64(value&1)
}
shift += 7
}
panic("unexpected end of data")
}
type LongCodec struct {}
func (l LongCodec) Decode(d *decode.D) interface{}{
return VarZigZag(d)
}
func (l LongCodec) Type() CodecType {
return SCALAR
}
func BuildLongCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &LongCodec{}, nil
}

View File

@ -0,0 +1,15 @@
package codecs
import "github.com/wader/fq/pkg/decode"
type StringCodec struct {}
func (l StringCodec) Decode(d *decode.D) {
length := d.FieldSFn("length", VarZigZag)
d.FieldUTF8("value", int(length))
}
func BuildStringCodec(schema SimplifiedSchema) (Codec, error) {
return &StringCodec{}, nil
}

View File

@ -0,0 +1,31 @@
package codecs
import "github.com/wader/fq/pkg/decode"
const intMask = byte(127)
const intFlag = byte(128)
// readLong reads a variable length zig zag long from the current position in decoder
func VarZigZag(d *decode.D) int64 {
var value uint64
var shift uint
for d.NotEnd() {
b := byte(d.U8())
value |= uint64(b&intMask) << shift
if b&intFlag == 0 {
return int64(value>>1) ^ -int64(value&1)
}
shift += 7
}
panic("unexpected end of data")
}
type LongCodec struct {}
func (l LongCodec) Decode(d *decode.D) {
d.Value.V = VarZigZag(d)
}
func BuildLongCodec(schema SimplifiedSchema) (Codec, error) {
c := LongCodec{}
return &c, nil
}

View File

@ -0,0 +1,26 @@
package codecs
import (
"github.com/wader/fq/format/avro/schema"
"github.com/wader/fq/pkg/decode"
)
type EnumCodec struct{
symbols []string
}
func (l EnumCodec) Decode(d *decode.D) interface{} {
value := int(VarZigZag(d))
if value >= len(l.symbols) {
d.Fatalf("invalid enum value: %d", value)
}
return l.symbols[value]
}
func (l EnumCodec) Type() CodecType {
return SCALAR
}
func BuildEnumCodec(schema schema.SimplifiedSchema) (Codec, error) {
return &EnumCodec{symbols: schema.Symbols}, nil
}

87
format/avro/ocf.go Normal file
View File

@ -0,0 +1,87 @@
package avro
import (
"github.com/wader/fq/format"
"github.com/wader/fq/format/registry"
"github.com/wader/fq/pkg/decode"
)
var jsonGroup decode.Group
func init() {
registry.MustRegister(decode.Format{
Name: format.AVRO_OCF,
Description: "Avro object container file",
Groups: []string{format.PROBE},
DecodeFn: avroDecode,
Dependencies: []decode.Dependency{
{Names: []string{format.JSON}, Group: &jsonGroup},
},
})
}
const headerMetadataSchema = `{"type": "map", "values": "bytes"}`
const intMask = byte(127)
const intFlag = byte(128)
// readLong reads a variable length zig zag long from the current position in decoder
// and returns the decoded value and the number of bytes read.
func varZigZag(d *decode.D) int64 {
var value uint64
var shift uint
for d.NotEnd() {
b := byte(d.U8())
value |= uint64(b&intMask) << shift
if b&intFlag == 0 {
return int64(value>>1) ^ -int64(value&1)
}
shift += 7
}
panic("unexpected end of data")
}
func avroDecode(d *decode.D, in interface{}) interface{} {
d.FieldRawLen("magic", 4*8, d.AssertBitBuf([]byte{'O', 'b', 'j', 1}))
//var schema []byte
var blockCount int64 = -1
d.FieldStructArrayLoop("meta", "block",
func() bool { return blockCount != 0 },
func(d *decode.D) {
blockCount = d.FieldSFn("count", varZigZag)
// If its negative, then theres another long representing byte size
if blockCount < 0 {
blockCount *= -1
d.FieldSFn("size", varZigZag)
}
if blockCount == 0 {
return
}
var i int64 = 0
d.FieldStructArrayLoop("entries", "entry", func() bool { return i < blockCount }, func(d *decode.D) {
keyL := d.FieldSFn("key_length", varZigZag)
key := d.FieldUTF8("key", int(keyL))
valL := d.FieldSFn("value_length", varZigZag)
if key == "avro.schema" {
d.FieldFormatLen("value", valL*8, jsonGroup, nil)
} else {
d.FieldUTF8("value", int(valL))
}
i++
})
})
syncbb := d.FieldRawLen("sync", 16*8)
sync, err := syncbb.BytesLen(16)
if err != nil {
d.Fatalf("unable to read sync bytes: %v", err)
}
d.FieldStructArrayLoop("blocks", "block", func() bool { return d.NotEnd() }, func(d *decode.D) {
count := d.FieldSFn("count", varZigZag)
_ = count
size := d.FieldSFn("size", varZigZag)
d.FieldRawLen("data", size*8)
d.FieldRawLen("sync", 16*8, d.AssertBitBuf(sync))
})
return nil
}

View File

@ -0,0 +1,33 @@
package codecs
const (
NULL = "null"
BOOLEAN = "boolean"
INT = "int"
LONG = "long"
FLOAT = "float"
DOUBLE = "double"
BYTES = "bytes"
STRING = "string"
RECORD = "record"
)
type SimplifiedSchema struct {
Type string
Name *string
Fields []SimplifiedSchemaField
Symbols *[]string
Items *SimplifiedSchema
LogicalType *string
Scale *int
Precision *int
UnionTypes []SimplifiedSchema
//Choosing not to handle Default as it adds a lot of complexity and this is used for showing the binary
//representation of the data, not fully parsing it. See https://github.com/linkedin/goavro/blob/master/record.go
//for how it could be handled.
}
type SimplifiedSchemaField struct {
Name string
Type SimplifiedSchema
}

Binary file not shown.

View File

@ -0,0 +1,12 @@
$ fq . firstBlockCountNotGreaterThanZero.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.{}: firstBlockCountNotGreaterThanZero.avro (avro_ocf)
0x00|4f 62 6a 01 |Obj. | magic: raw bits (valid)
0x00| 02 16 61 76 72 6f 2e 73 63 68 65 6d| ..avro.schem| meta[0:2]:
0x10|61 1e 7b 22 74 79 70 65 22 3a 22 6c 6f 6e 67 22|a.{"type":"long"|
0x20|7d 00 |}. |
0x20| 30 31 32 33 34 35 36 37 38 39 61 62 63 64| 0123456789abcd| sync: raw bits
0x30|65 66 |ef |
0x30| 00| | .| | blocks[0:1]:
$ fq '.blocks[0]' firstBlockCountNotGreaterThanZero.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.blocks[0]{}:
0x30| 00| | .| | count: 0

Binary file not shown.

View File

@ -0,0 +1,20 @@
$ fq . quickstop-deflate.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.{}: quickstop-deflate.avro (avro_ocf)
0x0000|4f 62 6a 01 |Obj. | magic: raw bits (valid)
0x0000| 04 14 61 76 72 6f 2e 63 6f 64 65 63| ..avro.codec| meta[0:2]:
0x0010|0e 64 65 66 6c 61 74 65 16 61 76 72 6f 2e 73 63|.deflate.avro.sc|
* |until 0x119.7 (278) | |
0x0110| 93 e7 87 9e 02 95| ......| sync: raw bits
0x0120|d5 9e 4f 58 37 ad b2 a2 ce cd |..OX7..... |
0x0120| b4 09 be 22 8d db| ..."..| blocks[0:12]:
0x0130|6f 64 ac f9 19 c6 71 f9 37 49 8e 63 1d 55 55 b5|od....q.7I.c.UU.|
* |until 0x5835.7 (end) (22284) | |
$ fq '.blocks[0]' quickstop-deflate.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.blocks[0]{}:
0x120| b4 09 | .. | count: 602
0x120| be 22 | ." | size: 2207
0x120| 8d db| ..| data: raw bits (deflate encoded)
0x130|6f 64 ac f9 19 c6 71 f9 37 49 8e 63 1d 55 55 b5|od....q.7I.c.UU.|
* |until 0x9cc.7 (2207) | |
0x9c0| 93 e7 87| ...| sync: raw bits (valid)
0x9d0|9e 02 95 d5 9e 4f 58 37 ad b2 a2 ce cd |.....OX7..... |

BIN
format/avro/testdata/twitter.avro vendored Normal file

Binary file not shown.

61
format/avro/testdata/twitter.fqtest vendored Normal file
View File

@ -0,0 +1,61 @@
$ fq . twitter.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.{}: twitter.avro (avro_ocf)
0x000|4f 62 6a 01 |Obj. | magic: raw bits (valid)
0x000| 04 16 61 76 72 6f 2e 73 63 68 65 6d| ..avro.schem| meta[0:2]:
0x010|61 e8 05 7b 22 74 79 70 65 22 3a 22 72 65 63 6f|a..{"type":"reco|
* |until 0x197.7 (404) | |
0x190| 67 c7 35 29 73 ef df 94| g.5)s...| sync: raw bits
0x1a0|ad d3 00 7e 9e eb ff ae |...~.... |
0x1a0| 04 c8 01 0c 6d 69 67 75| ....migu| blocks[0:1]:
0x1b0|6e 6f 46 52 6f 63 6b 3a 20 4e 65 72 66 20 70 61|noFRock: Nerf pa|
* |until 0x21e.7 (end) (119) | |
$ fq -c verbose twitter.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.{}: twitter.avro (avro_ocf) 0x0-0x21e.7 (543)
0x000|4f 62 6a 01 |Obj. | magic: raw bits (valid) 0x0-0x3.7 (4)
| | | meta[0:2]: 0x4-0x197.7 (404)
| | | [0]{}: block 0x4-0x196.7 (403)
0x000| 04 | . | count: 2 0x4-0x4.7 (1)
| | | entries[0:2]: 0x5-0x196.7 (402)
| | | [0]{}: entry 0x5-0x186.7 (386)
0x000| 16 | . | key_len: 11 0x5-0x5.7 (1)
0x000| 61 76 72 6f 2e 73 63 68 65 6d| avro.schem| key: "avro.schema" 0x6-0x10.7 (11)
0x010|61 |a |
0x010| e8 05 | .. | value_len: 372 0x11-0x12.7 (2)
0x010| 7b 22 74 79 70 65 22 3a 22 72 65 63 6f| {"type":"reco| value: {} (json) 0x13-0x186.7 (372)
0x020|72 64 22 2c 22 6e 61 6d 65 22 3a 22 74 77 69 74|rd","name":"twit|
* |until 0x186.7 (372) | |
| | | [1]{}: entry 0x187-0x196.7 (16)
0x180| 14 | . | key_len: 10 0x187-0x187.7 (1)
0x180| 61 76 72 6f 2e 63 6f 64| avro.cod| key: "avro.codec" 0x188-0x191.7 (10)
0x190|65 63 |ec |
0x190| 08 | . | value_len: 4 0x192-0x192.7 (1)
0x190| 6e 75 6c 6c | null | value: "null" 0x193-0x196.7 (4)
| | | [1]{}: block 0x197-0x197.7 (1)
0x190| 00 | . | count: 0 0x197-0x197.7 (1)
0x190| 67 c7 35 29 73 ef df 94| g.5)s...| sync: raw bits 0x198-0x1a7.7 (16)
0x1a0|ad d3 00 7e 9e eb ff ae |...~.... |
| | | blocks[0:1]: 0x1a8-0x21e.7 (119)
| | | [0]{}: block 0x1a8-0x21e.7 (119)
0x1a0| 04 | . | count: 2 0x1a8-0x1a8.7 (1)
0x1a0| c8 01 | .. | size: 100 0x1a9-0x1aa.7 (2)
| | | data[0:2]: 0x1ab-0x20e.7 (100)
| | | [0]{}: datum 0x1ab-0x1da.7 (48)
0x1a0| 0c | . | username_len: 6 0x1ab-0x1ab.7 (1)
0x1a0| 6d 69 67 75| migu| username: "miguno" 0x1ac-0x1b1.7 (6)
0x1b0|6e 6f |no |
0x1b0| 46 | F | tweet_len: 35 0x1b2-0x1b2.7 (1)
0x1b0| 52 6f 63 6b 3a 20 4e 65 72 66 20 70 61| Rock: Nerf pa| tweet: "Rock: Nerf paper, scissors is fine." 0x1b3-0x1d5.7 (35)
0x1c0|70 65 72 2c 20 73 63 69 73 73 6f 72 73 20 69 73|per, scissors is|
0x1d0|20 66 69 6e 65 2e | fine. |
0x1d0| b2 b8 ee 96 0a | ..... | timestamp: 1366150681 0x1d6-0x1da.7 (5)
| | | [1]{}: datum 0x1db-0x20e.7 (52)
0x1d0| 14 | . | username_len: 10 0x1db-0x1db.7 (1)
0x1d0| 42 6c 69 7a| Bliz| username: "BlizzardCS" 0x1dc-0x1e5.7 (10)
0x1e0|7a 61 72 64 43 53 |zardCS |
0x1e0| 46 | F | tweet_len: 35 0x1e6-0x1e6.7 (1)
0x1e0| 57 6f 72 6b 73 20 61 73 20| Works as | tweet: "Works as intended. Terran is IMBA." 0x1e7-0x209.7 (35)
0x1f0|69 6e 74 65 6e 64 65 64 2e 20 20 54 65 72 72 61|intended. Terra|
0x200|6e 20 69 73 20 49 4d 42 41 2e |n is IMBA. |
0x200| e2 f3 ee 96 0a | ..... | timestamp: 1366154481 0x20a-0x20e.7 (5)
0x200| 67| g| sync: raw bits (valid) 0x20f-0x21e.7 (16)
0x210|c7 35 29 73 ef df 94 ad d3 00 7e 9e eb ff ae| |.5)s......~....||