Update to warp-grpc 0.4 and fix multi-servers (#151)

This commit is contained in:
Alejandro Serrano 2020-03-23 15:04:32 +01:00 committed by GitHub
parent bfdf5f432d
commit c0db0980c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 131 additions and 114 deletions

View File

@ -1,5 +1,5 @@
{ nixpkgs ? (fetchTarball https://github.com/NixOS/nixpkgs/archive/484ea7bbac5bf7f958b0bb314a3c130b6c328800.tar.gz)
, pkgs ? import nixpkgs (import (builtins.fetchTarball https://github.com/input-output-hk/haskell.nix/archive/3db580a.tar.gz))
{ nixpkgs ? (fetchTarball https://github.com/NixOS/nixpkgs/archive/3578bb5.tar.gz)
, pkgs ? import nixpkgs (import (builtins.fetchTarball https://github.com/input-output-hk/haskell.nix/archive/350b594.tar.gz))
}:
let

View File

@ -21,7 +21,8 @@ executable health-server
main-is: Server.hs
other-modules: Definition
build-depends:
base >=4.12 && <5
async
, base >=4.12 && <5
, conduit
, deferred-folds
, mu-graphql
@ -34,8 +35,6 @@ executable health-server
, stm-containers
, text
, wai
, wai-route
, warp
hs-source-dirs: src
default-language: Haskell2010

View File

@ -6,6 +6,7 @@
module Main where
import Control.Concurrent.Async
import Control.Concurrent.STM
import Data.Conduit
import qualified Data.Conduit.Combinators as C
@ -14,8 +15,6 @@ import Data.Maybe (fromMaybe)
import Data.Proxy
import qualified Data.Text as T
import DeferredFolds.UnfoldlM
import Network.Wai.Handler.Warp
import Network.Wai.Route
import qualified StmContainers.Map as M
import Mu.GraphQL.Server
@ -30,14 +29,10 @@ main = do
upd <- newTBMChanIO 100
putStrLn "running health check application"
let s = server m upd
run 50051 $ flip route app404 $ compileRoutes
[ defRoute (str "proto" ./ end) $
\_ -> gRpcApp msgProtoBuf s
, defRoute (str "avro" ./ end) $
\_ -> gRpcApp msgAvro s
, defRoute (str "graphql" ./ end) $
\_ -> graphQLAppQuery s (Proxy @"HealthCheckServiceFS2")
]
runConcurrently $ (\_ _ _ -> ())
<$> Concurrently (runGRpcApp msgProtoBuf 50051 s)
<*> Concurrently (runGRpcApp msgAvro 50052 s)
<*> Concurrently (runGraphQLAppQuery 50053 s (Proxy @"HealthCheckServiceFS2"))
-- Server implementation
-- https://github.com/higherkindness/mu/blob/master/modules/health-check-unary/src/main/scala/higherkindness/mu/rpc/healthcheck/unary/handler/HealthServiceImpl.scala

View File

@ -27,7 +27,8 @@ executable seed-server
other-modules: Schema
default-language: Haskell2010
build-depends:
base >=4.12 && <5
async
, base >=4.12 && <5
, conduit
, monad-logger
, mu-graphql
@ -38,8 +39,6 @@ executable seed-server
, stm
, text
, wai
, wai-route
, warp
executable seed-server-optics
hs-source-dirs: src

View File

@ -11,6 +11,7 @@
module Main where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Logger
import Data.Conduit
@ -23,8 +24,6 @@ import Mu.GRpc.Server
import Mu.Schema
import Mu.Server
import Network.Wai
import Network.Wai.Handler.Warp
import Network.Wai.Route
import Schema
@ -50,14 +49,12 @@ newtype PeopleResponse = PeopleResponse
main :: IO ()
main = do
putStrLn "running seed application"
run 8080 $ flip route app404 $ compileRoutes
[ defRoute (str "proto" ./ end) $
\_ -> gRpcAppTrans msgProtoBuf runStderrLoggingT server
, defRoute (str "avro" ./ end) $
\_ -> gRpcAppTrans msgAvro runStderrLoggingT server
, defRoute (str "graphql" ./ end) $
\_ -> graphQLAppTransQuery runStderrLoggingT server (Proxy @"PeopleService")
]
runConcurrently $ (\_ _ _ -> ())
<$> Concurrently (runGRpcAppTrans msgProtoBuf 8080 runStderrLoggingT server)
<*> Concurrently (runGRpcAppTrans msgAvro 8081 runStderrLoggingT server)
<*> Concurrently (runGraphQLAppTrans 50053 runStderrLoggingT server
(Proxy @('Just "PeopleService"))
(Proxy @'Nothing) (Proxy @'Nothing))
-- Server implementation

View File

@ -12,6 +12,8 @@ module Mu.GraphQL.Server (
-- * Run an GraphQL resolver directly
, runGraphQLApp
, runGraphQLAppSettings
, runGraphQLAppQuery
, runGraphQLAppTrans
-- * Build a WAI 'Application'
, graphQLApp
, graphQLAppTrans
@ -177,3 +179,24 @@ runGraphQLApp ::
-> Proxy sub
-> IO ()
runGraphQLApp port svr q m s = run port (graphQLApp svr q m s)
-- | Run a Mu 'graphQLApp' on a transformer stack on the given port.
runGraphQLAppTrans ::
( GraphQLApp p qr mut sub m chn hs )
=> Port
-> (forall a. m a -> ServerErrorIO a)
-> ServerT chn p m hs
-> Proxy qr
-> Proxy mut
-> Proxy sub
-> IO ()
runGraphQLAppTrans port f svr q m s = run port (graphQLAppTrans f svr q m s)
-- | Run a query-only Mu 'graphQLApp' on the given port.
runGraphQLAppQuery ::
( GraphQLApp p ('Just qr) 'Nothing 'Nothing ServerErrorIO chn hs )
=> Port
-> ServerT chn p ServerErrorIO hs
-> Proxy qr
-> IO ()
runGraphQLAppQuery port svr q = run port (graphQLAppQuery svr q)

View File

@ -36,28 +36,28 @@ module Mu.GRpc.Server
) where
import Control.Concurrent.Async
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMVar
import Control.Exception
import Control.Monad.Except
import Data.Avro
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BS
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BS
import Data.Conduit
import Data.Conduit.TMChan
import Data.Kind
import Data.Proxy
import Network.GRPC.HTTP2.Encoding (GRPCInput, GRPCOutput, gzip, uncompressed)
import Network.GRPC.HTTP2.Types (GRPCStatus (..), GRPCStatusCode (..))
import Network.GRPC.Server.Handlers
import Network.GRPC.Server.Wai as Wai
import Network.Wai (Application)
import Network.Wai.Handler.Warp (Port, Settings, run, runSettings)
import Network.Wai.Handler.WarpTLS (TLSSettings, runTLS)
import Network.GRPC.HTTP2.Encoding (GRPCInput, GRPCOutput, gzip, uncompressed)
import Network.GRPC.HTTP2.Types (GRPCStatus (..), GRPCStatusCode (..))
import Network.GRPC.Server.Handlers.Trans
import Network.GRPC.Server.Wai as Wai
import Network.Wai (Application)
import Network.Wai.Handler.Warp (Port, Settings, run, runSettings)
import Network.Wai.Handler.WarpTLS (TLSSettings, runTLS)
import Mu.Adapter.ProtoBuf.Via
import Mu.GRpc.Avro
import qualified Mu.GRpc.Avro as Avro
import qualified Mu.GRpc.Avro as Avro
import Mu.GRpc.Bridge
import Mu.Rpc
import Mu.Schema
@ -271,84 +271,87 @@ instance forall (sch :: Schema') sty (r :: Type).
---
instance (GRPCInput (RPCTy p) (), GRPCOutput (RPCTy p) ())
instance (MonadIO m, GRPCInput (RPCTy p) (), GRPCOutput (RPCTy p) ())
=> GRpcMethodHandler p m '[ ] 'RetNothing (m ()) where
gRpcMethodHandler f _ _ _ rpc h
= unary @_ @() @() rpc (\_ _ -> raiseErrors (f h))
= unary @m @_ @() @() (raiseErrors . f) rpc (\_ _ -> h)
-----
instance (GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r)
instance (MonadIO m, GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r)
=> GRpcMethodHandler p m '[ ] ('RetSingle rref) (m r) where
gRpcMethodHandler f _ _ _ rpc h
= unary @_ @() @(GRpcOWTy p rref r)
rpc (\_ _ -> buildGRpcOWTy (Proxy @p) (Proxy @rref) <$> raiseErrors (f h))
= unary @m @_ @() @(GRpcOWTy p rref r)
(raiseErrors . f) rpc (\_ _ -> buildGRpcOWTy (Proxy @p) (Proxy @rref) <$> h)
-----
instance (GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r, MonadIO m)
instance (MonadIO m, GRPCInput (RPCTy p) (), GRpcOutputWrapper p rref r, MonadIO m)
=> GRpcMethodHandler p m '[ ] ('RetStream rref)
(ConduitT r Void m () -> m ()) where
gRpcMethodHandler f _ _ _ rpc h
= serverStream @_ @() @(GRpcOWTy p rref r) rpc sstream
= serverStream @m @_ @() @(GRpcOWTy p rref r) (raiseErrors . f) rpc sstream
where sstream :: req -> ()
-> IO ((), ServerStream (GRpcOWTy p rref r) ())
-> m ((), ServerStream m (GRpcOWTy p rref r) ())
sstream _ _ = do
-- Variable to connect input and output
var <- newEmptyTMVarIO :: IO (TMVar (Maybe r))
var <- liftIO newEmptyTMVarIO :: m (TMVar (Maybe r))
-- Start executing the handler
promise <- async (raiseErrors $ f (h (toTMVarConduit var)))
liftIO $ withAsync (raiseErrors $ f (h (toTMVarConduit var))) $ \promise -> do
-- Return the information
let readNext _
= do nextOutput <- atomically $ takeTMVar var
case nextOutput of
Just o -> pure $ Just ((), buildGRpcOWTy (Proxy @p) (Proxy @rref) o)
Nothing -> do cancel promise
pure Nothing
pure ((), ServerStream readNext)
let readNext _
= do nextOutput <- liftIO $ atomically $ takeTMVar var
case nextOutput of
Just o -> pure $ Just ((), buildGRpcOWTy (Proxy @p) (Proxy @rref) o)
Nothing -> do liftIO $ cancel promise
pure Nothing
pure ((), ServerStream readNext)
-----
instance (GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ())
instance (MonadIO m, GRpcInputWrapper p vref v, GRPCOutput (RPCTy p) ())
=> GRpcMethodHandler p m '[ 'ArgSingle aname anns vref ] 'RetNothing (v -> m ()) where
gRpcMethodHandler f _ _ _ rpc h
= unary @_ @(GRpcIWTy p vref v) @()
rpc (\_ -> raiseErrors . f . h . unGRpcIWTy (Proxy @p) (Proxy @vref))
= unary @m @_ @(GRpcIWTy p vref v) @()
(raiseErrors . f) rpc (\_ -> h . unGRpcIWTy (Proxy @p) (Proxy @vref))
-----
instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r)
instance (MonadIO m, GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r)
=> GRpcMethodHandler p m '[ 'ArgSingle aname anns vref ] ('RetSingle rref) (v -> m r) where
gRpcMethodHandler f _ _ _ rpc h
= unary @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
rpc (\_ -> (buildGRpcOWTy (Proxy @p) (Proxy @rref) <$>)
. raiseErrors . f . h
. unGRpcIWTy (Proxy @p) (Proxy @vref))
= unary @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
(raiseErrors . f) rpc
(\_ -> (buildGRpcOWTy (Proxy @p) (Proxy @rref) <$>)
. h
. unGRpcIWTy (Proxy @p) (Proxy @vref))
-----
instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
instance (MonadIO m, GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
=> GRpcMethodHandler p m '[ 'ArgStream aname anns vref ] ('RetSingle rref)
(ConduitT () v m () -> m r) where
gRpcMethodHandler f _ _ _ rpc h
= clientStream @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
rpc cstream
= clientStream @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
(raiseErrors . f) rpc cstream
where cstream :: req
-> IO ((), ClientStream (GRpcIWTy p vref v)
-> m ((), ClientStream m (GRpcIWTy p vref v)
(GRpcOWTy p rref r) ())
cstream _ = do
-- Create a new TMChan
chan <- newTMChanIO :: IO (TMChan v)
chan <- liftIO newTMChanIO :: m (TMChan v)
let producer = sourceTMChan @m chan
-- Start executing the handler in another thread
promise <- async (raiseErrors $ buildGRpcOWTy (Proxy @p) (Proxy @rref) <$> f (h producer))
-- Build the actual handler
let cstreamHandler _ newInput
= atomically (writeTMChan chan (unGRpcIWTy (Proxy @p) (Proxy @vref) newInput))
cstreamFinalizer _
= atomically (closeTMChan chan) >> wait promise
-- Return the information
pure ((), ClientStream cstreamHandler cstreamFinalizer)
liftIO $ withAsync (raiseErrors $ buildGRpcOWTy (Proxy @p) (Proxy @rref) <$> f (h producer))
$ \promise -> do
-- Build the actual handler
let cstreamHandler _ newInput
= liftIO $ atomically $
writeTMChan chan (unGRpcIWTy (Proxy @p) (Proxy @vref) newInput)
cstreamFinalizer _
= liftIO $ atomically (closeTMChan chan) >> wait promise
-- Return the information
pure ((), ClientStream cstreamHandler cstreamFinalizer)
-----
@ -356,24 +359,24 @@ instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
=> GRpcMethodHandler p m '[ 'ArgSingle aname anns vref ] ('RetStream rref)
(v -> ConduitT r Void m () -> m ()) where
gRpcMethodHandler f _ _ _ rpc h
= serverStream @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
rpc sstream
= serverStream @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
(raiseErrors . f) rpc sstream
where sstream :: req -> GRpcIWTy p vref v
-> IO ((), ServerStream (GRpcOWTy p rref r) ())
-> m ((), ServerStream m (GRpcOWTy p rref r) ())
sstream _ v = do
-- Variable to connect input and output
var <- newEmptyTMVarIO :: IO (TMVar (Maybe r))
var <- liftIO newEmptyTMVarIO :: m (TMVar (Maybe r))
-- Start executing the handler
let v' = unGRpcIWTy (Proxy @p) (Proxy @vref) v
promise <- async (raiseErrors $ f (h v' (toTMVarConduit var)))
liftIO $ withAsync (raiseErrors $ f (h v' (toTMVarConduit var))) $ \promise -> do
-- Return the information
let readNext _
= do nextOutput <- atomically $ takeTMVar var
case nextOutput of
Just o -> pure $ Just ((), buildGRpcOWTy (Proxy @p) (Proxy @rref) o)
Nothing -> do cancel promise
pure Nothing
pure ((), ServerStream readNext)
let readNext _
= do nextOutput <- liftIO $ atomically $ takeTMVar var
case nextOutput of
Just o -> pure $ Just ((), buildGRpcOWTy (Proxy @p) (Proxy @rref) o)
Nothing -> do liftIO $ cancel promise
pure Nothing
pure ((), ServerStream readNext)
-----
@ -381,33 +384,34 @@ instance (GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r, MonadIO m)
=> GRpcMethodHandler p m '[ 'ArgStream aname anns vref ] ('RetStream rref)
(ConduitT () v m () -> ConduitT r Void m () -> m ()) where
gRpcMethodHandler f _ _ _ rpc h
= generalStream @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
rpc bdstream
where bdstream :: req -> IO ( (), IncomingStream (GRpcIWTy p vref v) ()
, (), OutgoingStream (GRpcOWTy p rref r) () )
= generalStream @m @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
(raiseErrors . f) rpc bdstream
where bdstream :: req -> m ( (), IncomingStream m (GRpcIWTy p vref v) ()
, (), OutgoingStream m (GRpcOWTy p rref r) () )
bdstream _ = do
-- Create a new TMChan and a new variable
chan <- newTMChanIO :: IO (TMChan v)
chan <- liftIO newTMChanIO :: m (TMChan v)
let producer = sourceTMChan @m chan
var <- newEmptyTMVarIO :: IO (TMVar (Maybe r))
var <- liftIO newEmptyTMVarIO :: m (TMVar (Maybe r))
-- Start executing the handler
promise <- async (raiseErrors $ f $ h producer (toTMVarConduit var))
-- Build the actual handler
let cstreamHandler _ newInput
= atomically (writeTMChan chan (unGRpcIWTy (Proxy @p) (Proxy @vref) newInput))
cstreamFinalizer _
= atomically (closeTMChan chan) >> wait promise
readNext _
= do nextOutput <- atomically $ tryTakeTMVar var
case nextOutput of
Just (Just o) ->
pure $ Just ((), buildGRpcOWTy (Proxy @p) (Proxy @rref) o)
Just Nothing -> do
cancel promise
pure Nothing
Nothing -> -- no new elements to output
readNext ()
pure ((), IncomingStream cstreamHandler cstreamFinalizer, (), OutgoingStream readNext)
liftIO $ withAsync (raiseErrors $ f $ h producer (toTMVarConduit var)) $ \promise -> do
-- Build the actual handler
let cstreamHandler _ newInput
= liftIO $ atomically $
writeTMChan chan (unGRpcIWTy (Proxy @p) (Proxy @vref) newInput)
cstreamFinalizer _
= liftIO $ atomically (closeTMChan chan) >> wait promise
readNext _
= do nextOutput <- liftIO $ atomically $ tryTakeTMVar var
case nextOutput of
Just (Just o) ->
pure $ Just ((), buildGRpcOWTy (Proxy @p) (Proxy @rref) o)
Just Nothing -> do
liftIO $ cancel promise
pure Nothing
Nothing -> -- no new elements to output
readNext ()
pure ((), IncomingStream cstreamHandler cstreamFinalizer, (), OutgoingStream readNext)
-----

View File

@ -25,7 +25,7 @@ extra-deps:
- http2-grpc-types-0.5.0.0
- proto3-wire-1.1.0
- http2-grpc-proto3-wire-0.1.0.0
- warp-grpc-0.3.0.0
- warp-grpc-0.4.0.0
- http2-client-grpc-0.8.0.0
- avro-0.4.7.0
- language-protobuf-1.0.1

View File

@ -25,7 +25,7 @@ extra-deps:
- http2-grpc-types-0.5.0.0
- proto3-wire-1.1.0
- http2-grpc-proto3-wire-0.1.0.0
- warp-grpc-0.3.0.0
- warp-grpc-0.4.0.0
- http2-client-grpc-0.8.0.0
- avro-0.4.7.0
- language-protobuf-1.0.1