mirror of
https://github.com/ilyakooo0/urbit.git
synced 2024-12-22 22:31:30 +03:00
ecbcdf7d00
I noticed that the king's text log file kept filling up, and it's mostly 'sending chunk "\n"' from eyre. This changes the log level of every partial send over an active eyre channel to info, instead of always.
230 lines
6.2 KiB
Haskell
230 lines
6.2 KiB
Haskell
{-|
|
|
WAI Application for `eyre` driver.
|
|
|
|
# Request Lifecycles
|
|
|
|
- Requests come in, are given an identifier and are passed to a callback.
|
|
|
|
- When requests timeout, the identifier is passed to anothing callback.
|
|
|
|
- The server pulls response actions, and passes them to the associated
|
|
request.
|
|
-}
|
|
|
|
module Urbit.Vere.Eyre.Wai
|
|
( RespAct(..)
|
|
, RespApi(..)
|
|
, LiveReqs(..)
|
|
, ReqInfo(..)
|
|
, emptyLiveReqs
|
|
, routeRespAct
|
|
, rmLiveReq
|
|
, newLiveReq
|
|
, app
|
|
)
|
|
where
|
|
|
|
import Urbit.Prelude hiding (Builder)
|
|
|
|
import Data.Binary.Builder (Builder, fromByteString)
|
|
import Data.Bits (shiftL, (.|.))
|
|
import Data.Conduit (ConduitT, Flush(Chunk, Flush), yield)
|
|
import Network.Socket (SockAddr(..))
|
|
import System.Random (newStdGen, randoms)
|
|
import Urbit.Arvo (Address(..), Ipv4(..), Ipv6(..), Method)
|
|
|
|
import qualified Network.HTTP.Types as H
|
|
import qualified Network.Wai as W
|
|
import qualified Network.Wai.Conduit as W
|
|
|
|
|
|
-- Types -----------------------------------------------------------------------
|
|
|
|
data RespAct
|
|
= RAFull H.Status [H.Header] ByteString
|
|
| RAHead H.Status [H.Header] ByteString
|
|
| RABloc ByteString
|
|
| RADone
|
|
deriving (Eq, Ord, Show)
|
|
|
|
data RespApi = RespApi
|
|
{ raAct :: RespAct -> STM Bool
|
|
, raKil :: STM ()
|
|
}
|
|
|
|
data LiveReqs = LiveReqs
|
|
{ reqIdSuply :: [Word64]
|
|
, activeReqs :: Map Word64 (Ship, RespApi)
|
|
}
|
|
|
|
data ReqInfo = ReqInfo
|
|
{ riAdr :: Address
|
|
, riMet :: H.StdMethod
|
|
, riUrl :: ByteString
|
|
, riHdr :: [H.Header]
|
|
, riBod :: ByteString
|
|
}
|
|
|
|
|
|
-- Live Requests Table -- All Requests Still Waiting for Responses -------------
|
|
|
|
emptyLiveReqs :: IO LiveReqs
|
|
emptyLiveReqs = io $ do
|
|
gen <- newStdGen
|
|
pure (LiveReqs (randoms gen) mempty)
|
|
|
|
routeRespAct :: Ship -> TVar LiveReqs -> Word64 -> RespAct -> STM Bool
|
|
routeRespAct who vLiv reqId act =
|
|
(lookup reqId . activeReqs <$> readTVar vLiv) >>= \case
|
|
Nothing -> pure False
|
|
Just (own, tv) -> do
|
|
if (who == own)
|
|
then raAct tv act
|
|
else pure False
|
|
|
|
rmLiveReq :: TVar LiveReqs -> Word64 -> STM ()
|
|
rmLiveReq var reqId = modifyTVar' var
|
|
$ \liv -> liv { activeReqs = deleteMap reqId (activeReqs liv) }
|
|
|
|
allocateReqId :: TVar LiveReqs -> STM Word64
|
|
allocateReqId var = do
|
|
LiveReqs supply tbl <- readTVar var
|
|
|
|
let loop :: [Word64] -> (Word64, [Word64])
|
|
loop [] = error "impossible"
|
|
loop (x:xs) | member x tbl = loop xs
|
|
loop (x:xs) | otherwise = (x, xs)
|
|
|
|
let (fresh, supply') = loop supply
|
|
writeTVar var (LiveReqs supply' tbl)
|
|
pure fresh
|
|
|
|
newLiveReq :: Ship -> TVar LiveReqs -> STM (Word64, STM RespAct)
|
|
newLiveReq who var = do
|
|
tmv <- newTQueue
|
|
kil <- newEmptyTMVar
|
|
nex <- allocateReqId var
|
|
|
|
LiveReqs sup tbl <- readTVar var
|
|
|
|
let waitAct = (<|>) (readTMVar kil $> RADone) (readTQueue tmv)
|
|
respApi = RespApi
|
|
{ raKil = putTMVar kil ()
|
|
, raAct = \act -> tryReadTMVar kil >>= \case
|
|
Nothing -> writeTQueue tmv act $> True
|
|
Just () -> pure False
|
|
}
|
|
|
|
|
|
writeTVar var (LiveReqs sup (insertMap nex (who, respApi) tbl))
|
|
|
|
pure (nex, waitAct)
|
|
|
|
|
|
-- Random Helpers --------------------------------------------------------------
|
|
|
|
cookMeth :: W.Request -> Maybe Method
|
|
cookMeth = H.parseMethod . W.requestMethod >>> \case
|
|
Left _ -> Nothing
|
|
Right m -> Just m
|
|
|
|
reqAddr :: W.Request -> Address
|
|
reqAddr = W.remoteHost >>> \case
|
|
SockAddrInet _ a -> AIpv4 (Ipv4 a)
|
|
SockAddrInet6 _ _ a _ -> AIpv6 (mkIpv6 a)
|
|
_ -> error "invalid sock addr"
|
|
|
|
mkIpv6 :: (Word32, Word32, Word32, Word32) -> Ipv6
|
|
mkIpv6 (p, q, r, s) = Ipv6 (pBits .|. qBits .|. rBits .|. sBits)
|
|
where
|
|
pBits = shiftL (fromIntegral p) 0
|
|
qBits = shiftL (fromIntegral q) 32
|
|
rBits = shiftL (fromIntegral r) 64
|
|
sBits = shiftL (fromIntegral s) 96
|
|
|
|
reqUrl :: W.Request -> ByteString
|
|
reqUrl r = W.rawPathInfo r <> W.rawQueryString r
|
|
|
|
|
|
-- Responses -------------------------------------------------------------------
|
|
|
|
noHeader :: HasLogFunc e => RIO e a
|
|
noHeader = do
|
|
logError "Response block with no response header."
|
|
error "Bad HttpEvent: Response block with no response header."
|
|
|
|
dupHead :: HasLogFunc e => RIO e a
|
|
dupHead = do
|
|
logError "Multiple %head actions on one request"
|
|
error "Bad HttpEvent: Multiple header actions per on one request."
|
|
|
|
{-|
|
|
- Immediately yield all of the initial chunks
|
|
- Yield the data from %bloc action.
|
|
- Close the stream when we hit a %done action.
|
|
-}
|
|
streamBlocks
|
|
:: HasLogFunc e
|
|
=> e
|
|
-> ByteString
|
|
-> STM RespAct
|
|
-> ConduitT () (Flush Builder) IO ()
|
|
streamBlocks env init getAct = send init >> loop
|
|
where
|
|
loop = atomically getAct >>= \case
|
|
RAHead _ _ _ -> runRIO env dupHead
|
|
RAFull _ _ _ -> runRIO env dupHead
|
|
RADone -> pure ()
|
|
RABloc c -> send c >> loop
|
|
|
|
send "" = pure ()
|
|
send c = do
|
|
runRIO env (logInfo (display ("sending chunk " <> tshow c)))
|
|
yield $ Chunk $ fromByteString c
|
|
yield Flush
|
|
|
|
sendResponse
|
|
:: HasLogFunc e
|
|
=> (W.Response -> IO W.ResponseReceived)
|
|
-> STM RespAct
|
|
-> RIO e W.ResponseReceived
|
|
sendResponse cb waitAct = do
|
|
env <- ask
|
|
atomically waitAct >>= \case
|
|
RADone -> io $ cb $ W.responseLBS (H.mkStatus 444 "No Response") [] ""
|
|
RAFull s h b -> io $ cb $ W.responseLBS s h $ fromStrict b
|
|
RAHead s h b -> io $ cb $ W.responseSource s h $ streamBlocks env b waitAct
|
|
RABloc _ -> noHeader
|
|
|
|
liveReq :: Ship -> TVar LiveReqs -> RAcquire e (Word64, STM RespAct)
|
|
liveReq who vLiv = mkRAcquire ins del
|
|
where
|
|
ins = atomically (newLiveReq who vLiv)
|
|
del = atomically . rmLiveReq vLiv . fst
|
|
|
|
app
|
|
:: HasLogFunc e
|
|
=> e
|
|
-> Ship
|
|
-> TVar LiveReqs
|
|
-> (Word64 -> ReqInfo -> STM ())
|
|
-> (Word64 -> STM ())
|
|
-> W.Application
|
|
app env who liv inform cancel req respond =
|
|
runRIO env $ rwith (liveReq who liv) $ \(reqId, respApi) -> do
|
|
bod <- io (toStrict <$> W.strictRequestBody req)
|
|
met <- maybe (error "bad method") pure (cookMeth req)
|
|
|
|
let adr = reqAddr req
|
|
hdr = W.requestHeaders req
|
|
url = reqUrl req
|
|
|
|
atomically $ inform reqId $ ReqInfo adr met url hdr bod
|
|
|
|
try (sendResponse respond respApi) >>= \case
|
|
Right rr -> pure rr
|
|
Left exn -> do
|
|
atomically (cancel reqId)
|
|
logError $ display ("Exception during request" <> tshow exn)
|
|
throwIO (exn :: SomeException)
|