Distributed tracing (#206)

This commit is contained in:
Alejandro Serrano 2020-07-20 12:30:27 +02:00 committed by GitHub
parent 2e60f4d1e3
commit 09820afca1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 565 additions and 135 deletions

View File

@ -17,7 +17,8 @@ packages: compendium-client/
grpc/client/
grpc/server/
graphql/
instrumentation/prometheus
instrumentation/prometheus/
instrumentation/tracing/
source-repository-package
type: git

View File

@ -30,11 +30,13 @@ library
build-depends:
base >=4.12 && <5
, conduit >=1.3.2 && <1.4
, http-types >=0.12 && <0.13
, mtl >=2.2 && <2.3
, mu-schema ==0.3.*
, sop-core >=0.5 && <0.6
, template-haskell >=2.14 && <2.16
, text >=1.2 && <1.3
, wai >=3.2 && <4
hs-source-dirs: src
default-language: Haskell2010

View File

@ -28,10 +28,11 @@ module Mu.Rpc (
) where
import Data.Kind
import Data.Text (Text)
import qualified Data.Text as T
import Data.Text (Text)
import qualified Data.Text as T
import GHC.TypeLits
import qualified Language.Haskell.TH as TH
import qualified Language.Haskell.TH as TH
import Network.HTTP.Types.Header
import Type.Reflection
import Mu.Schema
@ -136,6 +137,7 @@ data RpcInfo i
| RpcInfo { packageInfo :: Package Text Text Text TyInfo
, serviceInfo :: Service Text Text Text TyInfo
, methodInfo :: Method Text Text Text TyInfo
, headers :: RequestHeaders
, extraInfo :: i
}
@ -148,13 +150,13 @@ data TyInfo
instance Show (RpcInfo i) where
show NoRpcInfo
= "<no info>"
show (RpcInfo (Package Nothing _) (Service s _) (Method m _ _) _)
show (RpcInfo (Package Nothing _) (Service s _) (Method m _ _) _ _)
= T.unpack (s <> ":" <> m)
show (RpcInfo (Package (Just p) _) (Service s _) (Method m _ _) _)
show (RpcInfo (Package (Just p) _) (Service s _) (Method m _ _) _ _)
= T.unpack (p <> ":" <> s <> ":" <> m)
class ReflectRpcInfo (p :: Package') (s :: Service') (m :: Method') where
reflectRpcInfo :: Proxy p -> Proxy s -> Proxy m -> i -> RpcInfo i
reflectRpcInfo :: Proxy p -> Proxy s -> Proxy m -> RequestHeaders -> i -> RpcInfo i
class ReflectService (s :: Service') where
reflectService :: Proxy s -> Service Text Text Text TyInfo
class ReflectMethod (m :: Method') where
@ -199,10 +201,10 @@ instance (ReflectArg m, ReflectArgs ms)
instance (KnownMaySymbol pname, ReflectServices ss, ReflectService s, ReflectMethod m)
=> ReflectRpcInfo ('Package pname ss) s m where
reflectRpcInfo _ ps pm
reflectRpcInfo _ ps pm req extra
= RpcInfo (Package (maySymbolVal (Proxy @pname))
(reflectServices (Proxy @ss)))
(reflectService ps) (reflectMethod pm)
(reflectService ps) (reflectMethod pm) req extra
instance (KnownSymbol sname, ReflectMethods ms)
=> ReflectService ('Service sname ms) where

View File

@ -1,5 +1,5 @@
let
haskellNix = import (builtins.fetchTarball https://github.com/input-output-hk/haskell.nix/archive/9d491b5.tar.gz) {};
haskellNix = import (builtins.fetchTarball https://github.com/input-output-hk/haskell.nix/archive/d3edb6e.tar.gz) {};
nixpkgsSrc = haskellNix.sources.nixpkgs-2003;
nixpkgsArgs = haskellNix.nixpkgsArgs;
in
@ -31,4 +31,5 @@ in {
mu-protobuf = hnPkgs.mu-protobuf.components.all;
mu-rpc = hnPkgs.mu-rpc.components.library;
mu-schema = hnPkgs.mu-schema.components.library;
mu-tracing = hnPkgs.mu-tracing.components.library;
}

View File

@ -31,10 +31,13 @@ executable health-server
, mu-protobuf >=0.4.0
, mu-rpc >=0.4.0
, mu-schema >=0.3.0
, mu-tracing >=0.4.0
, prometheus-client >= 1 && <2
, stm >=2.5 && <3
, stm-conduit >=4 && <5
, stm-containers >=1.1 && <2
, text >=1.2 && <2
, tracing-control >=0.0.6
, wai >=3.2 && <4
, warp >=3.3 && <4

View File

@ -1,6 +1,8 @@
{-# language DataKinds #-}
{-# language FlexibleContexts #-}
{-# language OverloadedStrings #-}
{-# language PartialTypeSignatures #-}
{-# language PolyKinds #-}
{-# language TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-partial-type-signatures #-}
@ -9,6 +11,7 @@ module Main where
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad.IO.Class
import Control.Monad.Trace
import Data.Conduit
import qualified Data.Conduit.Combinators as C
import Data.Conduit.TMChan
@ -16,27 +19,40 @@ import Data.Maybe (fromMaybe)
import Data.Proxy
import qualified Data.Text as T
import DeferredFolds.UnfoldlM
import Monitor.Tracing.Zipkin (Endpoint (..))
import Network.Wai.Handler.Warp
import Prometheus
import qualified StmContainers.Map as M
import Mu.GraphQL.Server
import Mu.GRpc.Server
import Mu.Instrumentation.Prometheus
import Mu.Instrumentation.Tracing
import Mu.Server
import Definition
main :: IO ()
main = do
-- Initialize prometheus
met <- initPrometheus "health"
-- Initialize zipkin
zpk <- newZipkin defaultZipkinSettings
{ settingsPublishPeriod = Just 1
, settingsEndpoint = Just $ Endpoint (Just "me") Nothing Nothing Nothing }
let rootInfo = MuTracing alwaysSampled "health-check"
-- Initialize app
m <- M.newIO
upd <- newTBMChanIO 100
met <- initPrometheus "health"
-- Put together the server
let s = zipkin rootInfo $ prometheus met $ server m upd
-- Run the app
putStrLn "running health check application"
let s = prometheus met (server m upd)
runConcurrently $ (\_ _ _ -> ())
<$> Concurrently (runner 50051 (gRpcApp msgProtoBuf s))
<*> Concurrently (runner 50052 (gRpcApp msgAvro s))
<*> Concurrently (runner 50053 (graphQLAppQuery s (Proxy @"HealthCheckServiceFS2")))
<$> Concurrently (runner 50051 (gRpcAppTrans msgProtoBuf (runZipkin zpk) s))
<*> Concurrently (runner 50052 (gRpcAppTrans msgAvro (runZipkin zpk) s))
<*> Concurrently (runner 50053 (graphQLAppTransQuery (runZipkin zpk) s
(Proxy @"HealthCheckServiceFS2")))
where runner p app = run p (prometheusWai ["metrics"] app)
-- Server implementation
@ -45,7 +61,9 @@ main = do
type StatusMap = M.Map T.Text T.Text
type StatusUpdates = TBMChan HealthStatusMsg
server :: StatusMap -> StatusUpdates -> ServerIO info HealthCheckService _
server :: (MonadServer m, MonadTrace m)
=> StatusMap -> StatusUpdates
-> ServerT '[] info HealthCheckService m _
server m upd
= wrapServer (\info h -> liftIO (print info) >> h) $
singleService ( method @"setStatus" $ setStatus_ m upd
@ -55,30 +73,36 @@ server m upd
, method @"cleanAll" $ cleanAll_ m
, method @"watch" $ watch_ upd)
setStatus_ :: StatusMap -> StatusUpdates -> HealthStatusMsg -> ServerErrorIO ()
setStatus_ :: (MonadServer m, MonadTrace m)
=> StatusMap -> StatusUpdates -> HealthStatusMsg
-> m ()
setStatus_ m upd
s@(HealthStatusMsg (Just (HealthCheckMsg nm)) (Just (ServerStatusMsg ss)))
= alwaysOk $ do
= childSpan "setStatus" $ alwaysOk $ do
putStr "setStatus: " >> print (nm, ss)
atomically $ do
M.insert ss nm m
writeTBMChan upd s
setStatus_ _ _ _ = serverError (ServerError Invalid "name or status missing")
checkH_ :: StatusMap -> HealthCheckMsg -> ServerErrorIO ServerStatusMsg
checkH_ :: (MonadServer m, MonadTrace m)
=> StatusMap -> HealthCheckMsg
-> m ServerStatusMsg
checkH_ _ (HealthCheckMsg "") = serverError (ServerError Invalid "no server name given")
checkH_ m (HealthCheckMsg nm) = alwaysOk $ do
putStr "check: " >> print nm
ss <- atomically $ M.lookup nm m
pure $ ServerStatusMsg (fromMaybe "" ss)
clearStatus_ :: StatusMap -> HealthCheckMsg -> ServerErrorIO ()
clearStatus_ :: (MonadServer m, MonadTrace m)
=> StatusMap -> HealthCheckMsg -> m ()
clearStatus_ _ (HealthCheckMsg "") = serverError (ServerError Invalid "no server name given")
clearStatus_ m (HealthCheckMsg nm) = alwaysOk $ do
putStr "clearStatus: " >> print nm
atomically $ M.delete nm m
checkAll_ :: StatusMap -> ServerErrorIO AllStatusMsg
checkAll_ :: (MonadServer m, MonadTrace m)
=> StatusMap -> m AllStatusMsg
checkAll_ m = alwaysOk $ do
putStrLn "checkAll"
AllStatusMsg <$> atomically (consumeValues kvToStatus (M.unfoldlM m))
@ -87,15 +111,17 @@ checkAll_ m = alwaysOk $ do
consumeValues f = foldlM' (\xs (x,y) -> pure (f x y:xs)) []
kvToStatus k v = HealthStatusMsg (Just (HealthCheckMsg k)) (Just (ServerStatusMsg v))
cleanAll_ :: StatusMap -> ServerErrorIO ()
cleanAll_ :: (MonadServer m, MonadTrace m)
=> StatusMap -> m ()
cleanAll_ m = alwaysOk $ do
putStrLn "cleanAll"
atomically $ M.reset m
watch_ :: StatusUpdates
watch_ :: (MonadServer m, MonadTrace m)
=> StatusUpdates
-> HealthCheckMsg
-> ConduitT ServerStatusMsg Void ServerErrorIO ()
-> ServerErrorIO ()
-> ConduitT ServerStatusMsg Void m ()
-> m ()
watch_ upd hcm@(HealthCheckMsg nm) sink = do
alwaysOk (putStr "watch: " >> print nm)
runConduit $ sourceTBMChan upd
@ -109,3 +135,5 @@ watch_ upd hcm@(HealthCheckMsg nm) sink = do
Just (Just y) -> yield y >> catMaybesC
Just Nothing -> catMaybesC
Nothing -> pure ()
instance MonadMonitor m => MonadMonitor (TraceT m)

View File

@ -31,6 +31,7 @@ import Control.Monad.Except (MonadError, runExceptT)
import Control.Monad.Writer
import qualified Data.Aeson as Aeson
import qualified Data.Aeson.Types as Aeson
import Data.Coerce (coerce)
import Data.Conduit
import Data.Conduit.Combinators (sinkList, yieldMany)
import Data.Conduit.TQueue
@ -39,8 +40,8 @@ import Data.Maybe
import qualified Data.Text as T
import GHC.TypeLits
import qualified Language.GraphQL.Draft.Syntax as GQL
import Network.HTTP.Types.Header
import Data.Coerce (coerce)
import Mu.GraphQL.Query.Definition
import qualified Mu.GraphQL.Query.Introspection as Intro
import Mu.GraphQL.Query.Parse
@ -57,15 +58,16 @@ type GraphQLApp p qr mut sub m chn hs
runPipeline
:: forall qr mut sub p m chn hs. GraphQLApp p qr mut sub m chn hs
=> (forall a. m a -> ServerErrorIO a)
-> RequestHeaders
-> ServerT chn GQL.Field p m hs
-> Proxy qr -> Proxy mut -> Proxy sub
-> Maybe T.Text -> VariableMapC -> GQL.ExecutableDocument
-> IO Aeson.Value
runPipeline f svr _ _ _ opName vmap doc
runPipeline f req svr _ _ _ opName vmap doc
= case parseDoc @qr @mut @sub opName vmap doc of
Left e -> pure $ singleErrValue e
Right (d :: Document p qr mut sub) -> do
(data_, errors) <- runWriterT (runDocument f svr d)
(data_, errors) <- runWriterT (runDocument f req svr d)
case errors of
[] -> pure $ Aeson.object [ ("data", data_) ]
_ -> pure $ Aeson.object [ ("data", data_), ("errors", Aeson.listValue errValue errors) ]
@ -73,17 +75,18 @@ runPipeline f svr _ _ _ opName vmap doc
runSubscriptionPipeline
:: forall qr mut sub p m chn hs. GraphQLApp p qr mut sub m chn hs
=> (forall a. m a -> ServerErrorIO a)
-> RequestHeaders
-> ServerT chn GQL.Field p m hs
-> Proxy qr -> Proxy mut -> Proxy sub
-> Maybe T.Text -> VariableMapC -> GQL.ExecutableDocument
-> ConduitT Aeson.Value Void IO ()
-> IO ()
runSubscriptionPipeline f svr _ _ _ opName vmap doc sink
runSubscriptionPipeline f req svr _ _ _ opName vmap doc sink
= case parseDoc @qr @mut @sub opName vmap doc of
Left e
-> yieldSingleError e sink
Right (d :: Document p qr mut sub)
-> runDocumentSubscription f svr d sink
-> runDocumentSubscription f req svr d sink
singleErrValue :: T.Text -> Aeson.Value
singleErrValue e
@ -116,11 +119,13 @@ class RunDocument (p :: Package')
m chn hs where
runDocument ::
(forall a. m a -> ServerErrorIO a)
-> RequestHeaders
-> ServerT chn GQL.Field p m hs
-> Document p qr mut sub
-> WriterT [GraphQLError] IO Aeson.Value
runDocumentSubscription ::
(forall a. m a -> ServerErrorIO a)
-> RequestHeaders
-> ServerT chn GQL.Field p m hs
-> Document p qr mut sub
-> ConduitT Aeson.Value Void IO ()
@ -139,18 +144,18 @@ instance
, MappingRight chn sub ~ ()
, Intro.Introspect p ('Just qr) ('Just mut) ('Just sub)
) => RunDocument p ('Just qr) ('Just mut) ('Just sub) m chn hs where
runDocument f svr d
runDocument f req svr d
= let i = Intro.introspect (Proxy @p) (Proxy @('Just qr)) (Proxy @('Just mut)) (Proxy @('Just sub))
in case d of
QueryDoc q
-> runQuery f i svr [] () q
-> runQuery f req i svr [] () q
MutationDoc q
-> runQuery f i svr [] () q
-> runQuery f req i svr [] () q
SubscriptionDoc _
-> pure $ singleErrValue "cannot execute subscriptions in this wire"
runDocumentSubscription f svr (SubscriptionDoc d)
= runSubscription f svr [] () d
runDocumentSubscription f svr d = yieldDocument f svr d
runDocumentSubscription f req svr (SubscriptionDoc d)
= runSubscription f req svr [] () d
runDocumentSubscription f req svr d = yieldDocument f req svr d
instance
( p ~ 'Package pname ss
@ -162,13 +167,13 @@ instance
, MappingRight chn mut ~ ()
, Intro.Introspect p ('Just qr) ('Just mut) 'Nothing
) => RunDocument p ('Just qr) ('Just mut) 'Nothing m chn hs where
runDocument f svr d
runDocument f req svr d
= let i = Intro.introspect (Proxy @p) (Proxy @('Just qr)) (Proxy @('Just mut)) (Proxy @'Nothing)
in case d of
QueryDoc q
-> runQuery f i svr [] () q
-> runQuery f req i svr [] () q
MutationDoc q
-> runQuery f i svr [] () q
-> runQuery f req i svr [] () q
runDocumentSubscription = yieldDocument
instance
@ -181,16 +186,16 @@ instance
, MappingRight chn sub ~ ()
, Intro.Introspect p ('Just qr) 'Nothing ('Just sub)
) => RunDocument p ('Just qr) 'Nothing ('Just sub) m chn hs where
runDocument f svr d
runDocument f req svr d
= let i = Intro.introspect (Proxy @p) (Proxy @('Just qr)) (Proxy @'Nothing) (Proxy @('Just sub))
in case d of
QueryDoc q
-> runQuery f i svr [] () q
-> runQuery f req i svr [] () q
SubscriptionDoc _
-> pure $ singleErrValue "cannot execute subscriptions in this wire"
runDocumentSubscription f svr (SubscriptionDoc d)
= runSubscription f svr [] () d
runDocumentSubscription f svr d = yieldDocument f svr d
runDocumentSubscription f req svr (SubscriptionDoc d)
= runSubscription f req svr [] () d
runDocumentSubscription f req svr d = yieldDocument f req svr d
instance
( p ~ 'Package pname ss
@ -199,11 +204,11 @@ instance
, MappingRight chn qr ~ ()
, Intro.Introspect p ('Just qr) 'Nothing 'Nothing
) => RunDocument p ('Just qr) 'Nothing 'Nothing m chn hs where
runDocument f svr d
runDocument f req svr d
= let i = Intro.introspect (Proxy @p) (Proxy @('Just qr)) (Proxy @'Nothing) (Proxy @'Nothing)
in case d of
QueryDoc q
-> runQuery f i svr [] () q
-> runQuery f req i svr [] () q
runDocumentSubscription = yieldDocument
instance
@ -216,12 +221,13 @@ yieldDocument ::
forall p qr mut sub m chn hs.
RunDocument p qr mut sub m chn hs
=> (forall a. m a -> ServerErrorIO a)
-> RequestHeaders
-> ServerT chn GQL.Field p m hs
-> Document p qr mut sub
-> ConduitT Aeson.Value Void IO ()
-> IO ()
yieldDocument f svr doc sink = do
(data_, errors) <- runWriterT (runDocument @p @qr @mut @sub @m @chn @hs f svr doc)
yieldDocument f req svr doc sink = do
(data_, errors) <- runWriterT (runDocument @p @qr @mut @sub @m @chn @hs f req svr doc)
let (val :: Aeson.Value)
= case errors of
[] -> Aeson.object [ ("data", data_) ]
@ -235,12 +241,13 @@ runQuery
, s ~ 'Service sname ms
, inh ~ MappingRight chn sname )
=> (forall a. m a -> ServerErrorIO a)
-> RequestHeaders
-> Intro.Schema -> ServerT chn GQL.Field p m hs
-> [T.Text]
-> inh
-> ServiceQuery p s
-> WriterT [GraphQLError] IO Aeson.Value
runQuery f sch whole@(Services ss) path = runQueryFindHandler f sch whole path ss
runQuery f req sch whole@(Services ss) path = runQueryFindHandler f req sch whole path ss
runSubscription
:: forall m p s pname ss hs sname ms chn inh.
@ -249,14 +256,15 @@ runSubscription
, s ~ 'Service sname ms
, inh ~ MappingRight chn sname )
=> (forall a. m a -> ServerErrorIO a)
-> RequestHeaders
-> ServerT chn GQL.Field p m hs
-> [T.Text]
-> inh
-> OneMethodQuery p s
-> ConduitT Aeson.Value Void IO ()
-> IO ()
runSubscription f whole@(Services ss) path
= runSubscriptionFindHandler f whole path ss
runSubscription f req whole@(Services ss) path
= runSubscriptionFindHandler f req whole path ss
class RunQueryFindHandler m p whole chn ss s hs where
runQueryFindHandler
@ -264,6 +272,7 @@ class RunQueryFindHandler m p whole chn ss s hs where
, s ~ 'Service sname ms
, inh ~ MappingRight chn sname )
=> (forall a. m a -> ServerErrorIO a)
-> RequestHeaders
-> Intro.Schema -> ServerT chn GQL.Field p m whole
-> [T.Text]
-> ServicesT chn GQL.Field ss m hs
@ -275,6 +284,7 @@ class RunQueryFindHandler m p whole chn ss s hs where
, s ~ 'Service sname ms
, inh ~ MappingRight chn sname )
=> (forall a. m a -> ServerErrorIO a)
-> RequestHeaders
-> ServerT chn GQL.Field p m whole
-> [T.Text]
-> ServicesT chn GQL.Field ss m hs
@ -290,21 +300,21 @@ instance TypeError ('Text "Could not find handler for " ':<>: 'ShowType s)
instance {-# OVERLAPPABLE #-}
RunQueryFindHandler m p whole chn ss s hs
=> RunQueryFindHandler m p whole chn (other ': ss) s (h ': hs) where
runQueryFindHandler f sch whole path (_ :<&>: that)
= runQueryFindHandler f sch whole path that
runSubscriptionFindHandler f whole path (_ :<&>: that)
= runSubscriptionFindHandler f whole path that
runQueryFindHandler f req sch whole path (_ :<&>: that)
= runQueryFindHandler f req sch whole path that
runSubscriptionFindHandler f req whole path (_ :<&>: that)
= runSubscriptionFindHandler f req whole path that
instance {-# OVERLAPS #-}
( s ~ 'Service sname ms, KnownName sname
, RunMethod m p whole chn s ms h )
=> RunQueryFindHandler m p whole chn (s ': ss) s (h ': hs) where
runQueryFindHandler f sch whole path (this :<&>: _) inh queries
runQueryFindHandler f req sch whole path (this :<&>: _) inh queries
= Aeson.object . catMaybes <$> mapM runOneQuery queries
where
-- if we include the signature we have to write
-- an explicit type signature for 'runQueryFindHandler'
runOneQuery (OneMethodQuery nm args)
= runMethod f whole (Proxy @s) path nm inh this args
= runMethod f req whole (Proxy @s) path nm inh this args
-- handle __typename
runOneQuery (TypeNameQuery nm)
= let realName = fromMaybe "__typename" nm
@ -325,13 +335,13 @@ instance {-# OVERLAPS #-}
path]
pure $ Just (realName, Aeson.Null)
-- subscriptions should only have one element
runSubscriptionFindHandler f whole path (this :<&>: _) inh (OneMethodQuery nm args) sink
= runMethodSubscription f whole (Proxy @s) path nm inh this args sink
runSubscriptionFindHandler _ _ _ _ _ (TypeNameQuery nm) sink
runSubscriptionFindHandler f req whole path (this :<&>: _) inh (OneMethodQuery nm args) sink
= runMethodSubscription f req whole (Proxy @s) path nm inh this args sink
runSubscriptionFindHandler _ _ _ _ _ _ (TypeNameQuery nm) sink
= let realName = fromMaybe "__typename" nm
o = Aeson.object [(realName, Aeson.String $ T.pack $ nameVal (Proxy @sname))]
in runConduit $ yieldMany ([o] :: [Aeson.Value]) .| sink
runSubscriptionFindHandler _ _ _ _ _ _ sink
runSubscriptionFindHandler _ _ _ _ _ _ _ sink
= runConduit $ yieldMany
([singleErrValue "__schema and __type are not supported in subscriptions"]
:: [Aeson.Value])
@ -343,6 +353,7 @@ class RunMethod m p whole chn s ms hs where
, s ~ 'Service sname allMs
, inh ~ MappingRight chn sname )
=> (forall a. m a -> ServerErrorIO a)
-> RequestHeaders
-> ServerT chn GQL.Field p m whole
-> Proxy s -> [T.Text] -> Maybe T.Text -> inh
-> HandlersT chn GQL.Field inh ms m hs
@ -353,6 +364,7 @@ class RunMethod m p whole chn s ms hs where
, s ~ 'Service sname allMs
, inh ~ MappingRight chn sname )
=> (forall a. m a -> ServerErrorIO a)
-> RequestHeaders
-> ServerT chn GQL.Field p m whole
-> Proxy s -> [T.Text] -> Maybe T.Text -> inh
-> HandlersT chn GQL.Field inh ms m hs
@ -369,27 +381,25 @@ instance ( RunMethod m p whole chn s ms hs
, ReflectRpcInfo p s ('Method mname args r) )
=> RunMethod m p whole chn s ('Method mname args r ': ms) (h ': hs) where
-- handle normal methods
runMethod f whole _ path nm inh (h :<||>: _) (Z (ChosenMethodQuery fld args ret))
= ((realName ,) <$>)
<$> runHandler f whole (path ++ [realName]) (h rpcInfo inh) args ret
runMethod f req whole _ path nm inh (h :<||>: _) (Z (ChosenMethodQuery fld args ret))
= ((realName ,) <$>) <$> runHandler f req whole (path ++ [realName]) (h rpcInfo inh) args ret
where realName = fromMaybe (T.pack $ nameVal (Proxy @mname)) nm
rpcInfo = reflectRpcInfo (Proxy @p) (Proxy @s)
(Proxy @('Method mname args r)) fld
runMethod f whole p path nm inh (_ :<||>: r) (S cont)
= runMethod f whole p path nm inh r cont
rpcInfo = reflectRpcInfo (Proxy @p) (Proxy @s) (Proxy @('Method mname args r)) req fld
runMethod f req whole p path nm inh (_ :<||>: r) (S cont)
= runMethod f req whole p path nm inh r cont
-- handle subscriptions
runMethodSubscription f whole _ path nm inh (h :<||>: _) (Z (ChosenMethodQuery fld args ret)) sink
= runHandlerSubscription f whole (path ++ [realName]) (h rpcInfo inh) args ret sink
runMethodSubscription f req whole _ path nm inh (h :<||>: _) (Z (ChosenMethodQuery fld args ret)) sink
= runHandlerSubscription f req whole (path ++ [realName]) (h rpcInfo inh) args ret sink
where realName = fromMaybe (T.pack $ nameVal (Proxy @mname)) nm
rpcInfo = reflectRpcInfo (Proxy @p) (Proxy @s)
(Proxy @('Method mname args r)) fld
runMethodSubscription f whole p path nm inh (_ :<||>: r) (S cont) sink
= runMethodSubscription f whole p path nm inh r cont sink
rpcInfo = reflectRpcInfo (Proxy @p) (Proxy @s) (Proxy @('Method mname args r)) req fld
runMethodSubscription f req whole p path nm inh (_ :<||>: r) (S cont) sink
= runMethodSubscription f req whole p path nm inh r cont sink
class Handles chn args r m h
=> RunHandler m p whole chn args r h where
runHandler
:: (forall a. m a -> ServerErrorIO a)
-> RequestHeaders
-> ServerT chn GQL.Field p m whole
-> [T.Text]
-> h
@ -398,6 +408,7 @@ class Handles chn args r m h
-> WriterT [GraphQLError] IO (Maybe Aeson.Value)
runHandlerSubscription
:: (forall a. m a -> ServerErrorIO a)
-> RequestHeaders
-> ServerT chn GQL.Field p m whole
-> [T.Text]
-> h
@ -408,45 +419,45 @@ class Handles chn args r m h
instance (ArgumentConversion chn ref t, RunHandler m p whole chn rest r h)
=> RunHandler m p whole chn ('ArgSingle aname ref ': rest) r (t -> h) where
runHandler f whole path h (ArgumentValue one :* rest)
= runHandler f whole path (h (convertArg (Proxy @chn) one)) rest
runHandlerSubscription f whole path h (ArgumentValue one :* rest)
= runHandlerSubscription f whole path (h (convertArg (Proxy @chn) one)) rest
runHandler f req whole path h (ArgumentValue one :* rest)
= runHandler f req whole path (h (convertArg (Proxy @chn) one)) rest
runHandlerSubscription f req whole path h (ArgumentValue one :* rest)
= runHandlerSubscription f req whole path (h (convertArg (Proxy @chn) one)) rest
instance ( MonadError ServerError m
, FromRef chn ref t
, ArgumentConversion chn ('ListRef ref) [t]
, RunHandler m p whole chn rest r h )
=> RunHandler m p whole chn ('ArgStream aname ref ': rest) r (ConduitT () t m () -> h) where
runHandler f whole path h (ArgumentStream lst :* rest)
runHandler f req whole path h (ArgumentStream lst :* rest)
= let converted :: [t] = convertArg (Proxy @chn) lst
in runHandler f whole path (h (yieldMany converted)) rest
runHandlerSubscription f whole path h (ArgumentStream lst :* rest) sink
in runHandler f req whole path (h (yieldMany converted)) rest
runHandlerSubscription f req whole path h (ArgumentStream lst :* rest) sink
= let converted :: [t] = convertArg (Proxy @chn) lst
in runHandlerSubscription f whole path (h (yieldMany converted)) rest sink
in runHandlerSubscription f req whole path (h (yieldMany converted)) rest sink
instance (MonadError ServerError m)
=> RunHandler m p whole chn '[] 'RetNothing (m ()) where
runHandler f _ path h Nil _ = do
runHandler f _req _ path h Nil _ = do
res <- liftIO $ runExceptT (f h)
case res of
Right _ -> pure $ Just Aeson.Null
Left e -> tell [GraphQLError e path] >> pure Nothing
runHandlerSubscription f _ path h Nil _ sink = do
runHandlerSubscription f _req _ path h Nil _ sink = do
res <- liftIO $ runExceptT (f h)
case res of
Right _ -> runConduit $ yieldMany ([] :: [Aeson.Value]) .| sink
Left e -> yieldError e path sink
instance (MonadError ServerError m, ResultConversion m p whole chn r l)
=> RunHandler m p whole chn '[] ('RetSingle r) (m l) where
runHandler f whole path h Nil (RSingle q) = do
runHandler f req whole path h Nil (RSingle q) = do
res <- liftIO $ runExceptT (f h)
case res of
Right v -> convertResult f whole path q v
Right v -> convertResult f req whole path q v
Left e -> tell [GraphQLError e path] >> pure Nothing
runHandlerSubscription f whole path h Nil (RSingle q) sink = do
runHandlerSubscription f req whole path h Nil (RSingle q) sink = do
res <- liftIO $ runExceptT (f h)
val <- case res of
Right v -> do
(data_, errors) <- runWriterT (convertResult f whole path q v)
(data_, errors) <- runWriterT (convertResult f req whole path q v)
case errors of
[] -> pure $ Aeson.object [ ("data", fromMaybe Aeson.Null data_) ]
_ -> pure $ Aeson.object [ ("data", fromMaybe Aeson.Null data_)
@ -455,15 +466,15 @@ instance (MonadError ServerError m, ResultConversion m p whole chn r l)
runConduit $ yieldMany ([val] :: [Aeson.Value]) .| sink
instance (MonadIO m, MonadError ServerError m, ResultConversion m p whole chn r l)
=> RunHandler m p whole chn '[] ('RetStream r) (ConduitT l Void m () -> m ()) where
runHandler f whole path h Nil (RStream q) = do
runHandler f req whole path h Nil (RStream q) = do
queue <- liftIO newTMQueueIO
res <- liftIO $ runExceptT $ f $ h (sinkTMQueue queue)
case res of
Right _ -> do
info <- runConduit $ sourceTMQueue queue .| sinkList
Just . Aeson.toJSON . catMaybes <$> traverse (convertResult f whole path q) info
Just . Aeson.toJSON . catMaybes <$> traverse (convertResult f req whole path q) info
Left e -> tell [GraphQLError e []] >> pure Nothing
runHandlerSubscription f whole path h Nil (RStream q) sink = do
runHandlerSubscription f req whole path h Nil (RStream q) sink = do
res <- liftIO $ runExceptT $ f $ h
(transPipe liftIO (mapInputM convert (error "this should not be called") sink))
case res of
@ -472,7 +483,7 @@ instance (MonadIO m, MonadError ServerError m, ResultConversion m p whole chn r
where
convert :: l -> IO Aeson.Value
convert v = do
(data_, errors) <- runWriterT (convertResult f whole path q v)
(data_, errors) <- runWriterT (convertResult f req whole path q v)
case errors of
[] -> pure $ Aeson.object [ ("data", fromMaybe Aeson.Null data_) ]
_ -> pure $ Aeson.object [ ("data", fromMaybe Aeson.Null data_)
@ -495,37 +506,38 @@ instance ArgumentConversion chn ref t
class ToRef chn r l => ResultConversion m p whole chn r l where
convertResult :: (forall a. m a -> ServerErrorIO a)
-> RequestHeaders
-> ServerT chn GQL.Field p m whole
-> [T.Text]
-> ReturnQuery' p r
-> l -> WriterT [GraphQLError] IO (Maybe Aeson.Value)
instance Aeson.ToJSON t => ResultConversion m p whole chn ('PrimitiveRef t) t where
convertResult _ _ _ RetPrimitive = pure . Just . Aeson.toJSON
convertResult _ _ _ _ RetPrimitive = pure . Just . Aeson.toJSON
instance ( ToSchema sch l r
, RunSchemaQuery sch (sch :/: l) )
=> ResultConversion m p whole chn ('SchemaRef sch l) r where
convertResult _ _ _ (RetSchema r) t
convertResult _ _ _ _ (RetSchema r) t
= pure $ Just $ runSchemaQuery (toSchema' @_ @_ @sch @r t) r
instance ( MappingRight chn ref ~ t
, MappingRight chn sname ~ t
, LookupService ss ref ~ 'Service sname ms
, RunQueryFindHandler m ('Package pname ss) whole chn ss ('Service sname ms) whole)
=> ResultConversion m ('Package pname ss) whole chn ('ObjectRef ref) t where
convertResult f whole path (RetObject q) h
= Just <$> runQuery @m @('Package pname ss) @(LookupService ss ref) f
convertResult f req whole path (RetObject q) h
= Just <$> runQuery @m @('Package pname ss) @(LookupService ss ref) f req
(error "cannot inspect schema inside a field")
whole path h q
instance ResultConversion m p whole chn r s
=> ResultConversion m p whole chn ('OptionalRef r) (Maybe s) where
convertResult _ _ _ _ Nothing
convertResult _ _ _ _ _ Nothing
= pure Nothing
convertResult f whole path (RetOptional q) (Just x)
= convertResult f whole path q x
convertResult f req whole path (RetOptional q) (Just x)
= convertResult f req whole path q x
instance ResultConversion m p whole chn r s
=> ResultConversion m p whole chn ('ListRef r) [s] where
convertResult f whole path (RetList q) xs
= Just . Aeson.toJSON . catMaybes <$> mapM (convertResult f whole path q) xs
convertResult f req whole path (RetList q) xs
= Just . Aeson.toJSON . catMaybes <$> mapM (convertResult f req whole path q) xs
class RunSchemaQuery sch r where
runSchemaQuery

View File

@ -156,7 +156,8 @@ httpGraphQLAppTrans f server q m s req res =
execQuery opn vals qry =
case parseExecutableDoc qry of
Left err -> toError err
Right doc -> runPipeline f server q m s opn vals doc >>= toResponse
Right doc -> runPipeline f (requestHeaders req) server q m s opn vals doc
>>= toResponse
toError :: T.Text -> IO ResponseReceived
toError err = toResponse $ A.object [ ("errors", A.Array [ A.object [ ("message", A.String err) ] ])]
toResponse :: A.Value -> IO ResponseReceived
@ -171,8 +172,9 @@ wsGraphQLAppTrans
-> Proxy sub
-> WS.ServerApp
wsGraphQLAppTrans f server q m s conn
= do conn' <- WS.acceptRequest conn
flip protocol conn' $ runSubscriptionPipeline f server q m s
= do let headers = WS.requestHeaders $ WS.pendingRequest conn
conn' <- WS.acceptRequest conn
flip protocol conn' $ runSubscriptionPipeline f headers server q m s
-- | Run a Mu 'graphQLApp' using the given 'Settings'.
--

View File

@ -51,6 +51,7 @@ library
, template-haskell >=2.14 && <2.16
, text >=1.2 && <2
, th-abstraction >=0.3.2 && <0.4
, tracing >=0.0.5
hs-source-dirs: src
default-language: Haskell2010

View File

@ -4,7 +4,9 @@
{-# language FlexibleContexts #-}
{-# language FlexibleInstances #-}
{-# language GADTs #-}
{-# language LambdaCase #-}
{-# language MultiParamTypeClasses #-}
{-# language OverloadedStrings #-}
{-# language PolyKinds #-}
{-# language ScopedTypeVariables #-}
{-# language TypeApplications #-}
@ -26,7 +28,10 @@ import Data.Conduit
import qualified Data.Conduit.Combinators as C
import Data.Conduit.TMChan
import Data.Kind
import Data.Text as T
import GHC.TypeLits
import Monitor.Tracing
import Monitor.Tracing.Zipkin
import Network.GRPC.Client (CompressMode (..), IncomingEvent (..),
OutgoingEvent (..), RawReply, StreamDone (..))
import Network.GRPC.Client.Helpers
@ -42,8 +47,22 @@ import Mu.Rpc
import Mu.Schema
-- | Initialize a connection to a gRPC server.
setupGrpcClient' :: GrpcClientConfig -> IO (Either ClientError GrpcClient)
setupGrpcClient' = runExceptT . setupGrpcClient
setupGrpcClient' :: MonadIO m
=> GrpcClientConfig -> m (Either ClientError GrpcClient)
setupGrpcClient' = liftIO . runExceptT . setupGrpcClient
-- | Initialize a connection to a gRPC server
-- and pass information about distributed tracing.
setupGrpcClientZipkin
:: (MonadIO m, MonadTrace m)
=> GrpcClientConfig -> T.Text -> m (Either ClientError GrpcClient)
setupGrpcClientZipkin cfg spanName
= clientSpan spanName $ \case
Nothing -> setupGrpcClient' cfg
(Just b3) -> setupGrpcClient' cfg {
_grpcClientConfigHeaders = ("b3", b3ToHeaderValue b3)
: _grpcClientConfigHeaders cfg
}
class GRpcServiceMethodCall (p :: GRpcMessageProtocol)
(pkg :: snm) (s :: snm)

View File

@ -19,6 +19,7 @@ module Mu.GRpc.Client.Optics (
-- * Initialization of the gRPC client
GRpcConnection
, initGRpc
, initGRpcZipkin
, GRpcMessageProtocol(..)
, msgProtoBuf
, msgAvro
@ -32,10 +33,13 @@ module Mu.GRpc.Client.Optics (
, module Mu.Schema.Optics
) where
import Control.Monad.IO.Class
import qualified Data.ByteString.Char8 as BS
import Data.Conduit
import Data.Proxy
import Data.Text as T
import GHC.TypeLits
import Monitor.Tracing
import Network.GRPC.Client (CompressMode)
import qualified Network.GRPC.Client.Helpers as G
import Network.HTTP2.Client (ClientError)
@ -58,15 +62,35 @@ newtype GRpcConnection (s :: Package') (p :: GRpcMessageProtocol)
--
-- > initGRpc config msgProtoBuf @Service
--
initGRpc :: G.GrpcClientConfig -- ^ gRPC configuration
initGRpc :: MonadIO m
=> G.GrpcClientConfig -- ^ gRPC configuration
-> Proxy p
-> forall s. IO (Either ClientError (GRpcConnection s p))
-> forall s. m (Either ClientError (GRpcConnection s p))
initGRpc config _ = do
setup <- setupGrpcClient' config
pure $ case setup of
Left e -> Left e
Right c -> Right $ GRpcConnection c
-- | Initializes a connection to a gRPC server,
-- creating a new span for distributed tracing.
-- Usually the service you are connecting to is
-- inferred from the usage later on.
-- However, it can also be made explicit by using
--
-- > initGRpcZipkin config msgProtoBuf "person" @Service
--
initGRpcZipkin :: (MonadIO m, MonadTrace m)
=> G.GrpcClientConfig -- ^ gRPC configuration
-> Proxy p
-> T.Text
-> forall s. m (Either ClientError (GRpcConnection s p))
initGRpcZipkin config _ spanName = do
setup <- setupGrpcClientZipkin config spanName
pure $ case setup of
Left e -> Left e
Right c -> Right $ GRpcConnection c
instance forall (pkg :: Package') (pkgName :: Symbol)
(service :: Service') (serviceName :: Symbol)
(methods :: [Method'])

View File

@ -22,6 +22,7 @@ module Mu.GRpc.Client.Record (
, GrpcClientConfig
, grpcClientConfigSimple
, setupGrpcClient'
, setupGrpcClientZipkin
-- * Fill and generate the Haskell record of functions
, buildService
, GRpcMessageProtocol(..)

View File

@ -19,6 +19,7 @@ module Mu.GRpc.Client.TyApps (
, GrpcClientConfig
, grpcClientConfigSimple
, setupGrpcClient'
, setupGrpcClientZipkin
-- * Call methods from the gRPC service
, gRpcCall
, GRpcMessageProtocol(..)

View File

@ -51,7 +51,7 @@ import Network.GRPC.HTTP2.Encoding (GRPCInput, GRPCOutput, gzip
import Network.GRPC.HTTP2.Types (GRPCStatus (..), GRPCStatusCode (..))
import Network.GRPC.Server.Handlers.Trans
import Network.GRPC.Server.Wai as Wai
import Network.Wai (Application)
import Network.Wai (Application, Request, requestHeaders)
import Network.Wai.Handler.Warp (Port, Settings, run, runSettings)
import Network.Wai.Handler.WarpTLS (TLSSettings, runTLS)
@ -200,15 +200,16 @@ instance ( KnownName name, MkRPC p
('Method name args r ': rest) (h ': hs) where
gRpcMethodHandlers f pfullP pfullS pr p s (Hmore _ _ h rest)
= gRpcMethodHandler f pr (Proxy @args) (Proxy @r) (mkRPC pr p s methodName)
(h reflectInfo ())
(\req -> h (reflectInfo (requestHeaders req)) ())
: gRpcMethodHandlers f pfullP pfullS pr p s rest
where methodName = BS.pack (nameVal (Proxy @name))
reflectInfo = reflectRpcInfo pfullP pfullS (Proxy @('Method name args r)) ()
reflectInfo hdrs
= reflectRpcInfo pfullP pfullS (Proxy @('Method name args r)) hdrs ()
class GRpcMethodHandler p m (args :: [Argument snm anm (TypeRef snm)]) r h where
gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy p -> Proxy args -> Proxy r
-> RPCTy p -> h -> ServiceHandler
-> RPCTy p -> (Request -> h) -> ServiceHandler
-- | Turns a 'Conduit' working on 'ServerErrorIO'
-- into any other base monad which supports 'IO',
@ -301,7 +302,7 @@ instance forall (sch :: Schema') sty (r :: Type).
instance (MonadIO m, GRPCInput (RPCTy p) (), GRPCOutput (RPCTy p) ())
=> GRpcMethodHandler p m '[ ] 'RetNothing (m ()) where
gRpcMethodHandler f _ _ _ rpc h
= unary @m @_ @() @() (raiseErrors . f) rpc (\_ _ -> h)
= unary @m @_ @() @() (raiseErrors . f) rpc (\req _ -> h req)
-----
@ -309,7 +310,8 @@ instance (MonadIO m, GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r)
=> GRpcMethodHandler p m '[ ] ('RetSingle rref) (m r) where
gRpcMethodHandler f _ _ _ rpc h
= unary @m @_ @() @(GRpcOWTy p rref r)
(raiseErrors . f) rpc (\_ _ -> buildGRpcOWTy (Proxy @p) (Proxy @rref) <$> h)
(raiseErrors . f) rpc
(\req _ -> buildGRpcOWTy (Proxy @p) (Proxy @rref) <$> h req)
-----
@ -318,13 +320,13 @@ instance (MonadIO m, GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r, MonadIO
(ConduitT r Void m () -> m ()) where
gRpcMethodHandler f _ _ _ rpc h
= serverStream @m @_ @() @(GRpcOWTy p rref r) (raiseErrors . f) rpc sstream
where sstream :: req -> ()
where sstream :: Request -> ()
-> m ((), ServerStream m (GRpcOWTy p rref r) ())
sstream _ _ = do
sstream req _ = do
-- Variable to connect input and output
var <- liftIO newEmptyTMVarIO :: m (TMVar (Maybe r))
-- Start executing the handler
promise <- liftIO $ async (raiseErrors $ f (h (toTMVarConduit var)))
promise <- liftIO $ async (raiseErrors $ f (h req (toTMVarConduit var)))
-- Return the information
let readNext _
= do nextOutput <- liftIO $ atomically $ takeTMVar var
@ -340,7 +342,8 @@ instance (MonadIO m, GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ())
=> GRpcMethodHandler p m '[ 'ArgSingle aname vref ] 'RetNothing (v -> m ()) where
gRpcMethodHandler f _ _ _ rpc h
= unary @m @_ @(GRpcIWTy p vref v) @()
(raiseErrors . f) rpc (\_ -> h . unGRpcIWTy (Proxy @p) (Proxy @vref))
(raiseErrors . f) rpc
(\req -> h req . unGRpcIWTy (Proxy @p) (Proxy @vref))
-----
@ -349,9 +352,9 @@ instance (MonadIO m, GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r)
gRpcMethodHandler f _ _ _ rpc h
= unary @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
(raiseErrors . f) rpc
(\_ -> (buildGRpcOWTy (Proxy @p) (Proxy @rref) <$>)
. h
. unGRpcIWTy (Proxy @p) (Proxy @vref))
(\req -> (buildGRpcOWTy (Proxy @p) (Proxy @rref) <$>)
. h req
. unGRpcIWTy (Proxy @p) (Proxy @vref))
-----
@ -361,14 +364,14 @@ instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
gRpcMethodHandler f _ _ _ rpc h
= serverStream @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
(raiseErrors . f) rpc sstream
where sstream :: req -> GRpcIWTy p vref v
where sstream :: Request -> GRpcIWTy p vref v
-> m ((), ServerStream m (GRpcOWTy p rref r) ())
sstream _ v = do
sstream req v = do
-- Variable to connect input and output
var <- liftIO newEmptyTMVarIO :: m (TMVar (Maybe r))
-- Start executing the handler
let v' = unGRpcIWTy (Proxy @p) (Proxy @vref) v
promise <- liftIO $ async (raiseErrors $ f (h v' (toTMVarConduit var)))
promise <- liftIO $ async (raiseErrors $ f (h req v' (toTMVarConduit var)))
-- Return the information
let readNext _
= do nextOutput <- liftIO $ atomically $ takeTMVar var
@ -386,14 +389,14 @@ instance (MonadIO m, GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) (), MonadIO
gRpcMethodHandler f _ _ _ rpc h
= clientStream @m @_ @(GRpcIWTy p vref v) @()
(raiseErrors . f) rpc cstream
where cstream :: req
where cstream :: Request
-> m ((), ClientStream m (GRpcIWTy p vref v) () ())
cstream _ = do
cstream req = do
-- Create a new TMChan
chan <- liftIO newTMChanIO :: m (TMChan v)
let producer = sourceTMChan @m chan
-- Start executing the handler in another thread
promise <- liftIO $ async (raiseErrors $ f (h producer))
promise <- liftIO $ async (raiseErrors $ f (h req producer))
-- Build the actual handler
let cstreamHandler _ newInput
= liftIO $ atomically $
@ -411,15 +414,18 @@ instance (MonadIO m, GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, Mona
gRpcMethodHandler f _ _ _ rpc h
= clientStream @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
(raiseErrors . f) rpc cstream
where cstream :: req
where cstream :: Request
-> m ((), ClientStream m (GRpcIWTy p vref v)
(GRpcOWTy p rref r) ())
cstream _ = do
cstream req = do
-- Create a new TMChan
chan <- liftIO newTMChanIO :: m (TMChan v)
let producer = sourceTMChan @m chan
-- Start executing the handler in another thread
promise <- liftIO $ async (raiseErrors $ buildGRpcOWTy (Proxy @p) (Proxy @rref) <$> f (h producer))
promise <- liftIO $ async
(raiseErrors
$ buildGRpcOWTy (Proxy @p) (Proxy @rref)
<$> f (h req producer))
-- Build the actual handler
let cstreamHandler _ newInput
= liftIO $ atomically $
@ -437,15 +443,17 @@ instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
gRpcMethodHandler f _ _ _ rpc h
= generalStream @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
(raiseErrors . f) rpc bdstream
where bdstream :: req -> m ( (), IncomingStream m (GRpcIWTy p vref v) ()
, (), OutgoingStream m (GRpcOWTy p rref r) () )
bdstream _ = do
where bdstream :: Request
-> m ( (), IncomingStream m (GRpcIWTy p vref v) ()
, (), OutgoingStream m (GRpcOWTy p rref r) () )
bdstream req = do
-- Create a new TMChan and a new variable
chan <- liftIO newTMChanIO :: m (TMChan v)
let producer = sourceTMChan @m chan
var <- liftIO newEmptyTMVarIO :: m (TMVar (Maybe r))
-- Start executing the handler
promise <- liftIO $ async (raiseErrors $ f $ h producer (toTMVarConduit var))
promise <- liftIO $ async
(raiseErrors $ f $ h req producer (toTMVarConduit var))
-- Build the actual handler
let cstreamHandler _ newInput
= liftIO $ atomically $

View File

@ -49,7 +49,7 @@ prometheusMetrics :: forall m a info. (MonadBaseControl IO m, MonadMonitor m)
prometheusMetrics metrics NoRpcInfo run = do
incGauge (activeCalls metrics)
run `finally` decGauge (activeCalls metrics)
prometheusMetrics metrics (RpcInfo _pkg (Service sname _) (Method mname _ _) _) run = do
prometheusMetrics metrics (RpcInfo _pkg (Service sname _) (Method mname _ _) _ _) run = do
incGauge (activeCalls metrics)
withLabel (messagesReceived metrics) (sname, mname) incCounter
( do -- We are forced to use a MVar because 'withLabel' only allows IO ()

View File

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright © 2019-2020 47 Degrees. <http://47deg.com>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1 @@
cradle: { stack: { component: "mu-tracing:lib" } }

View File

@ -0,0 +1,35 @@
name: mu-tracing
version: 0.4.0.0
synopsis: Tracing support for Mu
description:
Generate distributed traces for Mu services
license: Apache-2.0
license-file: LICENSE
author: Alejandro Serrano
maintainer: alejandro.serrano@47deg.com
copyright: Copyright © 2020 <http://47deg.com 47 Degrees>
category: Network
build-type: Simple
cabal-version: >=1.10
homepage: https://higherkindness.io/mu-haskell/
bug-reports: https://github.com/higherkindness/mu-haskell/issues
source-repository head
type: git
location: https://github.com/higherkindness/mu-haskell
library
exposed-modules:
Mu.Instrumentation.Tracing
build-depends:
base >=4.12 && <5
, containers >=0.6 && <0.7
, mu-rpc >=0.4.0
, text >=1.2 && <2
, tracing-control >=0.0.6
hs-source-dirs: src
default-language: Haskell2010
ghc-options: -Wall -fprint-potential-instances

View File

@ -0,0 +1,80 @@
{-# language FlexibleInstances #-}
{-# language MultiParamTypeClasses #-}
{-# language OverloadedStrings #-}
{-# language PolyKinds #-}
{-# language UndecidableInstances #-}
{-# language ViewPatterns #-}
{-|
Description : Distributed tracing for Mu
This module injects distributed tracing
for Mu servers. Currently it only supports
Zipkin as backend.
In order to use this module, you need to
follow these steps:
1. Establish a connection with 'newZipkin'.
2. Wrap the server using 'zipkin', giving
information for the root.
3. Run the server using the transformer version
of your protocol, like |grpcAppTrans|.
-}
module Mu.Instrumentation.Tracing (
-- * Distributed tracing
MuTracing(..)
, zipkin
, runZipkin
-- ** Establish connection
, newZipkin
, defaultZipkinSettings
, Settings(..)
-- * Useful re-exports
, module Monitor.Tracing
) where
import Control.Applicative ((<|>))
import Control.Monad.IO.Class
import Control.Monad.Trace
import Control.Monad.Trace.Class
import qualified Data.Map.Strict as M
import Data.Text
import Monitor.Tracing
import Monitor.Tracing.Zipkin
import Mu.Rpc
import Mu.Server
data MuTracing
= MuTracing {
samplingPolicy :: SamplingPolicy
, rootName :: Text
}
-- | Runs with a given 'Zipkin' connection.
-- You can create one with 'newZipkin'.
runZipkin :: Zipkin -> TraceT m a -> m a
runZipkin = flip run
-- | Create a new connection to 'Zipkin'.
newZipkin :: Settings -> IO Zipkin
newZipkin = new
defaultZipkinSettings :: Settings
defaultZipkinSettings = defaultSettings
-- | Wraps a server to do distributed tracing
-- using 'Zipkin' as backend.
zipkin :: (MonadIO m, MonadTrace m)
=> MuTracing -> ServerT chn i p m topHs -> ServerT chn i p m topHs
zipkin m = wrapServer (zipkinTracing m)
zipkinTracing :: (MonadIO m, MonadTrace m)
=> MuTracing -> RpcInfo i -> m a -> m a
zipkinTracing zpk NoRpcInfo h =
rootSpan (samplingPolicy zpk) (rootName zpk) h
zipkinTracing zpk (RpcInfo _ _ _ (M.fromList -> hdrs) _) h =
case getB3 of
Nothing -> rootSpan (samplingPolicy zpk) (rootName zpk) h
Just spn -> serverSpan spn h
where getB3 = (b3FromHeaderValue =<< M.lookup "b3" hdrs)
<|> b3FromHeaders hdrs

View File

@ -14,6 +14,7 @@ packages:
- grpc/server
- graphql
- instrumentation/prometheus
- instrumentation/tracing
- examples/health-check
- examples/route-guide
- examples/seed
@ -30,6 +31,7 @@ extra-deps:
- warp-grpc-0.4.0.1
- hw-kafka-client-3.1.1
- hw-kafka-conduit-2.7.0
- tracing-control-0.0.6
- wai-middleware-prometheus-1.0.0
- git: https://github.com/hasura/graphql-parser-hs.git
commit: f4a093981ca5626982a17c2bfaad047cc0834a81

View File

@ -14,6 +14,7 @@ packages:
- grpc/server
- graphql
- instrumentation/prometheus
- instrumentation/tracing
- examples/health-check
- examples/route-guide
- examples/seed
@ -30,6 +31,7 @@ extra-deps:
- warp-grpc-0.4.0.1
- hw-kafka-client-3.1.1
- hw-kafka-conduit-2.7.0
- tracing-control-0.0.6
- wai-middleware-prometheus-1.0.0
- git: https://github.com/hasura/graphql-parser-hs.git
commit: f4a093981ca5626982a17c2bfaad047cc0834a81

View File

@ -14,6 +14,7 @@ packages:
- grpc/server
- graphql
- instrumentation/prometheus
- instrumentation/tracing
- examples/health-check
- examples/route-guide
- examples/seed
@ -30,6 +31,7 @@ extra-deps:
- warp-grpc-0.4.0.1
- hw-kafka-client-3.1.1
- hw-kafka-conduit-2.7.0
- tracing-control-0.0.6
- wai-middleware-prometheus-1.0.0
- git: https://github.com/hasura/graphql-parser-hs.git
commit: f4a093981ca5626982a17c2bfaad047cc0834a81
@ -38,3 +40,4 @@ extra-deps:
- primitive-unlifted-0.1.3.0
- stm-hamt-1.2.0.4
- stm-containers-1.1.0.4
- stm-lifted-2.5.0.0