mirror of
https://github.com/urbit/shrub.git
synced 2024-12-30 15:44:03 +03:00
149 lines
5.7 KiB
Haskell
149 lines
5.7 KiB
Haskell
{-
|
|
- TODO When making a request, handle the case where the request id is
|
|
already in use.
|
|
-}
|
|
|
|
module Vere.Http.Client where
|
|
|
|
import Arvo (BlipEv(..), Ev(..), HttpClientEf(..), HttpClientEv(..),
|
|
HttpClientReq(..), HttpEvent(..), KingId,
|
|
ResponseHeader(..))
|
|
import UrbitPrelude hiding (Builder)
|
|
import Vere.Pier.Types
|
|
|
|
import Vere.Http
|
|
|
|
import qualified Data.Map as M
|
|
import qualified Network.HTTP.Client as H
|
|
import qualified Network.HTTP.Types as HT
|
|
|
|
-- Types -----------------------------------------------------------------------
|
|
|
|
type ReqId = Word
|
|
|
|
data HttpClientDrv = HttpClientDrv
|
|
{ hcdManager :: H.Manager
|
|
, hcdLive :: TVar (Map ReqId (Async ()))
|
|
}
|
|
|
|
--------------------------------------------------------------------------------
|
|
|
|
cvtReq :: HttpClientReq -> Maybe H.Request
|
|
cvtReq r =
|
|
H.parseRequest (unpack (unCord $ url r)) <&> \init -> init
|
|
{ H.method = encodeUtf8 $ tshow (method r)
|
|
, H.requestHeaders = unconvertHeaders (headerList r)
|
|
, H.requestBody =
|
|
H.RequestBodyBS $ case body r of
|
|
Nothing -> ""
|
|
Just (Octs bs) -> bs
|
|
}
|
|
|
|
cvtRespHeaders :: H.Response a -> ResponseHeader
|
|
cvtRespHeaders resp =
|
|
ResponseHeader (fromIntegral $ HT.statusCode (H.responseStatus resp)) heads
|
|
where
|
|
heads = convertHeaders (H.responseHeaders resp)
|
|
|
|
--------------------------------------------------------------------------------
|
|
|
|
client :: forall e. HasLogFunc e
|
|
=> KingId -> QueueEv -> ([Ev], RAcquire e (EffCb e HttpClientEf))
|
|
client kingId enqueueEv = ([], runHttpClient)
|
|
where
|
|
runHttpClient :: RAcquire e (EffCb e HttpClientEf)
|
|
runHttpClient = handleEffect <$> mkRAcquire start stop
|
|
|
|
start :: RIO e (HttpClientDrv)
|
|
start = HttpClientDrv <$>
|
|
(io $ H.newManager H.defaultManagerSettings) <*>
|
|
newTVarIO M.empty
|
|
|
|
stop :: HttpClientDrv -> RIO e ()
|
|
stop HttpClientDrv{..} = do
|
|
-- Cancel all the outstanding asyncs, ignoring any exceptions.
|
|
liveThreads <- atomically $ readTVar hcdLive
|
|
mapM_ cancel liveThreads
|
|
|
|
handleEffect :: HttpClientDrv -> HttpClientEf -> RIO e ()
|
|
handleEffect drv = \case
|
|
HCERequest _ id req -> newReq drv id req
|
|
HCECancelRequest _ id -> cancelReq drv id
|
|
|
|
newReq :: HttpClientDrv -> ReqId -> HttpClientReq -> RIO e ()
|
|
newReq drv id req = do
|
|
async <- runReq drv id req
|
|
atomically $ modifyTVar (hcdLive drv) (insertMap id async)
|
|
|
|
-- The problem with the original http client code was that it was written
|
|
-- to the idea of what the events "should have" been instead of what they
|
|
-- actually were. This means that this driver doesn't run like the vere
|
|
-- http client driver. The vere driver was written assuming that parts of
|
|
-- events could be compressed together: a Start might contain the only
|
|
-- chunk of data and immediately complete, where here the Start event, the
|
|
-- Continue (with File) event, and the Continue (completed) event are three
|
|
-- separate things.
|
|
runReq :: HttpClientDrv -> ReqId -> HttpClientReq -> RIO e (Async ())
|
|
runReq HttpClientDrv{..} id req = async $
|
|
case cvtReq req of
|
|
Nothing -> do
|
|
logDebug $ displayShow ("(malformed http client request)", id, req)
|
|
planEvent id (Cancel ())
|
|
Just r -> do
|
|
logDebug $ displayShow ("(http client request)", id, req)
|
|
withRunInIO $ \run ->
|
|
H.withResponse r hcdManager $ \x -> run (exec x)
|
|
where
|
|
recv :: H.BodyReader -> RIO e (Maybe ByteString)
|
|
recv read = io $ read <&> \case chunk | null chunk -> Nothing
|
|
| otherwise -> Just chunk
|
|
|
|
exec :: H.Response H.BodyReader -> RIO e ()
|
|
exec resp = do
|
|
let headers = cvtRespHeaders resp
|
|
getChunk = recv (H.responseBody resp)
|
|
loop = getChunk >>= \case
|
|
Nothing -> planEvent id (Continue Nothing True)
|
|
Just bs -> do
|
|
planEvent id $
|
|
Continue (Just $ File $ Octs bs) False
|
|
loop
|
|
planEvent id (Start headers Nothing False)
|
|
loop
|
|
|
|
planEvent :: ReqId -> HttpEvent -> RIO e ()
|
|
planEvent id ev = do
|
|
logDebug $ displayShow ("(http client response)", id, (describe ev))
|
|
atomically $ enqueueEv $ EvBlip $ BlipEvHttpClient $
|
|
HttpClientEvReceive (kingId, ()) (fromIntegral id) ev
|
|
|
|
-- show an HttpEvent with byte count instead of raw data
|
|
describe :: HttpEvent -> String
|
|
describe (Start header Nothing final) =
|
|
"(Start " ++ (show header) ++ " ~ " ++ (show final)
|
|
describe (Start header (Just (File (Octs bs))) final) =
|
|
"(Start " ++ (show header) ++ " (" ++ (show $ length bs) ++ " bytes) " ++
|
|
(show final)
|
|
describe (Continue Nothing final) =
|
|
"(Continue ~ " ++ (show final)
|
|
describe (Continue (Just (File (Octs bs))) final) =
|
|
"(Continue (" ++ (show $ length bs) ++ " bytes) " ++ (show final)
|
|
describe (Cancel ()) = "(Cancel ())"
|
|
|
|
waitCancel :: Async a -> RIO e (Either SomeException a)
|
|
waitCancel async = cancel async >> waitCatch async
|
|
|
|
cancelThread :: ReqId -> Async a -> RIO e ()
|
|
cancelThread id =
|
|
waitCancel >=> \case Left _ -> planEvent id $ Cancel ()
|
|
Right _ -> pure ()
|
|
|
|
cancelReq :: HttpClientDrv -> ReqId -> RIO e ()
|
|
cancelReq drv id =
|
|
join $ atomically $ do
|
|
tbl <- readTVar (hcdLive drv)
|
|
case lookup id tbl of
|
|
Nothing -> pure (pure ())
|
|
Just async -> do writeTVar (hcdLive drv) (deleteMap id tbl)
|
|
pure (cancelThread id async)
|