Various fixes for Avro encoding (#194)

This commit is contained in:
Alejandro Serrano 2020-05-26 11:06:45 +02:00 committed by GitHub
parent c54ba52a60
commit c6b67332f2
4 changed files with 101 additions and 120 deletions

View File

@ -42,6 +42,7 @@ library
, template-haskell >=2.12
, text
, time
, transformers
, unordered-containers
, uuid
, vector

View File

@ -21,7 +21,6 @@ and to Avro values.
-}
module Mu.Adapter.Avro () where
import Control.Applicative ((<|>))
import Control.Arrow ((***))
import qualified Data.Avro as A
import qualified Data.Avro.Encoding.FromAvro as AVal
@ -30,11 +29,11 @@ import qualified Data.Avro.Schema.ReadSchema as RSch
import qualified Data.Avro.Schema.Schema as ASch
-- 'Tagged . unTagged' can be replaced by 'coerce'
-- eliminating some run-time overhead
import Control.Monad.Trans.State
import Data.Avro.EitherN (putIndexedValue)
import Data.ByteString.Builder (Builder, word8)
import Data.Coerce (coerce)
import qualified Data.HashMap.Strict as HM
import Data.List ((\\))
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NonEmptyList
import qualified Data.Map as M
@ -51,8 +50,9 @@ instance SLess.ToSchemalessTerm AVal.Value where
toSchemalessTerm (AVal.Record s r)
= case s of
RSch.Record { RSch.fields = fs }
-> SLess.TRecord $ map (\(k,v) -> SLess.Field k (SLess.toSchemalessValue v))
$ zip (map RSch.fldName fs) (V.toList r)
-> SLess.TRecord $
zipWith (\k v -> SLess.Field k (SLess.toSchemalessValue v))
(map RSch.fldName fs) (V.toList r)
_ -> error ("this should never happen:\n" ++ show s)
toSchemalessTerm (AVal.Enum _ i _)
= SLess.TEnum i
@ -83,148 +83,118 @@ instance SLess.ToSchemalessValue AVal.Value where
toSchemalessValue e@AVal.Enum {}
= SLess.FSchematic (SLess.toSchemalessTerm e)
-- | Avro schema for a value wrapped in 'WithSchema'.
-- NOTE(review): two variants of this instance are interleaved here
-- (presumably the removed and the added side of the diff).
-- Variant A: requires 'HasAvroSchemas' and produces an 'ASch.Union' built
-- from 'schemas' over the whole of @sch@, using the names from 'typeNames'.
-- Variant B: requires 'HasAvroSchema'' only for the term selected by @sty@,
-- runs its 'schema'' with an empty initial state and coerces the tag.
instance (HasAvroSchemas sch sch)
instance (HasAvroSchema' (Term sch (sch :/: sty)))
=> A.HasAvroSchema (WithSchema sch sty t) where
-- Variant A: union of the schemas of every definition in @sch@.
schema = Tagged $ ASch.Union $ schemas (Proxy @sch) (Proxy @sch) ts
where ts = typeNames (Proxy @sch) (Proxy @sch)
-- Variant B: schema of the single selected term, starting from state [].
schema = coerce $ evalState (schema' @(Term sch (sch :/: sty))) []
-- | Decode an Avro value into a 'WithSchema'-wrapped value: the value is
-- first decoded as @Term sch (sch :/: sty)@ via 'AVal.fromAvro', then
-- converted with 'fromSchema'' and wrapped in 'WithSchema'.
-- NOTE(review): the clause matching 'AVal.Union' and the final clause are
-- both present, and the last right-hand side appears twice; this looks like
-- removed and added diff lines shown together.
instance ( FromSchema sch sty t
, A.FromAvro (Term sch (sch :/: sty)) )
=> A.FromAvro (WithSchema sch sty t) where
-- Union case: try the value inside the union first, and fall back (via
-- '<|>') to decoding the union value as a whole.
fromAvro entire@(AVal.Union _ _ v)
= -- remove first layer of union
WithSchema . fromSchema' @_ @_ @sch <$> AVal.fromAvro v
<|> -- try with the entire thing
WithSchema . fromSchema' @_ @_ @sch <$> AVal.fromAvro entire
-- Any other value: decode it directly.
fromAvro entire
= WithSchema . fromSchema' @_ @_ @sch <$> AVal.fromAvro entire
= WithSchema . fromSchema' @_ @_ @sch <$> AVal.fromAvro entire
-- | Encode a 'WithSchema'-wrapped value to Avro.
-- NOTE(review): the constraint list appears twice, once with
-- @KnownNat (IxOf sch sty)@ and once without; presumably the removed and
-- added versions of the instance head are shown together.
instance ( ToSchema sch sty t
, A.ToAvro (Term sch (sch :/: sty))
, KnownNat (IxOf sch sty) )
, A.ToAvro (Term sch (sch :/: sty)) )
=> A.ToAvro (WithSchema sch sty t) where
-- Given a union schema, write the converted term through
-- 'putIndexedValue', using the type-level index @IxOf sch sty@ and the
-- union's branch schemas @vs@.
toAvro (ASch.Union vs) (WithSchema v)
= putIndexedValue (fromInteger $ natVal (Proxy @(IxOf sch sty)))
vs
(toSchema' @_ @_ @sch v)
-- Any non-union schema is treated as impossible and raises an error that
-- shows the offending schema.
toAvro s _ = error ("this should never happen:\n" ++ show s)
-- | Collects Avro schemas and type names for a list of definitions @sch@,
-- interpreted relative to the full schema @r@.
--
-- * 'schemas' takes a list of type names and returns one 'ASch.Schema' per
--   definition.
-- * 'typeNames' returns the concatenated 'typeName'' of every definition.
class HasAvroSchemas (r :: Schema tn fn) (sch :: Schema tn fn) where
schemas :: Proxy r -> Proxy sch -> [ASch.TypeName] -> V.Vector ASch.Schema
typeNames :: Proxy r -> Proxy sch -> [ASch.TypeName]
-- Base case: no definitions, so no schemas and no names.
instance HasAvroSchemas r '[] where
schemas _ _ _ = V.empty
typeNames _ _ = []
-- Inductive case: handle the head definition @d@, then recurse on @ds@.
instance forall r d ds.
(HasAvroSchema' (Term r d), HasAvroSchemas r ds)
=> HasAvroSchemas r (d ': ds) where
schemas pr _ tys = V.cons thisSchema (schemas pr (Proxy @ds) tys)
-- The head's own name(s) are removed from @tys@ before it is passed to
-- 'schema'' (this uses the list-argument form of 'schema'').
where thisSchema = unTagged $ schema' @(Term r d) (tys \\ typeName' (Proxy @(Term r d)))
typeNames pr _ = typeName' (Proxy @(Term r d)) ++ typeNames pr (Proxy @ds)
-- | Type-level, zero-based position of the record or enum named @nm@ within
-- the schema @sch@: 0 when the head definition is a 'DRecord' or 'DEnum'
-- with that name, otherwise one more than the position in the tail.
-- No equation covers an empty schema.
type family IxOf (sch :: Schema tn fn) (sty :: tn) :: Nat where
IxOf ('DRecord nm fs ': rest) nm = 0
IxOf ('DEnum nm cs ': rest) nm = 0
IxOf (other ': rest) nm = 1 + IxOf rest nm
-- NOTE(review): this clause is detached from any instance head in this view;
-- it presumably is the added body of the @A.ToAvro (WithSchema sch sty t)@
-- instance. It converts the wrapped value with 'toSchema'' and delegates to
-- 'A.toAvro' with the schema it was given, unchanged.
toAvro sch (WithSchema v)
= A.toAvro sch (toSchema' @_ @_ @sch v)
-- HasAvroSchema instances
-- | Internal class producing the Avro schema of @x@, tagged with @x@.
-- NOTE(review): two signatures for 'schema'' are listed; presumably the
-- removed and the added version of the method.
class HasAvroSchema' x where
-- Type names associated with @x@ (a list, possibly empty).
typeName' :: Proxy x -> [ASch.TypeName]
-- List-argument form: the list of type names is passed explicitly.
schema' :: [ASch.TypeName] -> Tagged x ASch.Schema
-- 'State' form: the list of type names is threaded as state.
schema' :: State [ASch.TypeName] (Tagged x ASch.Schema)
-- | Deliberately unusable instance: the 'TypeError' constraint carries a
-- custom message telling users to go through 'WithSchema' instead of asking
-- for the schema of a 'Term' directly. The method body is only a placeholder
-- that calls 'error'.
instance TypeError ('Text "you should never use HasAvroSchema directly on Term, use WithSchema")
=> A.HasAvroSchema (Term sch t) where
schema = error "this should never happen"
-- | Public 'A.HasAvroSchema' instance for field values, defined through
-- 'schema'' starting from an empty list of type names.
-- NOTE(review): both forms of the method body are shown (direct application
-- to @[]@ and 'evalState' with @[]@); presumably old and new diff lines.
instance HasAvroSchema' (FieldValue sch t)
=> A.HasAvroSchema (FieldValue sch t) where
schema = schema' []
schema = evalState schema' []
-- | Schema of a record definition.
-- NOTE(review): two implementations of 'schema'' are interleaved below
-- (presumably the removed and added sides of the diff).
instance (KnownName name, HasAvroSchemaFields sch args)
=> HasAvroSchema' (Term sch ('DRecord name args)) where
typeName' _ = [nameTypeName (Proxy @name)]
-- List-argument form: if the record name is already in @visited@, emit a
-- named reference; otherwise emit the full record. @visited@ is passed on
-- to the fields unchanged.
schema' visited
= if recordName `elem` visited
then Tagged $ ASch.NamedType recordName
else Tagged $ ASch.Record recordName [] Nothing fields
where recordName = nameTypeName (Proxy @name)
fields = schemaF (Proxy @sch) (Proxy @args) visited
-- 'State' form: same check against the state, but on first use the record
-- name is pushed onto the state *before* the fields are computed, so the
-- field computations see this record in the state.
schema'
= do let recordName = nameTypeName (Proxy @name)
visited <- gets (recordName `elem`)
if visited
then pure $ Tagged $ ASch.NamedType recordName
else do modify (recordName :)
fields <- schemaF (Proxy @sch) (Proxy @args)
pure $ Tagged $ ASch.Record recordName [] Nothing fields
-- | Schema of an enumeration definition.
-- NOTE(review): two implementations of 'schema'' are interleaved below
-- (presumably the removed and added sides of the diff).
instance (KnownName name, HasAvroSchemaEnum choices)
=> HasAvroSchema' (Term sch ('DEnum name choices)) where
typeName' _ = [nameTypeName (Proxy @name)]
-- List-argument form: a named reference if the enum name is in @visited@,
-- otherwise the full enum built with 'ASch.mkEnum' from the choice names.
schema' visited
= if enumName `elem` visited
then Tagged $ ASch.NamedType enumName
else Tagged $ ASch.mkEnum enumName [] Nothing choicesNames
where enumName = nameTypeName (Proxy @name)
choicesNames = schemaE (Proxy @choices)
-- 'State' form: same decision against the state; on first use the enum
-- name is added to the state before the full enum is returned.
schema'
= do let enumName = nameTypeName (Proxy @name)
choicesNames = schemaE (Proxy @choices)
visited <- gets (enumName `elem`)
if visited
then pure $ Tagged $ ASch.NamedType enumName
else do modify (enumName :)
pure $ Tagged $ ASch.mkEnum enumName [] Nothing choicesNames
-- | A simple definition reuses the schema of the field value it wraps; only
-- the 'Tagged' phantom type changes (done with 'coerce').
-- NOTE(review): list-argument and 'State' forms of 'schema'' are both shown.
instance HasAvroSchema' (FieldValue sch t)
=> HasAvroSchema' (Term sch ('DSimple t)) where
typeName' _ = []
schema' visited = coerce $ schema' @(FieldValue sch t) visited
schema' = coerce <$> schema' @(FieldValue sch t)
-- | The null field type maps to 'ASch.Null'; the list of type names is
-- neither read nor changed.
-- NOTE(review): list-argument and 'State' forms of 'schema'' are both shown.
instance HasAvroSchema' (FieldValue sch 'TNull) where
typeName' _ = []
schema' _ = Tagged ASch.Null
schema' = pure $ Tagged ASch.Null
-- | A primitive field type reuses the existing 'A.HasAvroSchema' instance of
-- the underlying Haskell type @t@, re-tagged with 'coerce'.
-- NOTE(review): list-argument and 'State' forms of 'schema'' are both shown.
instance A.HasAvroSchema t
=> HasAvroSchema' (FieldValue sch ('TPrimitive t)) where
typeName' _ = []
schema' _ = coerce $ A.schema @t
schema' = pure $ coerce $ A.schema @t
-- | A reference to another schema type @t@ resolves to the schema of the
-- term it names (@sch :/: t@), re-tagged with 'coerce'.
-- NOTE(review): list-argument and 'State' forms of 'schema'' are both shown.
instance (HasAvroSchema' (Term sch (sch :/: t)))
=> HasAvroSchema' (FieldValue sch ('TSchematic t)) where
typeName' _ = []
schema' visited = coerce $ schema' @(Term sch (sch :/: t)) visited
schema' = coerce <$> schema' @(Term sch (sch :/: t))
-- | A union field type becomes an Avro union (via 'ASch.mkUnion') of the
-- schemas that 'schemaU' yields for its choices.
-- NOTE(review): list-argument and 'State' forms of 'schema'' are both shown.
instance forall sch choices.
HasAvroSchemaUnion (FieldValue sch) choices
=> HasAvroSchema' (FieldValue sch ('TUnion choices)) where
typeName' _ = []
schema' visited
= Tagged $ ASch.mkUnion $ schemaU (Proxy @(FieldValue sch)) (Proxy @choices) visited
schema' = do
schs <- schemaU (Proxy @(FieldValue sch)) (Proxy @choices)
pure $ Tagged $ ASch.mkUnion schs
-- | An optional field type becomes a two-branch Avro union: 'ASch.Null'
-- first, then the schema of the inner type.
-- NOTE(review): list-argument and 'State' forms of 'schema'' are both shown.
instance HasAvroSchema' (FieldValue sch t)
=> HasAvroSchema' (FieldValue sch ('TOption t)) where
typeName' _ = []
schema' visited
= Tagged $ ASch.mkUnion $ ASch.Null :| [iSchema]
where iSchema = unTagged $ schema' @(FieldValue sch t) visited
schema' = do
iSchema <- unTagged <$> schema' @(FieldValue sch t)
pure $ Tagged $ ASch.mkUnion $ ASch.Null :| [iSchema]
-- | A list field type becomes an 'ASch.Array' of the element schema.
-- NOTE(review): list-argument and 'State' forms of 'schema'' are both shown.
instance HasAvroSchema' (FieldValue sch t)
=> HasAvroSchema' (FieldValue sch ('TList t)) where
typeName' _ = []
schema' visited
= Tagged $ ASch.Array iSchema
where iSchema = unTagged $ schema' @(FieldValue sch t) visited
schema' = do
iSchema <- unTagged <$> schema' @(FieldValue sch t)
pure $ Tagged $ ASch.Array iSchema
-- These are the only two versions of Map supported by the library
-- | A map keyed by 'T.Text' becomes an 'ASch.Map' of the value schema; the
-- key type does not appear in the resulting schema.
-- NOTE(review): list-argument and 'State' forms of 'schema'' are both shown.
instance HasAvroSchema' (FieldValue sch v)
=> HasAvroSchema' (FieldValue sch ('TMap ('TPrimitive T.Text) v)) where
typeName' _ = []
schema' visited
= Tagged $ ASch.Map iSchema
where iSchema = unTagged $ schema' @(FieldValue sch v) visited
schema' = do
iSchema <- unTagged <$> schema' @(FieldValue sch v)
pure $ Tagged $ ASch.Map iSchema
-- | A map keyed by 'String' becomes an 'ASch.Map' of the value schema; the
-- key type does not appear in the resulting schema.
-- NOTE(review): list-argument and 'State' forms of 'schema'' are both shown.
instance HasAvroSchema' (FieldValue sch v)
=> HasAvroSchema' (FieldValue sch ('TMap ('TPrimitive String) v)) where
typeName' _ = []
schema' visited
= Tagged $ ASch.Map iSchema
where iSchema = unTagged $ schema' @(FieldValue sch v) visited
schema' = do
iSchema <- unTagged <$> schema' @(FieldValue sch v)
pure $ Tagged $ ASch.Map iSchema
-- | Computes the non-empty list of Avro schemas for the alternatives @xs@
-- of a union, where each alternative @x@ is interpreted as @f x@.
-- NOTE(review): two signatures for 'schemaU' are listed (explicit list
-- argument vs. 'State'); presumably the removed and the added version.
class HasAvroSchemaUnion (f :: k -> *) (xs :: [k]) where
schemaU :: Proxy f -> Proxy xs -> [ASch.TypeName] -> NonEmpty ASch.Schema
schemaU :: Proxy f -> Proxy xs -> State [ASch.TypeName] (NonEmpty ASch.Schema)
-- | Base case: a single alternative yields a one-element non-empty list
-- holding that alternative's schema.
-- NOTE(review): list-argument and 'State' forms of 'schemaU' are both shown.
instance HasAvroSchema' (f v) => HasAvroSchemaUnion f '[v] where
schemaU _ _ visited = vSchema :| []
where vSchema = unTagged (schema' @(f v) visited)
schemaU _ _ = do
vSchema <- unTagged <$> schema' @(f v)
pure $ vSchema :| []
-- | Inductive case (two or more alternatives): the schema of the first
-- alternative is placed in front of the schemas of the remaining ones.
-- NOTE(review): list-argument and 'State' forms of 'schemaU' are both shown.
-- In the 'State' form the head's schema is computed before the tail's, so
-- any state changes made by the head are visible to the tail.
instance (HasAvroSchema' (f x), HasAvroSchemaUnion f (y ': zs))
=> HasAvroSchemaUnion f (x ': y ': zs) where
schemaU p _ visited = xSchema :| NonEmptyList.toList yzsSchema
where xSchema = unTagged (schema' @(f x) visited)
yzsSchema = schemaU p (Proxy @(y ': zs)) visited
schemaU p _ = do
xSchema <- unTagged <$> schema' @(f x)
yzsSchema <- schemaU p (Proxy @(y ': zs))
pure $ xSchema :| NonEmptyList.toList yzsSchema
-- | Computes the list of 'ASch.Field's for the field definitions @fs@ of a
-- record in schema @sch@.
-- NOTE(review): two signatures for 'schemaF' are listed (explicit list
-- argument vs. 'State'); presumably the removed and the added version.
class HasAvroSchemaFields sch (fs :: [FieldDef tn fn]) where
schemaF :: Proxy sch -> Proxy fs -> [ASch.TypeName] -> [ASch.Field]
schemaF :: Proxy sch -> Proxy fs -> State [ASch.TypeName] [ASch.Field]
-- | Base case: no field definitions, so no Avro fields.
-- NOTE(review): list-argument and 'State' forms of 'schemaF' are both shown.
instance HasAvroSchemaFields sch '[] where
schemaF _ _ _ = []
schemaF _ _ = pure []
-- | Inductive case: build the 'ASch.Field' for the first field definition
-- (named via 'nameText', typed via 'schema'' of its field value) and put it
-- in front of the fields computed for the rest.
-- NOTE(review): list-argument and 'State' forms of 'schemaF' are both shown.
instance (KnownName name, HasAvroSchema' (FieldValue sch t), HasAvroSchemaFields sch fs)
=> HasAvroSchemaFields sch ('FieldDef name t ': fs) where
-- List-argument form: the same @visited@ list is handed to this field and
-- to the remaining fields.
schemaF psch _ visited = schemaThis : schemaF psch (Proxy @fs) visited
where fieldName = nameText (Proxy @name)
schemaT = unTagged $ schema' @(FieldValue sch t) visited
schemaThis = ASch.Field fieldName [] Nothing Nothing schemaT Nothing
-- 'State' form: this field's schema is computed first, then the rest, so
-- state changes made while computing this field are seen by later fields.
schemaF psch _ = do
let fieldName = nameText (Proxy @name)
schemaT <- unTagged <$> schema' @(FieldValue sch t)
let schemaThis = ASch.Field fieldName [] Nothing Nothing schemaT Nothing
rest <- schemaF psch (Proxy @fs)
pure $ schemaThis : rest
class HasAvroSchemaEnum (fs :: [ChoiceDef fn]) where
schemaE :: Proxy fs -> [T.Text]
@ -317,7 +287,8 @@ instance (KnownName name, ToAvroFields sch args, HasAvroSchemaFields sch args)
= A.record s $ toAvroF fields
-- if we don't have a record, fall back to the one from schema
toAvro _ (TRecord fields)
= A.record (unTagged $ schema' @(Term sch ('DRecord name args)) []) $ toAvroF fields
= A.record sch (toAvroF fields)
where sch = unTagged $ evalState (schema' @(Term sch ('DRecord name args))) []
instance (KnownName name, ToAvroEnum choices, HasAvroSchemaEnum choices)
=> A.ToAvro (Term sch ('DEnum name choices)) where
toAvro ASch.Enum { ASch.symbols = ss } (TEnum n)

View File

@ -1,23 +1,23 @@
[ { "type": "enum",
"name": "gender",
"symbols" : ["male", "female", "nb"]
}
, { "type": "record"
, "name": "address"
, "fields": [
{"name": "postcode", "type": "string"},
{"name": "country", "type": "string"}
]
}
,{ "type": "record",
"name": "person",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "lastName", "type": "string"},
{"name": "age", "type": ["long", "null"]},
{"name": "gender", "type": ["gender", "null"]},
{"name": "address", "type": "address"},
{"name": "lucky_numbers", "type": { "type": "array", "items": "long" } }
]
}
]
{ "type": "record",
"name": "person",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "lastName", "type": "string"},
{"name": "age", "type": ["long", "null"]},
{"name": "gender", "type": [
{ "type": "enum",
"name": "gender",
"symbols" : ["male", "female", "nb"]
} , "null"]},
{"name": "address", "type":
{ "type": "record"
, "name": "address"
, "fields": [
{"name": "postcode", "type": "string"},
{"name": "country", "type": "string"}
]
}
},
{"name": "lucky_numbers", "type": { "type": "array", "items": "long" } }
]
}

View File

@ -66,13 +66,22 @@ newtype ViaToAvroTypeRef (ref :: TypeRef snm) t
= ViaToAvroTypeRef { unViaToAvroTypeRef :: t }
-- | gRPC input codec for the unit type under 'AvroRPC'.
-- NOTE(review): two definitions of 'encodeInput' are shown; presumably the
-- removed one (producing 'mempty', i.e. no bytes at all) and the added one
-- (delegating to 'encodeEmpty' with the compression argument).
instance GRPCInput AvroRPC () where
encodeInput _ _ () = mempty
encodeInput _ c () = encodeEmpty c
-- Decoding ignores both arguments and always yields @Right ()@.
decodeInput _ _ = runGetIncremental $ pure $ Right ()
-- | gRPC output codec for the unit type under 'AvroRPC'.
-- NOTE(review): two definitions of 'encodeOutput' are shown; presumably the
-- removed one (producing 'mempty', i.e. no bytes at all) and the added one
-- (delegating to 'encodeEmpty' with the compression argument).
instance GRPCOutput AvroRPC () where
encodeOutput _ _ () = mempty
encodeOutput _ c () = encodeEmpty c
-- Decoding ignores both arguments and always yields @Right ()@.
decodeOutput _ _ = runGetIncremental $ pure $ Right ()
-- | Build the wire representation of a message whose payload is empty.
--
-- The output is, in order: a single flag byte (1 when
-- '_compressionByteSet' holds for the given compression, 0 otherwise), the
-- payload length written with 'putWord32be', and the payload itself. The
-- payload is whatever '_compressionFunction' returns for the empty string,
-- so it is not necessarily zero bytes long.
encodeEmpty :: Compression -> Builder
encodeEmpty compression =
  mconcat [flagByte, lengthPrefix, fromByteString payload]
  where
    -- Result of running the compression function over empty input.
    payload = _compressionFunction compression ""
    -- Marks whether the compression byte is set for this compression.
    flagByte
      | _compressionByteSet compression = singleton 1
      | otherwise = singleton 0
    -- Length of the (possibly compressed) payload.
    lengthPrefix = putWord32be (fromIntegral (ByteString.length payload))
instance forall (sch :: Schema') (sty :: Symbol) (i :: Type).
( HasAvroSchema (WithSchema sch sty i)
, FromAvro (WithSchema sch sty i) )