Add support for logging and other transformers (#35)

Fixes #32
This commit is contained in:
Alejandro Serrano 2019-12-05 14:03:10 +01:00 committed by GitHub
parent fa2ade13db
commit 0ef0f9f7f4
7 changed files with 127 additions and 91 deletions

View File

@ -1,6 +1,7 @@
{-# language DataKinds #-}
{-# language DeriveAnyClass #-}
{-# language DeriveGeneric #-}
{-# language FlexibleContexts #-}
{-# language FlexibleInstances #-}
{-# language GADTs #-}
{-# language MultiParamTypeClasses #-}
@ -9,7 +10,7 @@
{-# language PolyKinds #-}
{-# language TypeFamilies #-}
{-# language TypeOperators #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# OPTIONS_GHC -fno-warn-partial-type-signatures #-}
module Mu.Rpc.Examples where
import Data.Conduit
@ -52,19 +53,21 @@ newtype HelloResponse = HelloResponse { message :: T.Text }
newtype HiRequest = HiRequest { number :: Int }
deriving (Generic, HasSchema QuickstartSchema "HiRequest")
quickstartServer :: ServerIO QuickStartService _
quickstartServer :: (MonadServer m) => ServerT QuickStartService m _
quickstartServer
= Server (sayHello :<|>: sayHi :<|>: sayManyHellos :<|>: H0)
where sayHello :: HelloRequest -> ServerErrorIO HelloResponse
where sayHello :: (Monad m) => HelloRequest -> m HelloResponse
sayHello (HelloRequest nm)
= return (HelloResponse ("hi, " <> nm))
sayHi :: HiRequest
-> ConduitT HelloResponse Void ServerErrorIO ()
-> ServerErrorIO ()
sayHi :: (MonadServer m)
=> HiRequest
-> ConduitT HelloResponse Void m ()
-> m ()
sayHi (HiRequest n) sink
= runConduit $ C.replicate n (HelloResponse "hi!") .| sink
sayManyHellos :: ConduitT () HelloRequest ServerErrorIO ()
-> ConduitT HelloResponse Void ServerErrorIO ()
-> ServerErrorIO ()
sayManyHellos :: (MonadServer m)
=> ConduitT () HelloRequest m ()
-> ConduitT HelloResponse Void m ()
-> m ()
sayManyHellos source sink
= runConduit $ source .| C.mapM sayHello .| sink

View File

@ -1,6 +1,7 @@
{-# language ConstraintKinds #-}
{-# language DataKinds #-}
{-# language ExistentialQuantification #-}
{-# language FlexibleContexts #-}
{-# language FlexibleInstances #-}
{-# language GADTs #-}
{-# language MultiParamTypeClasses #-}
@ -27,10 +28,11 @@
-- and long type you would need to write there otherwise.
module Mu.Server (
-- * Servers and handlers
ServerIO, ServerT(..)
, HandlersIO, HandlersT(..)
MonadServer, ServerT(..), HandlersT(..)
-- ** Simple servers using only IO
, ServerErrorIO, ServerIO
-- * Errors which might be raised
, serverError, ServerErrorIO, ServerError(..), ServerErrorCode(..)
, serverError, ServerError(..), ServerErrorCode(..)
-- ** Useful when you do not want to deal with errors
, alwaysOk
) where
@ -42,10 +44,17 @@ import Data.Kind
import Mu.Rpc
import Mu.Schema
serverError :: ServerError -> ServerErrorIO a
-- | Constraint for monads that can be used as servers
type MonadServer m = (MonadError ServerError m, MonadIO m)
type ServerErrorIO = ExceptT ServerError IO
type ServerIO srv = ServerT srv ServerErrorIO
serverError :: (MonadError ServerError m)
=> ServerError -> m a
serverError = throwError
alwaysOk :: IO a -> ServerErrorIO a
alwaysOk :: (MonadIO m)
=> IO a -> m a
alwaysOk = liftIO
data ServerError
@ -61,18 +70,14 @@ data ServerErrorCode
| NotFound
deriving (Eq, Show)
type ServerErrorIO = ExceptT ServerError IO
data ServerT (s :: Service snm mnm) (m :: Type -> Type) (hs :: [Type]) where
Server :: HandlersT methods m hs -> ServerT ('Service sname anns methods) m hs
type ServerIO service = ServerT service ServerErrorIO
infixr 5 :<|>:
data HandlersT (methods :: [Method mnm]) (m :: Type -> Type) (hs :: [Type]) where
H0 :: HandlersT '[] m '[]
(:<|>:) :: Handles args ret m h => h -> HandlersT ms m hs
-> HandlersT ('Method name anns args ret ': ms) m (h ': hs)
type HandlersIO methods = HandlersT methods ServerErrorIO
-- Define a relation for handling
class Handles (args :: [Argument]) (ret :: Return)

View File

@ -1,5 +1,7 @@
{-# language OverloadedStrings #-}
{-# language PartialTypeSignatures #-}
{-# OPTIONS_GHC -fno-warn-partial-type-signatures #-}
module Main where
import Control.Concurrent.STM

View File

@ -2,6 +2,8 @@
{-# language OverloadedStrings #-}
{-# language PartialTypeSignatures #-}
{-# language ScopedTypeVariables #-}
{-# OPTIONS_GHC -fno-warn-partial-type-signatures #-}
module Main where
import Control.Concurrent.Async

View File

@ -20,6 +20,7 @@ executable seed-server
default-language: Haskell2010
build-depends: base >= 4.12 && < 5
, conduit
, monad-logger
, mu-schema
, mu-rpc
, mu-protobuf

View File

@ -1,11 +1,16 @@
{-# language FlexibleContexts #-}
{-# language OverloadedStrings #-}
{-# language PartialTypeSignatures #-}
{-# OPTIONS_GHC -fno-warn-partial-type-signatures #-}
module Main where
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Logger
import Data.Conduit
import Data.Conduit.Combinators as C
import Data.Text as T
import Mu.GRpc.Server
import Mu.Server
@ -14,25 +19,27 @@ import Schema
main :: IO ()
main = do
putStrLn "running seed application"
runGRpcApp 8080 server
runGRpcAppTrans 8080 runStderrLoggingT server
-- Server implementation
-- https://github.com/higherkindness/mu/blob/master/modules/examples/seed/server/modules/process/src/main/scala/example/seed/server/process/ProtoPeopleServiceHandler.scala
server :: ServerIO PeopleService _
server :: (MonadServer m, MonadLogger m) => ServerT PeopleService m _
server = Server (getPerson :<|>: getPersonStream :<|>: H0)
evolvePerson :: PeopleRequest -> PeopleResponse
evolvePerson (PeopleRequest n) = PeopleResponse $ Person n 18
getPerson :: PeopleRequest -> ServerErrorIO PeopleResponse
getPerson = return . evolvePerson
getPerson :: Monad m => PeopleRequest -> m PeopleResponse
getPerson = pure . evolvePerson
getPersonStream :: ConduitT () PeopleRequest ServerErrorIO ()
-> ConduitT PeopleResponse Void ServerErrorIO ()
-> ServerErrorIO ()
getPersonStream :: (MonadServer m, MonadLogger m)
=> ConduitT () PeopleRequest m ()
-> ConduitT PeopleResponse Void m ()
-> m ()
getPersonStream source sink = runConduit $ source .| C.mapM reStream .| sink
where
reStream req = do
liftIO $ threadDelay (2 * 1000 * 1000) -- 2 sec
return $ evolvePerson req
logDebugN $ T.pack $ "stream request: " ++ show req
pure $ evolvePerson req

View File

@ -4,14 +4,15 @@
{-# language GADTs #-}
{-# language MultiParamTypeClasses #-}
{-# language PolyKinds #-}
{-# language RankNTypes #-}
{-# language ScopedTypeVariables #-}
{-# language TypeApplications #-}
{-# language TypeOperators #-}
{-# language UndecidableInstances #-}
-- | Execute a Mu 'Server' using gRPC as transport layer
module Mu.GRpc.Server (
-- * Run a 'Server' directly
runGRpcApp
module Mu.GRpc.Server
( -- * Run a 'Server' directly
runGRpcApp, runGRpcAppTrans
, runGRpcAppSettings, Settings
, runGRpcAppTLS, TLSSettings
-- * Convert a 'Server' into a WAI application
@ -30,7 +31,7 @@ import Data.Kind
import Data.Proxy
import Network.GRPC.HTTP2.Encoding (gzip, uncompressed)
import Network.GRPC.HTTP2.Proto3Wire
import Network.GRPC.HTTP2.Types (GRPCStatus(..), GRPCStatusCode (..))
import Network.GRPC.HTTP2.Types (GRPCStatus (..), GRPCStatusCode (..))
import Network.GRPC.Server.Handlers
import Network.GRPC.Server.Wai (ServiceHandler)
import Network.GRPC.Server.Wai as Wai
@ -46,20 +47,33 @@ import Mu.Server
-- | Run a Mu 'Server' on the given port.
runGRpcApp
:: ( KnownName name, KnownName (FindPackageName anns)
, GRpcMethodHandlers methods handlers )
=> Port -> ServerIO ('Service name anns methods) handlers
, GRpcMethodHandlers ServerErrorIO methods handlers )
=> Port
-> ServerT ('Service name anns methods) ServerErrorIO handlers
-> IO ()
runGRpcApp port svr = run port (gRpcApp svr)
runGRpcApp port = runGRpcAppTrans port id
-- | Run a Mu 'Server' on the given port.
runGRpcAppTrans
:: ( KnownName name, KnownName (FindPackageName anns)
, GRpcMethodHandlers m methods handlers )
=> Port
-> (forall a. m a -> ServerErrorIO a)
-> ServerT ('Service name anns methods) m handlers
-> IO ()
runGRpcAppTrans port f svr = run port (gRpcApp f svr)
-- | Run a Mu 'Server' using the given 'Settings'.
--
-- Go to 'Network.Wai.Handler.Warp' to declare 'Settings'.
runGRpcAppSettings
:: ( KnownName name, KnownName (FindPackageName anns)
, GRpcMethodHandlers methods handlers )
=> Settings -> ServerIO ('Service name anns methods) handlers
, GRpcMethodHandlers m methods handlers )
=> Settings
-> (forall a. m a -> ServerErrorIO a)
-> ServerT ('Service name anns methods) m handlers
-> IO ()
runGRpcAppSettings st svr = runSettings st (gRpcApp svr)
runGRpcAppSettings st f svr = runSettings st (gRpcApp f svr)
-- | Run a Mu 'Server' using the given 'TLSSettings' and 'Settings'.
--
@ -67,11 +81,12 @@ runGRpcAppSettings st svr = runSettings st (gRpcApp svr)
-- and to 'Network.Wai.Handler.Warp' to declare 'Settings'.
runGRpcAppTLS
:: ( KnownName name, KnownName (FindPackageName anns)
, GRpcMethodHandlers methods handlers )
, GRpcMethodHandlers m methods handlers )
=> TLSSettings -> Settings
-> ServerIO ('Service name anns methods) handlers
-> (forall a. m a -> ServerErrorIO a)
-> ServerT ('Service name anns methods) m handlers
-> IO ()
runGRpcAppTLS tls st svr = runTLS tls st (gRpcApp svr)
runGRpcAppTLS tls st f svr = runTLS tls st (gRpcApp f svr)
-- | Turn a Mu 'Server' into a WAI 'Application'.
--
@ -79,36 +94,40 @@ runGRpcAppTLS tls st svr = runTLS tls st (gRpcApp svr)
-- for example, @wai-routes@, or you can add middleware
-- from @wai-extra@, among others.
gRpcApp
:: (KnownName name, KnownName (FindPackageName anns), GRpcMethodHandlers methods handlers)
=> ServerIO ('Service name anns methods) handlers
:: (KnownName name, KnownName (FindPackageName anns), GRpcMethodHandlers m methods handlers)
=> (forall a. m a -> ServerErrorIO a)
-> ServerT ('Service name anns methods) m handlers
-> Application
gRpcApp svr = Wai.grpcApp [uncompressed, gzip]
(gRpcServiceHandlers svr)
gRpcApp f svr = Wai.grpcApp [uncompressed, gzip]
(gRpcServiceHandlers f svr)
gRpcServiceHandlers
:: forall name anns methods handlers.
(KnownName name, KnownName (FindPackageName anns), GRpcMethodHandlers methods handlers)
=> ServerIO ('Service name anns methods) handlers
:: forall name anns methods handlers m.
(KnownName name, KnownName (FindPackageName anns), GRpcMethodHandlers m methods handlers)
=> (forall a. m a -> ServerErrorIO a)
-> ServerT ('Service name anns methods) m handlers
-> [ServiceHandler]
gRpcServiceHandlers (Server svr) = gRpcMethodHandlers packageName serviceName svr
gRpcServiceHandlers f (Server svr) = gRpcMethodHandlers f packageName serviceName svr
where packageName = BS.pack (nameVal (Proxy @(FindPackageName anns)))
serviceName = BS.pack (nameVal (Proxy @name))
class GRpcMethodHandlers (ms :: [Method mnm]) (hs :: [Type]) where
gRpcMethodHandlers :: ByteString -> ByteString
-> HandlersIO ms hs -> [ServiceHandler]
class GRpcMethodHandlers (m :: Type -> Type) (ms :: [Method mnm]) (hs :: [Type]) where
gRpcMethodHandlers :: (forall a. m a -> ServerErrorIO a)
-> ByteString -> ByteString
-> HandlersT ms m hs -> [ServiceHandler]
instance GRpcMethodHandlers '[] '[] where
gRpcMethodHandlers _ _ H0 = []
instance (KnownName name, GRpcMethodHandler args r h, GRpcMethodHandlers rest hs)
=> GRpcMethodHandlers ('Method name anns args r ': rest) (h ': hs) where
gRpcMethodHandlers p s (h :<|>: rest)
= gRpcMethodHandler (Proxy @args) (Proxy @r) (RPC p s methodName) h
: gRpcMethodHandlers p s rest
instance GRpcMethodHandlers m '[] '[] where
gRpcMethodHandlers _ _ _ H0 = []
instance (KnownName name, GRpcMethodHandler m args r h, GRpcMethodHandlers m rest hs)
=> GRpcMethodHandlers m ('Method name anns args r ': rest) (h ': hs) where
gRpcMethodHandlers f p s (h :<|>: rest)
= gRpcMethodHandler f (Proxy @args) (Proxy @r) (RPC p s methodName) h
: gRpcMethodHandlers f p s rest
where methodName = BS.pack (nameVal (Proxy @name))
class GRpcMethodHandler args r h where
gRpcMethodHandler :: Proxy args -> Proxy r -> RPC -> h -> ServiceHandler
class GRpcMethodHandler m args r h where
gRpcMethodHandler :: (forall a. m a -> ServerErrorIO a)
-> Proxy args -> Proxy r -> RPC -> h -> ServiceHandler
raiseErrors :: ServerErrorIO a -> IO a
raiseErrors h
@ -128,33 +147,32 @@ raiseErrors h
serverErrorToGRpcError NotFound = NOT_FOUND
serverErrorToGRpcError Invalid = INVALID_ARGUMENT
instance GRpcMethodHandler '[ ] 'RetNothing (ServerErrorIO ()) where
gRpcMethodHandler _ _ rpc h
= unary @_ @() @() rpc (\_ _ -> raiseErrors h)
instance GRpcMethodHandler m '[ ] 'RetNothing (m ()) where
gRpcMethodHandler f _ _ rpc h
= unary @_ @() @() rpc (\_ _ -> raiseErrors (f h))
instance (ProtoBufTypeRef rref r)
=> GRpcMethodHandler '[ ] ('RetSingle rref) (ServerErrorIO r) where
gRpcMethodHandler _ _ rpc h
=> GRpcMethodHandler m '[ ] ('RetSingle rref) (m r) where
gRpcMethodHandler f _ _ rpc h
= unary @_ @() @(ViaProtoBufTypeRef rref r)
rpc (\_ _ -> ViaProtoBufTypeRef <$> raiseErrors h)
rpc (\_ _ -> ViaProtoBufTypeRef <$> raiseErrors (f h))
instance (ProtoBufTypeRef vref v)
=> GRpcMethodHandler '[ 'ArgSingle vref ] 'RetNothing (v -> ServerErrorIO ()) where
gRpcMethodHandler _ _ rpc h
=> GRpcMethodHandler m '[ 'ArgSingle vref ] 'RetNothing (v -> m ()) where
gRpcMethodHandler f _ _ rpc h
= unary @_ @(ViaProtoBufTypeRef vref v) @()
rpc (\_ -> raiseErrors . h . unViaProtoBufTypeRef)
rpc (\_ -> raiseErrors . f . h . unViaProtoBufTypeRef)
instance (ProtoBufTypeRef vref v, ProtoBufTypeRef rref r)
=> GRpcMethodHandler '[ 'ArgSingle vref ] ('RetSingle rref)
(v -> ServerErrorIO r) where
gRpcMethodHandler _ _ rpc h
=> GRpcMethodHandler m '[ 'ArgSingle vref ] ('RetSingle rref) (v -> m r) where
gRpcMethodHandler f _ _ rpc h
= unary @_ @(ViaProtoBufTypeRef vref v) @(ViaProtoBufTypeRef rref r)
rpc (\_ -> (ViaProtoBufTypeRef <$>) . raiseErrors . h . unViaProtoBufTypeRef)
rpc (\_ -> (ViaProtoBufTypeRef <$>) . raiseErrors . f . h . unViaProtoBufTypeRef)
instance (ProtoBufTypeRef vref v, ProtoBufTypeRef rref r)
=> GRpcMethodHandler '[ 'ArgStream vref ] ('RetSingle rref)
(ConduitT () v ServerErrorIO () -> ServerErrorIO r) where
gRpcMethodHandler _ _ rpc h
instance (ProtoBufTypeRef vref v, ProtoBufTypeRef rref r, MonadIO m)
=> GRpcMethodHandler m '[ 'ArgStream vref ] ('RetSingle rref)
(ConduitT () v m () -> m r) where
gRpcMethodHandler f _ _ rpc h
= clientStream @_ @(ViaProtoBufTypeRef vref v) @(ViaProtoBufTypeRef rref r)
rpc cstream
where cstream :: req
@ -163,9 +181,9 @@ instance (ProtoBufTypeRef vref v, ProtoBufTypeRef rref r)
cstream _ = do
-- Create a new TMChan
chan <- newTMChanIO :: IO (TMChan v)
let producer = sourceTMChan @ServerErrorIO chan
let producer = sourceTMChan @m chan
-- Start executing the handler in another thread
promise <- async (raiseErrors $ ViaProtoBufTypeRef <$> h producer)
promise <- async (raiseErrors $ ViaProtoBufTypeRef <$> f (h producer))
-- Build the actual handler
let cstreamHandler _ (ViaProtoBufTypeRef newInput)
= atomically (writeTMChan chan newInput)
@ -174,10 +192,10 @@ instance (ProtoBufTypeRef vref v, ProtoBufTypeRef rref r)
-- Return the information
return ((), ClientStream cstreamHandler cstreamFinalizer)
instance (ProtoBufTypeRef vref v, ProtoBufTypeRef rref r)
=> GRpcMethodHandler '[ 'ArgSingle vref ] ('RetStream rref)
(v -> ConduitT r Void ServerErrorIO () -> ServerErrorIO ()) where
gRpcMethodHandler _ _ rpc h
instance (ProtoBufTypeRef vref v, ProtoBufTypeRef rref r, MonadIO m)
=> GRpcMethodHandler m '[ 'ArgSingle vref ] ('RetStream rref)
(v -> ConduitT r Void m () -> m ()) where
gRpcMethodHandler f _ _ rpc h
= serverStream @_ @(ViaProtoBufTypeRef vref v) @(ViaProtoBufTypeRef rref r)
rpc sstream
where sstream :: req -> ViaProtoBufTypeRef vref v
@ -186,7 +204,7 @@ instance (ProtoBufTypeRef vref v, ProtoBufTypeRef rref r)
-- Variable to connect input and output
var <- newEmptyTMVarIO :: IO (TMVar (Maybe r))
-- Start executing the handler
promise <- async (raiseErrors $ ViaProtoBufTypeRef <$> h v (toTMVarConduit var))
promise <- async (raiseErrors $ ViaProtoBufTypeRef <$> f (h v (toTMVarConduit var)))
-- Return the information
let readNext _
= do nextOutput <- atomically $ takeTMVar var
@ -196,12 +214,10 @@ instance (ProtoBufTypeRef vref v, ProtoBufTypeRef rref r)
return Nothing
return ((), ServerStream readNext)
instance (ProtoBufTypeRef vref v, ProtoBufTypeRef rref r)
=> GRpcMethodHandler '[ 'ArgStream vref ] ('RetStream rref)
(ConduitT () v ServerErrorIO ()
-> ConduitT r Void ServerErrorIO ()
-> ServerErrorIO ()) where
gRpcMethodHandler _ _ rpc h
instance (ProtoBufTypeRef vref v, ProtoBufTypeRef rref r, MonadIO m)
=> GRpcMethodHandler m '[ 'ArgStream vref ] ('RetStream rref)
(ConduitT () v m () -> ConduitT r Void m () -> m ()) where
gRpcMethodHandler f _ _ rpc h
= generalStream @_ @(ViaProtoBufTypeRef vref v) @(ViaProtoBufTypeRef rref r)
rpc bdstream
where bdstream :: req -> IO ( (), IncomingStream (ViaProtoBufTypeRef vref v) ()
@ -209,10 +225,10 @@ instance (ProtoBufTypeRef vref v, ProtoBufTypeRef rref r)
bdstream _ = do
-- Create a new TMChan and a new variable
chan <- newTMChanIO :: IO (TMChan v)
let producer = sourceTMChan @ServerErrorIO chan
let producer = sourceTMChan @m chan
var <- newEmptyTMVarIO :: IO (TMVar (Maybe r))
-- Start executing the handler
promise <- async (raiseErrors $ h producer (toTMVarConduit var))
promise <- async (raiseErrors $ f $ h producer (toTMVarConduit var))
-- Build the actual handler
let cstreamHandler _ (ViaProtoBufTypeRef newInput)
= atomically (writeTMChan chan newInput)