mirror of
https://github.com/ilyakooo0/urbit.git
synced 2025-01-03 04:40:50 +03:00
Don't send a Canceled event if the request already finished.
This commit is contained in:
parent
fc6f3028e2
commit
2374ed3ce8
@ -1,37 +1,31 @@
|
||||
{-
|
||||
- TODO When making a request, handle the case where the request id is
|
||||
already in use.
|
||||
- TODO When canceling a request, don't send Http.Canceled if the
|
||||
request already finished.
|
||||
-}
|
||||
|
||||
module Vere.Http.Client where
|
||||
|
||||
import ClassyPrelude
|
||||
import Data.Void
|
||||
import Vere.Http as Http
|
||||
import Control.Concurrent hiding (newEmptyMVar, putMVar)
|
||||
import Vere.Http
|
||||
|
||||
import qualified Network.HTTP.Client as H
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
|
||||
type ReqId = Word
|
||||
|
||||
data Ev = Receive ReqId Http.Event -- %receive
|
||||
data Ev = Receive ReqId Event -- [%receive @ todo]
|
||||
|
||||
data Eff
|
||||
= NewReq ReqId Request -- %request
|
||||
| CancelReq ReqId -- %cancel-request
|
||||
= NewReq ReqId Request -- [%request @ todo]
|
||||
| CancelReq ReqId -- [%cancel-request @]
|
||||
|
||||
data State = State
|
||||
{ sManager :: H.Manager
|
||||
, sLive :: TVar (Map ReqId ThreadId)
|
||||
, sLive :: TVar (Map ReqId (Async ()))
|
||||
, sChan :: MVar Ev
|
||||
}
|
||||
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
cvtReq :: Request -> H.Request
|
||||
@ -40,7 +34,6 @@ cvtReq = undefined
|
||||
cvtRespHeaders :: H.Response a -> ResponseHeader
|
||||
cvtRespHeaders resp = undefined
|
||||
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
initState :: IO State
|
||||
@ -49,38 +42,46 @@ initState = State <$> H.newManager H.defaultManagerSettings
|
||||
<*> newEmptyMVar
|
||||
|
||||
emit :: State -> Ev -> IO ()
|
||||
emit (State _ _ chan) event = putMVar chan event
|
||||
emit st event = putMVar (sChan st) event
|
||||
|
||||
runEff :: State -> Eff -> IO ()
|
||||
runEff st@(State _ s _) = \case CancelReq id -> cancelReq st id
|
||||
NewReq id req -> newReq st id req
|
||||
runEff st = \case CancelReq id -> cancelReq st id
|
||||
NewReq id req -> newReq st id req
|
||||
|
||||
newReq :: State -> ReqId -> Request -> IO ()
|
||||
newReq st id req = do tid <- runReq st id req
|
||||
atomically $ modifyTVar (sLive st) (insertMap id tid)
|
||||
newReq st id req = do async <- runReq st id req
|
||||
atomically $ modifyTVar (sLive st) (insertMap id async)
|
||||
|
||||
waitCancel :: Async a -> IO (Either SomeException a)
|
||||
waitCancel async = cancel async >> waitCatch async
|
||||
|
||||
cancelThread :: State -> ReqId -> Async a -> IO ()
|
||||
cancelThread st id =
|
||||
waitCancel >=> \case Left _ -> emit st (Receive id Canceled)
|
||||
Right _ -> pure ()
|
||||
|
||||
cancelReq :: State -> ReqId -> IO ()
|
||||
cancelReq st id =
|
||||
join $ atomically $ do
|
||||
tbl <- readTVar (sLive st)
|
||||
case lookup id tbl of
|
||||
Nothing -> pure (pure ())
|
||||
Just tid -> do
|
||||
writeTVar (sLive st) (deleteMap id tbl)
|
||||
pure $ do killThread tid
|
||||
emit st (Receive id Canceled)
|
||||
Nothing -> pure (pure ())
|
||||
Just async -> do writeTVar (sLive st) (deleteMap id tbl)
|
||||
pure (cancelThread st id async)
|
||||
|
||||
runReq :: State -> ReqId -> Request -> IO ThreadId
|
||||
runReq st id request =
|
||||
forkIO $ H.withResponse (cvtReq request) (sManager st) $ \resp -> do
|
||||
let headers = cvtRespHeaders resp
|
||||
let getChunk = recv (H.responseBody resp)
|
||||
let loop = getChunk >>= \case
|
||||
Just bs -> emit st (Receive id $ Received bs) >> loop
|
||||
Nothing -> emit st (Receive id Done)
|
||||
emit st (Receive id $ Started headers)
|
||||
loop
|
||||
runReq :: State -> ReqId -> Request -> IO (Async ())
|
||||
runReq st id req = async (H.withResponse (cvtReq req) (sManager st) exec)
|
||||
where
|
||||
recv :: H.BodyReader -> IO (Maybe ByteString)
|
||||
recv read = read <&> \case chunk | null chunk -> Nothing
|
||||
| otherwise -> Just chunk
|
||||
|
||||
exec :: H.Response H.BodyReader -> IO ()
|
||||
exec resp = do
|
||||
let headers = cvtRespHeaders resp
|
||||
getChunk = recv (H.responseBody resp)
|
||||
loop = getChunk >>= \case
|
||||
Just bs -> emit st (Receive id $ Received bs) >> loop
|
||||
Nothing -> emit st (Receive id Done)
|
||||
emit st (Receive id $ Started headers)
|
||||
loop
|
||||
|
Loading…
Reference in New Issue
Block a user