1
1
mirror of https://github.com/wader/fq.git synced 2024-12-24 13:52:02 +03:00

Parse header using avro decoders. Still not certain this is the best idea. Will get opinions before finalizing.

This commit is contained in:
Xentripetal 2022-01-19 21:00:44 -06:00
parent 16849c8f35
commit ee184075b7
8 changed files with 1026 additions and 1012 deletions

View File

@ -19,7 +19,7 @@
|`avc_pps` |H.264/AVC&nbsp;Picture&nbsp;Parameter&nbsp;Set |<sub></sub>|
|`avc_sei` |H.264/AVC&nbsp;Supplemental&nbsp;Enhancement&nbsp;Information |<sub></sub>|
|`avc_sps` |H.264/AVC&nbsp;Sequence&nbsp;Parameter&nbsp;Set |<sub></sub>|
|`avro_ocf` |Avro&nbsp;object&nbsp;container&nbsp;file |<sub>`json`</sub>|
|`avro_ocf` |Avro&nbsp;object&nbsp;container&nbsp;file |<sub></sub>|
|`bencode` |BitTorrent&nbsp;bencoding |<sub></sub>|
|`bsd_loopback_frame` |BSD&nbsp;loopback&nbsp;frame |<sub>`ipv4_packet`</sub>|
|`bson` |Binary&nbsp;JSON |<sub></sub>|

File diff suppressed because it is too large Load Diff

Before

Width:  |  Height:  |  Size: 105 KiB

After

Width:  |  Height:  |  Size: 107 KiB

View File

@ -47,23 +47,27 @@ func decodeMapFn(s schema.SimplifiedSchema) (DecodeFn, error) {
val := make(map[string]interface{})
rawV := subFn(s, d)
impl, ok := rawV.([]map[string]interface{})
rawSlice, ok := rawV.([]interface{})
if !ok {
d.Fatalf("decode map: expected array of maps, got %T", rawV)
d.Fatalf("decode map: expected array of interfaces, got %v", rawV)
return nil
}
for entry := range impl {
rawKey, ok := impl[entry]["key"]
for _, rawEntry := range rawSlice {
entry, ok := rawEntry.(map[string]interface{})
if !ok {
d.Fatalf("decode map: expected key in map %v", impl[entry])
d.Fatalf("decode map: expected map, got %T", rawEntry)
}
value, ok := impl[entry]["key"]
rawKey, ok := entry["key"]
if !ok {
d.Fatalf("decode map: expected value in map %v", impl[entry])
d.Fatalf("decode map: expected key in map %v", entry)
}
value, ok := entry["value"]
if !ok {
d.Fatalf("decode map: expected value in map %v", entry)
}
key, ok := rawKey.(string)
if !ok {
d.Fatalf("decode map: expected string key in map %v", impl[entry])
d.Fatalf("decode map: expected string key in map %v", entry)
}
val[key] = value
}

View File

@ -9,79 +9,71 @@ import (
"github.com/wader/fq/pkg/scalar"
)
var jsonGroup decode.Group
func init() {
registry.MustRegister(decode.Format{
Name: format.AVRO_OCF,
Description: "Avro object container file",
Groups: []string{format.PROBE},
DecodeFn: avroDecodeOCF,
Dependencies: []decode.Dependency{
{Names: []string{format.JSON}, Group: &jsonGroup},
},
})
}
type HeaderData struct {
Schema *schema.SimplifiedSchema
Schema schema.SimplifiedSchema
Codec string
Sync []byte
}
const headerSchemaSpec = `
{
"type": "record",
"name": "org.apache.avro.file.Header",
"fields": [
{"name": "meta", "type": {"type": "map", "values": "string"}},
{"name": "sync", "type": {"type": "fixed", "name": "Sync", "size": 16}}
]
}`
func decodeHeader(d *decode.D) HeaderData {
d.FieldRawLen("magic", 4*8, d.AssertBitBuf([]byte{'O', 'b', 'j', 1}))
var headerData HeaderData
// Header is encoded in avro so could use avro decoder, but doing it manually so we can
// keep asserts and treating schema as JSON
d.FieldRawLen("magic", 4*8, d.AssertBitBuf([]byte{'O', 'b', 'j', 1}))
var blockCount int64 = -1
d.FieldStructArrayLoop("meta", "block",
func() bool { return blockCount != 0 },
func(d *decode.D) {
blockCount = d.FieldSFn("count", decoders.VarZigZag)
// If its negative, then theres another long representing byte size
if blockCount < 0 {
blockCount *= -1
d.FieldSFn("size", decoders.VarZigZag)
}
if blockCount == 0 {
return
}
var i int64
d.FieldStructArrayLoop("entries", "entry", func() bool { return i < blockCount }, func(d *decode.D) {
keyL := d.FieldSFn("key_len", decoders.VarZigZag)
key := d.FieldUTF8("key", int(keyL))
valL := d.FieldSFn("value_len", decoders.VarZigZag)
if key == "avro.schema" {
v, _ := d.FieldFormatLen("value", valL*8, jsonGroup, nil)
s, err := schema.From(v.V.(*scalar.S).Actual)
headerData.Schema = &s
if err != nil {
d.Fatalf("Failed to parse schema: %s", err)
}
} else if key == "avro.codec" {
headerData.Codec = d.FieldUTF8("value", int(valL))
} else {
d.FieldUTF8("value", int(valL))
}
i++
})
})
if headerData.Schema == nil {
d.Fatalf("No schema found in header")
headerSchema, err := schema.FromSchemaString(headerSchemaSpec)
if err != nil {
d.Fatalf("Failed to parse header schema: %v", err)
}
decodeHeaderFn, err := decoders.DecodeFnForSchema(headerSchema)
if err != nil {
d.Fatalf("failed to parse header: %v", err)
}
if headerData.Codec == "null" {
header := decodeHeaderFn("header", d)
headerRecord, ok := header.(map[string]interface{})
if !ok {
d.Fatalf("header is not a map")
}
meta, ok := headerRecord["meta"].(map[string]interface{})
if !ok {
d.Fatalf("header.meta is not a map")
}
headerData.Schema, err = schema.FromSchemaString(meta["avro.schema"].(string))
if err != nil {
d.Fatalf("failed to parse schema: %v", err)
}
if codec, ok := meta["avro.codec"]; ok && codec != "null" {
headerData.Codec, ok = codec.(string)
if !ok {
d.Fatalf("avro.codec is not a string")
}
} else {
headerData.Codec = ""
}
syncbb := d.FieldRawLen("sync", 16*8)
var err error
headerData.Sync, err = syncbb.BytesLen(16)
if err != nil {
d.Fatalf("unable to read sync bytes: %v", err)
headerData.Sync, ok = headerRecord["sync"].([]byte)
if !ok {
d.Fatalf("header.sync is not a byte array")
}
return headerData
}
@ -89,7 +81,7 @@ func decodeHeader(d *decode.D) HeaderData {
func avroDecodeOCF(d *decode.D, in interface{}) interface{} {
header := decodeHeader(d)
decodeFn, err := decoders.DecodeFnForSchema(*header.Schema)
decodeFn, err := decoders.DecodeFnForSchema(header.Schema)
if err != nil {
d.Fatalf("unable to create codec: %v", err)
}

View File

@ -1,6 +1,7 @@
package schema
import (
"encoding/json"
"errors"
"fmt"
)
@ -34,9 +35,9 @@ type SimplifiedSchema struct {
Symbols []string `json:"symbols,omitempty"`
Values *SimplifiedSchema `json:"values,omitempty"`
UnionTypes []SimplifiedSchema
//Choosing not to handle Default as it adds a lot of complexity and this is used for showing the binary
//representation of the data, not fully parsing it. See https://github.com/linkedin/goavro/blob/master/record.go
//for how it could be handled.
// Choosing not to handle Default as it adds a lot of complexity and this is used for showing the binary
// representation of the data, not fully parsing it. See https://github.com/linkedin/goavro/blob/master/record.go
// for how it could be handled.
}
type Field struct {
@ -44,6 +45,15 @@ type Field struct {
Type SimplifiedSchema
}
func FromSchemaString(schemaString string) (SimplifiedSchema, error) {
var jsonSchema interface{}
if err := json.Unmarshal([]byte(schemaString), &jsonSchema); err != nil {
return SimplifiedSchema{}, fmt.Errorf("failed to unmarshal header schema: %w", err)
}
return From(jsonSchema)
}
func From(schema interface{}) (SimplifiedSchema, error) {
if schema == nil {
return SimplifiedSchema{}, errors.New("schema cannot be nil")
@ -81,7 +91,7 @@ func From(schema interface{}) (SimplifiedSchema, error) {
if s.Precision, err = getInt(v, "precision", false); err != nil {
return s, err
}
if s.Size, err = getInt(v, "precision", false); err != nil {
if s.Size, err = getInt(v, "size", false); err != nil {
return s, err
}
if s.Type == RECORD {
@ -190,9 +200,9 @@ func getInt(m map[string]interface{}, key string, required bool) (int, error) {
}
return 0, nil
}
s, ok := v.(int)
s, ok := v.(float64)
if !ok {
return 0, fmt.Errorf("%s must be a string", key)
return 0, fmt.Errorf("%s must be a int", key)
}
return s, nil
return int(s), nil
}

View File

@ -2,20 +2,24 @@
$ fq verbose firstBlockCountNotGreaterThanZero.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.{}: firstBlockCountNotGreaterThanZero.avro (avro_ocf) 0x0-0x32.7 (51)
0x00|4f 62 6a 01 |Obj. | magic: raw bits (valid) 0x0-0x3.7 (4)
| | | meta[0:2]: 0x4-0x21.7 (30)
| | | [0]{}: block 0x4-0x20.7 (29)
0x00| 02 | . | count: 1 0x4-0x4.7 (1)
| | | entries[0:1]: 0x5-0x20.7 (28)
| | | [0]{}: entry 0x5-0x20.7 (28)
0x00| 16 | . | key_len: 11 0x5-0x5.7 (1)
0x00| 61 76 72 6f 2e 73 63 68 65 6d| avro.schem| key: "avro.schema" 0x6-0x10.7 (11)
| | | header{}: 0x4-0x31.7 (46)
| | | meta[0:2]: 0x4-0x21.7 (30)
| | | [0]{}: block 0x4-0x20.7 (29)
0x00| 02 | . | count: 1 0x4-0x4.7 (1)
| | | data[0:1]: 0x5-0x20.7 (28)
| | | [0]{}: entry 0x5-0x20.7 (28)
| | | key{}: 0x5-0x10.7 (12)
0x00| 16 | . | length: 11 0x5-0x5.7 (1)
0x00| 61 76 72 6f 2e 73 63 68 65 6d| avro.schem| data: "avro.schema" 0x6-0x10.7 (11)
0x10|61 |a |
0x10| 1e | . | value_len: 15 0x11-0x11.7 (1)
0x10| 7b 22 74 79 70 65 22 3a 22 6c 6f 6e 67 22| {"type":"long"| value: {} (json) 0x12-0x20.7 (15)
| | | value{}: 0x11-0x20.7 (16)
0x10| 1e | . | length: 15 0x11-0x11.7 (1)
0x10| 7b 22 74 79 70 65 22 3a 22 6c 6f 6e 67 22| {"type":"long"| data: "{\"type\":\"long\"}" 0x12-0x20.7 (15)
0x20|7d |} |
| | | [1]{}: block 0x21-0x21.7 (1)
0x20| 00 | . | count: 0 0x21-0x21.7 (1)
0x20| 30 31 32 33 34 35 36 37 38 39 61 62 63 64| 0123456789abcd| sync: raw bits 0x22-0x31.7 (16)
| | | [1]{}: block 0x21-0x21.7 (1)
0x20| 00 | . | count: 0 0x21-0x21.7 (1)
| | | data[0:0]: 0x22-NA (0)
0x20| 30 31 32 33 34 35 36 37 38 39 61 62 63 64| 0123456789abcd| sync: raw bits 0x22-0x31.7 (16)
0x30|65 66 |ef |
| | | blocks[0:1]: 0x32-0x32.7 (1)
| | | [0]{}: block 0x32-0x32.7 (1)

View File

@ -2,26 +2,32 @@
$ fq verbose quickstop-deflate.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.{}: quickstop-deflate.avro (avro_ocf) 0x0-0x5835.7 (22582)
0x0000|4f 62 6a 01 |Obj. | magic: raw bits (valid) 0x0-0x3.7 (4)
| | | meta[0:2]: 0x4-0x119.7 (278)
| | | [0]{}: block 0x4-0x118.7 (277)
0x0000| 04 | . | count: 2 0x4-0x4.7 (1)
| | | entries[0:2]: 0x5-0x118.7 (276)
| | | [0]{}: entry 0x5-0x17.7 (19)
0x0000| 14 | . | key_len: 10 0x5-0x5.7 (1)
0x0000| 61 76 72 6f 2e 63 6f 64 65 63| avro.codec| key: "avro.codec" 0x6-0xf.7 (10)
0x0010|0e |. | value_len: 7 0x10-0x10.7 (1)
0x0010| 64 65 66 6c 61 74 65 | deflate | value: "deflate" 0x11-0x17.7 (7)
| | | [1]{}: entry 0x18-0x118.7 (257)
0x0010| 16 | . | key_len: 11 0x18-0x18.7 (1)
0x0010| 61 76 72 6f 2e 73 63| avro.sc| key: "avro.schema" 0x19-0x23.7 (11)
| | | header{}: 0x4-0x129.7 (294)
| | | meta[0:2]: 0x4-0x119.7 (278)
| | | [0]{}: block 0x4-0x118.7 (277)
0x0000| 04 | . | count: 2 0x4-0x4.7 (1)
| | | data[0:2]: 0x5-0x118.7 (276)
| | | [0]{}: entry 0x5-0x17.7 (19)
| | | key{}: 0x5-0xf.7 (11)
0x0000| 14 | . | length: 10 0x5-0x5.7 (1)
0x0000| 61 76 72 6f 2e 63 6f 64 65 63| avro.codec| data: "avro.codec" 0x6-0xf.7 (10)
| | | value{}: 0x10-0x17.7 (8)
0x0010|0e |. | length: 7 0x10-0x10.7 (1)
0x0010| 64 65 66 6c 61 74 65 | deflate | data: "deflate" 0x11-0x17.7 (7)
| | | [1]{}: entry 0x18-0x118.7 (257)
| | | key{}: 0x18-0x23.7 (12)
0x0010| 16 | . | length: 11 0x18-0x18.7 (1)
0x0010| 61 76 72 6f 2e 73 63| avro.sc| data: "avro.schema" 0x19-0x23.7 (11)
0x0020|68 65 6d 61 |hema |
0x0020| e6 03 | .. | value_len: 243 0x24-0x25.7 (2)
0x0020| 7b 22 74 79 70 65 22 3a 22 72| {"type":"r| value: {} (json) 0x26-0x118.7 (243)
| | | value{}: 0x24-0x118.7 (245)
0x0020| e6 03 | .. | length: 243 0x24-0x25.7 (2)
0x0020| 7b 22 74 79 70 65 22 3a 22 72| {"type":"r| data: "{\"type\":\"record\",\"name\":\"Person\",\"fields\":[{\"name\""... 0x26-0x118.7 (243)
0x0030|65 63 6f 72 64 22 2c 22 6e 61 6d 65 22 3a 22 50|ecord","name":"P|
* |until 0x118.7 (243) | |
| | | [1]{}: block 0x119-0x119.7 (1)
0x0110| 00 | . | count: 0 0x119-0x119.7 (1)
0x0110| 93 e7 87 9e 02 95| ......| sync: raw bits 0x11a-0x129.7 (16)
| | | [1]{}: block 0x119-0x119.7 (1)
0x0110| 00 | . | count: 0 0x119-0x119.7 (1)
| | | data[0:0]: 0x11a-NA (0)
0x0110| 93 e7 87 9e 02 95| ......| sync: raw bits 0x11a-0x129.7 (16)
0x0120|d5 9e 4f 58 37 ad b2 a2 ce cd |..OX7..... |
| | | blocks[0:12]: 0x12a-0x5835.7 (22284)
| | | [0]{}: block 0x12a-0x9dc.7 (2227)

View File

@ -2,27 +2,33 @@
$ fq verbose twitter.avro
|00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f|0123456789abcdef|.{}: twitter.avro (avro_ocf) 0x0-0x21e.7 (543)
0x000|4f 62 6a 01 |Obj. | magic: raw bits (valid) 0x0-0x3.7 (4)
| | | meta[0:2]: 0x4-0x197.7 (404)
| | | [0]{}: block 0x4-0x196.7 (403)
0x000| 04 | . | count: 2 0x4-0x4.7 (1)
| | | entries[0:2]: 0x5-0x196.7 (402)
| | | [0]{}: entry 0x5-0x186.7 (386)
0x000| 16 | . | key_len: 11 0x5-0x5.7 (1)
0x000| 61 76 72 6f 2e 73 63 68 65 6d| avro.schem| key: "avro.schema" 0x6-0x10.7 (11)
| | | header{}: 0x4-0x1a7.7 (420)
| | | meta[0:2]: 0x4-0x197.7 (404)
| | | [0]{}: block 0x4-0x196.7 (403)
0x000| 04 | . | count: 2 0x4-0x4.7 (1)
| | | data[0:2]: 0x5-0x196.7 (402)
| | | [0]{}: entry 0x5-0x186.7 (386)
| | | key{}: 0x5-0x10.7 (12)
0x000| 16 | . | length: 11 0x5-0x5.7 (1)
0x000| 61 76 72 6f 2e 73 63 68 65 6d| avro.schem| data: "avro.schema" 0x6-0x10.7 (11)
0x010|61 |a |
0x010| e8 05 | .. | value_len: 372 0x11-0x12.7 (2)
0x010| 7b 22 74 79 70 65 22 3a 22 72 65 63 6f| {"type":"reco| value: {} (json) 0x13-0x186.7 (372)
| | | value{}: 0x11-0x186.7 (374)
0x010| e8 05 | .. | length: 372 0x11-0x12.7 (2)
0x010| 7b 22 74 79 70 65 22 3a 22 72 65 63 6f| {"type":"reco| data: "{\"type\":\"record\",\"name\":\"twitter_schema\",\"namespac"... 0x13-0x186.7 (372)
0x020|72 64 22 2c 22 6e 61 6d 65 22 3a 22 74 77 69 74|rd","name":"twit|
* |until 0x186.7 (372) | |
| | | [1]{}: entry 0x187-0x196.7 (16)
0x180| 14 | . | key_len: 10 0x187-0x187.7 (1)
0x180| 61 76 72 6f 2e 63 6f 64| avro.cod| key: "avro.codec" 0x188-0x191.7 (10)
| | | [1]{}: entry 0x187-0x196.7 (16)
| | | key{}: 0x187-0x191.7 (11)
0x180| 14 | . | length: 10 0x187-0x187.7 (1)
0x180| 61 76 72 6f 2e 63 6f 64| avro.cod| data: "avro.codec" 0x188-0x191.7 (10)
0x190|65 63 |ec |
0x190| 08 | . | value_len: 4 0x192-0x192.7 (1)
0x190| 6e 75 6c 6c | null | value: "null" 0x193-0x196.7 (4)
| | | [1]{}: block 0x197-0x197.7 (1)
0x190| 00 | . | count: 0 0x197-0x197.7 (1)
0x190| 67 c7 35 29 73 ef df 94| g.5)s...| sync: raw bits 0x198-0x1a7.7 (16)
| | | value{}: 0x192-0x196.7 (5)
0x190| 08 | . | length: 4 0x192-0x192.7 (1)
0x190| 6e 75 6c 6c | null | data: "null" 0x193-0x196.7 (4)
| | | [1]{}: block 0x197-0x197.7 (1)
0x190| 00 | . | count: 0 0x197-0x197.7 (1)
| | | data[0:0]: 0x198-NA (0)
0x190| 67 c7 35 29 73 ef df 94| g.5)s...| sync: raw bits 0x198-0x1a7.7 (16)
0x1a0|ad d3 00 7e 9e eb ff ae |...~.... |
| | | blocks[0:1]: 0x1a8-0x21e.7 (119)
| | | [0]{}: block 0x1a8-0x21e.7 (119)