[grpc-client] Handle bidirectional streams correctly (#314)

This is a breaking change: The handler for bidirectional streams is returns two
conduits now, instead of one. This enables the client to correctly tackle the
concurrent nature of the client to server stream and the server to client
stream.

Each response in the server-to-client stream is no longer wrapped in GRpcReply,
any error during parsing the stream is thrown in IO.

Other connection related errors are returned in the result value of the conduit
corresponding to the server-to-client Conduit.

Note: The client didn't and still doesn't handle any errors that the server
might indicate using headers or trailers, e.g. grpc-status or the HTTP status
code. This commit also adds TODO comments to handle these.
This commit is contained in:
Akshay Mankar 2021-06-01 13:32:17 +02:00 committed by GitHub
parent 5315abd39c
commit 0f4942b1c4

View File

@ -21,6 +21,7 @@ import Control.Concurrent.Async
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TMChan
import Control.Concurrent.STM.TMVar
import Control.Exception (throwIO)
import Control.Monad.IO.Class
import Data.Avro
import qualified Data.ByteString.Char8 as BS
@ -38,7 +39,7 @@ import Network.GRPC.Client.Helpers
import Network.GRPC.HTTP2.Encoding (GRPCInput, GRPCOutput)
import Network.HTTP2 (ErrorCode)
import Network.HTTP2.Client (ClientError, ClientIO, TooMuchConcurrency,
runExceptT)
runExceptT, ExceptT)
import Mu.Adapter.ProtoBuf.Via
import Mu.GRpc.Avro
@ -304,47 +305,50 @@ conduitFromChannel chan promise = go
instance ( KnownName name
, GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
, handler ~ (CompressMode -> IO (ConduitT v (GRpcReply r) IO ())) )
, handler ~ (CompressMode -> IO (ConduitT v Void IO (), ConduitT () r IO (GRpcReply ()))))
=> GRpcMethodCall p ('Method name '[ 'ArgStream aname vref ]
('RetStream rref)) handler where
gRpcMethodCall rpc _ client compress
= do -- Create a new TMChan
inchan <- newTMChanIO :: IO (TMChan (GRpcReply r))
outchan <- newTMChanIO :: IO (TMChan v)
var <- newEmptyTMVarIO -- if full, this means an error
= do serverChan <- newTMChanIO :: IO (TMChan r)
clientChan <- newTMChanIO :: IO (TMChan v)
finalReply <- newEmptyTMVarIO :: IO (TMVar (GRpcReply ()))
-- Start executing the client in another thread
-- TODO: Is there anything that makes sure that this thread doesn't keep running forever?
_ <- async $ do
v <- simplifyResponse $
buildGRpcReply3 <$>
rawGeneralStream
@_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
rpc client
() (\_ ievent -> do -- on the first iteration, say that everything is OK
_ <- liftIO $ atomically $ tryPutTMVar var (GRpcOk ())
case ievent of
RecvMessage o -> liftIO $ atomically $ writeTMChan inchan (GRpcOk $ unGRpcOWTy(Proxy @p) (Proxy @rref) o)
Invalid e -> liftIO $ atomically $ writeTMChan inchan (GRpcErrorString (show e))
_ -> pure () )
() (\_ -> do
nextVal <- liftIO $ atomically $ readTMChan outchan
case nextVal of
Nothing -> pure ((), Finalize)
Just v -> pure ((), SendMessage compress (buildGRpcIWTy (Proxy @p) (Proxy @vref) v)))
case v of
GRpcOk () -> liftIO $ atomically $ closeTMChan inchan
_ -> liftIO $ atomically $ putTMVar var v
-- This conduit feeds information to the other thread
let go = do err <- liftIO $ atomically $ takeTMVar var
case err of
GRpcOk _ -> go2
e -> yield $ (\_ -> error "this should never happen") <$> e
go2 = do nextOut <- await
case nextOut of
Just v -> do liftIO $ atomically $ writeTMChan outchan v
go2
Nothing -> do r <- liftIO $ atomically $ tryReadTMChan inchan
case r of
Nothing -> pure () -- both are empty, end
Just Nothing -> go2
Just (Just nextIn) -> yield nextIn >> go2
pure go
() (incomingEventConsumer serverChan)
() (outgoingEventProducer clientChan)
liftIO $ atomically $ putTMVar finalReply v
let clientConduit = do
sinkTMChan clientChan
liftIO . atomically . closeTMChan $ clientChan
serverConduit = do
sourceTMChan serverChan
liftIO . atomically . readTMVar $ finalReply
pure (clientConduit, serverConduit)
where
incomingEventConsumer :: TMChan r -> () -> IncomingEvent (GRpcOWTy p rref r) () -> ExceptT ClientError IO ()
incomingEventConsumer serverChan _ ievent =
case ievent of
RecvMessage o -> do
liftIO $ atomically $ writeTMChan serverChan (unGRpcOWTy (Proxy @p) (Proxy @rref) o)
Invalid e -> liftIO $ do
atomically $ closeTMChan serverChan
throwIO e
Trailers _ ->
-- TODO: Read the trailers and use them to make the 'finalReply'
liftIO $ atomically $ closeTMChan serverChan
Headers _ ->
-- TODO: Read the headers and use them to make the 'finalReply'
pure ()
outgoingEventProducer :: TMChan v -> () -> ExceptT ClientError IO ((), OutgoingEvent (GRpcIWTy p vref v) ())
outgoingEventProducer clientChan _ = do
nextVal <- liftIO $ atomically $ readTMChan clientChan
case nextVal of
Nothing -> pure ((), Finalize)
Just v -> pure ((), SendMessage compress (buildGRpcIWTy (Proxy @p) (Proxy @vref) v))