Bidirectional gRpc streams compile

This commit is contained in:
Alejandro Serrano 2019-10-03 15:00:43 +02:00
parent 4699bd4d2f
commit 9f4372a400

View File

@ -28,7 +28,8 @@ import Data.Conduit.TMChan
import GHC.TypeLits
import Network.HTTP2 (ErrorCode)
import Network.HTTP2.Client (ClientIO, TooMuchConcurrency, ClientError, runExceptT)
import Network.GRPC.Proto3Wire.Client (RPC(..), RawReply, CompressMode(..), StreamDone(..))
import Network.GRPC.Proto3Wire.Client (RPC(..), RawReply, CompressMode(..), StreamDone(..),
IncomingEvent(..),OutgoingEvent(..))
import Network.GRPC.Proto3Wire.Client.Helpers
import Mu.Rpc
@ -138,7 +139,7 @@ instance (KnownName name, HasProtoSchema vsch vty v, HasProtoSchema rsch rty r)
rawStreamServer (toProtoViaSchema @vsch, fromProtoViaSchema @rsch) rpc client () x
(\_ _ newVal -> liftIO $ atomically $ writeTMChan chan newVal)
case v of
GRpcOk () -> return ()
GRpcOk () -> liftIO $ atomically $ closeTMChan chan
_ -> liftIO $ atomically $ putTMVar var v
-- This conduit feeds information to the other thread
let go = do err <- liftIO $ atomically $ tryTakeTMVar var
@ -147,5 +148,48 @@ instance (KnownName name, HasProtoSchema vsch vty v, HasProtoSchema rsch rty r)
Nothing -> -- no error, everything is fine
sourceTMChan chan .| C.map GRpcOk
return go
where methodName = BS.pack (nameVal (Proxy @name))
rpc = RPC pkgName srvName methodName
instance (KnownName name, HasProtoSchema vsch vty v, HasProtoSchema rsch rty r)
=> GRpcMethodCall ('Method name '[ 'ArgStream vsch vty ] ('RetStream rsch rty))
(CompressMode -> IO (ConduitT v (GRpcReply r) IO ())) where
gRpcMethodCall pkgName srvName _ client compress
= do -- Create a new TMChan
inchan <- newTMChanIO
outchan <- newTMChanIO
var <- newEmptyTMVarIO -- if full, this means an error
-- Start executing the client in another thread
async $ do
v <- simplifyResponse $
buildGRpcReply3 <$>
rawGeneralStream
(toProtoViaSchema @vsch, fromProtoViaSchema @rsch) rpc client
() (\_ ievent -> case ievent of
RecvMessage o -> liftIO $ atomically $ writeTMChan inchan (GRpcOk o)
Invalid e -> liftIO $ atomically $ writeTMChan inchan (GRpcErrorString (show e))
_ -> return () )
() (\_ -> do nextVal <- liftIO $ atomically $ readTMChan outchan
case nextVal of
Nothing -> return ((), Finalize)
Just v -> return ((), SendMessage compress v))
case v of
GRpcOk () -> liftIO $ atomically $ closeTMChan inchan
_ -> liftIO $ atomically $ putTMVar var v
-- This conduit feeds information to the other thread
let go = do err <- liftIO $ atomically $ tryTakeTMVar var
case err of
Just e -> yield $ (\_ -> error "this should never happen") <$> e
Nothing -> -- no error, everything is fine
do nextOut <- await
case nextOut of
Just v -> do liftIO $ atomically $ writeTMChan outchan v
go
Nothing -> do r <- liftIO $ atomically $ tryReadTMChan inchan
case r of
Nothing -> return () -- both are empty, end
Just Nothing -> go
Just (Just nextIn) -> yield nextIn >> go
return go
where methodName = BS.pack (nameVal (Proxy @name))
rpc = RPC pkgName srvName methodName