2021-12-30 01:12:36 +03:00
|
|
|
package decoders
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
2022-01-14 08:32:19 +03:00
|
|
|
"github.com/wader/fq/pkg/scalar"
|
2021-12-30 01:46:56 +03:00
|
|
|
|
2021-12-30 01:12:36 +03:00
|
|
|
"github.com/wader/fq/format/avro/schema"
|
|
|
|
"github.com/wader/fq/pkg/decode"
|
|
|
|
)
|
|
|
|
|
2022-01-14 08:32:19 +03:00
|
|
|
func decodeRecordFn(schema schema.SimplifiedSchema, sms ...scalar.Mapper) (DecodeFn, error) {
|
2021-12-30 01:12:36 +03:00
|
|
|
if len(schema.Fields) == 0 {
|
|
|
|
return nil, fmt.Errorf("record must have fields")
|
|
|
|
}
|
|
|
|
var fieldNames []string
|
2022-01-14 08:32:19 +03:00
|
|
|
var fieldDecoders []func(string, *decode.D) interface{}
|
2021-12-30 01:12:36 +03:00
|
|
|
|
|
|
|
for _, f := range schema.Fields {
|
|
|
|
fieldNames = append(fieldNames, f.Name)
|
|
|
|
fc, err := DecodeFnForSchema(f.Type)
|
|
|
|
if err != nil {
|
2021-12-30 01:46:56 +03:00
|
|
|
return nil, fmt.Errorf("failed parsing record field %s: %w", f.Name, err)
|
2021-12-30 01:12:36 +03:00
|
|
|
}
|
|
|
|
fieldDecoders = append(fieldDecoders, fc)
|
|
|
|
}
|
|
|
|
|
2022-01-14 08:32:19 +03:00
|
|
|
// A record is encoded by encoding the values of its fields in the order that they are declared. In other words, a
|
|
|
|
// record is encoded as just the concatenation of the encodings of its fields. Field values are encoded per their
|
|
|
|
// schema. For example, the record schema
|
|
|
|
// { "type": "record",
|
|
|
|
// "name": "test",
|
|
|
|
// "fields" : [ {"name": "a", "type": "long"},
|
|
|
|
// {"name": "b", "type": "string"} ]
|
|
|
|
// }
|
2021-12-30 01:12:36 +03:00
|
|
|
//
|
2022-01-14 08:32:19 +03:00
|
|
|
// An instance of this record whose a field has value 27 (encoded as hex 36) and whose b field has value "foo"
|
|
|
|
// (encoded as hex bytes 06 66 6f 6f), would be encoded simply as the concatenation of these, namely
|
|
|
|
// the hex byte sequence:
|
|
|
|
// 36 06 66 6f 6f
|
2021-12-30 01:12:36 +03:00
|
|
|
|
2022-01-14 08:32:19 +03:00
|
|
|
return func(name string, d *decode.D) interface{} {
|
|
|
|
val := make(map[string]interface{})
|
2021-12-30 01:12:36 +03:00
|
|
|
d.FieldStruct(name, func(d *decode.D) {
|
|
|
|
for i, f := range fieldNames {
|
2022-01-14 08:32:19 +03:00
|
|
|
val[f] = fieldDecoders[i](f, d)
|
2021-12-30 01:12:36 +03:00
|
|
|
}
|
|
|
|
})
|
2022-01-14 08:32:19 +03:00
|
|
|
return val
|
2021-12-30 01:12:36 +03:00
|
|
|
}, nil
|
|
|
|
}
|