urbit/pkg/hs/urbit-king/lib/Urbit/Vere/Eyre/Wai.hs

230 lines
6.2 KiB
Haskell
Raw Normal View History

{-|
WAI Application for `eyre` driver.
# Request Lifecycles
- Requests come in, are given an identifier and are passed to a callback.
- When requests timeout, the identifier is passed to anothing callback.
- The server pulls response actions, and passes them to the associated
request.
-}
2020-05-08 21:29:18 +03:00
module Urbit.Vere.Eyre.Wai
( RespAct(..)
, RespApi(..)
, LiveReqs(..)
, ReqInfo(..)
, emptyLiveReqs
, routeRespAct
, rmLiveReq
, newLiveReq
, app
)
where
import Urbit.Prelude hiding (Builder)
import Data.Binary.Builder (Builder, fromByteString)
import Data.Bits (shiftL, (.|.))
import Data.Conduit (ConduitT, Flush(Chunk, Flush), yield)
import Network.Socket (SockAddr(..))
import System.Random (newStdGen, randoms)
import Urbit.Arvo (Address(..), Ipv4(..), Ipv6(..), Method)
import qualified Network.HTTP.Types as H
import qualified Network.Wai as W
import qualified Network.Wai.Conduit as W
-- Types -----------------------------------------------------------------------
data RespAct
= RAFull H.Status [H.Header] ByteString
| RAHead H.Status [H.Header] ByteString
| RABloc ByteString
| RADone
deriving (Eq, Ord, Show)
data RespApi = RespApi
{ raAct :: RespAct -> STM Bool
, raKil :: STM ()
}
data LiveReqs = LiveReqs
{ reqIdSuply :: [Word64]
, activeReqs :: Map Word64 (Ship, RespApi)
}
data ReqInfo = ReqInfo
{ riAdr :: Address
, riMet :: H.StdMethod
, riUrl :: ByteString
, riHdr :: [H.Header]
, riBod :: ByteString
}
-- Live Requests Table -- All Requests Still Waiting for Responses -------------
emptyLiveReqs :: IO LiveReqs
emptyLiveReqs = io $ do
gen <- newStdGen
pure (LiveReqs (randoms gen) mempty)
routeRespAct :: Ship -> TVar LiveReqs -> Word64 -> RespAct -> STM Bool
routeRespAct who vLiv reqId act =
(lookup reqId . activeReqs <$> readTVar vLiv) >>= \case
Nothing -> pure False
Just (own, tv) -> do
if (who == own)
then raAct tv act
else pure False
rmLiveReq :: TVar LiveReqs -> Word64 -> STM ()
rmLiveReq var reqId = modifyTVar' var
$ \liv -> liv { activeReqs = deleteMap reqId (activeReqs liv) }
allocateReqId :: TVar LiveReqs -> STM Word64
allocateReqId var = do
LiveReqs supply tbl <- readTVar var
let loop :: [Word64] -> (Word64, [Word64])
loop [] = error "impossible"
loop (x:xs) | member x tbl = loop xs
loop (x:xs) | otherwise = (x, xs)
let (fresh, supply') = loop supply
writeTVar var (LiveReqs supply' tbl)
pure fresh
newLiveReq :: Ship -> TVar LiveReqs -> STM (Word64, STM RespAct)
newLiveReq who var = do
tmv <- newTQueue
kil <- newEmptyTMVar
nex <- allocateReqId var
LiveReqs sup tbl <- readTVar var
let waitAct = (<|>) (readTMVar kil $> RADone) (readTQueue tmv)
respApi = RespApi
{ raKil = putTMVar kil ()
, raAct = \act -> tryReadTMVar kil >>= \case
Nothing -> writeTQueue tmv act $> True
Just () -> pure False
}
writeTVar var (LiveReqs sup (insertMap nex (who, respApi) tbl))
pure (nex, waitAct)
-- Random Helpers --------------------------------------------------------------
cookMeth :: W.Request -> Maybe Method
cookMeth = H.parseMethod . W.requestMethod >>> \case
Left _ -> Nothing
Right m -> Just m
reqAddr :: W.Request -> Address
reqAddr = W.remoteHost >>> \case
SockAddrInet _ a -> AIpv4 (Ipv4 a)
SockAddrInet6 _ _ a _ -> AIpv6 (mkIpv6 a)
_ -> error "invalid sock addr"
mkIpv6 :: (Word32, Word32, Word32, Word32) -> Ipv6
mkIpv6 (p, q, r, s) = Ipv6 (pBits .|. qBits .|. rBits .|. sBits)
where
pBits = shiftL (fromIntegral p) 0
qBits = shiftL (fromIntegral q) 32
rBits = shiftL (fromIntegral r) 64
sBits = shiftL (fromIntegral s) 96
reqUrl :: W.Request -> ByteString
reqUrl r = W.rawPathInfo r <> W.rawQueryString r
-- Responses -------------------------------------------------------------------
noHeader :: HasLogFunc e => RIO e a
noHeader = do
logError "Response block with no response header."
error "Bad HttpEvent: Response block with no response header."
dupHead :: HasLogFunc e => RIO e a
dupHead = do
logError "Multiple %head actions on one request"
error "Bad HttpEvent: Multiple header actions per on one request."
{-|
- Immediately yield all of the initial chunks
- Yield the data from %bloc action.
- Close the stream when we hit a %done action.
-}
streamBlocks
:: HasLogFunc e
=> e
-> ByteString
-> STM RespAct
-> ConduitT () (Flush Builder) IO ()
streamBlocks env init getAct = send init >> loop
where
loop = atomically getAct >>= \case
RAHead _ _ _ -> runRIO env dupHead
RAFull _ _ _ -> runRIO env dupHead
RADone -> pure ()
RABloc c -> send c >> loop
send "" = pure ()
send c = do
runRIO env (logTrace (display ("sending chunk " <> tshow c)))
yield $ Chunk $ fromByteString c
yield Flush
sendResponse
:: HasLogFunc e
=> (W.Response -> IO W.ResponseReceived)
-> STM RespAct
-> RIO e W.ResponseReceived
sendResponse cb waitAct = do
env <- ask
atomically waitAct >>= \case
RADone -> io $ cb $ W.responseLBS (H.mkStatus 444 "No Response") [] ""
RAFull s h b -> io $ cb $ W.responseLBS s h $ fromStrict b
RAHead s h b -> io $ cb $ W.responseSource s h $ streamBlocks env b waitAct
RABloc _ -> noHeader
liveReq :: Ship -> TVar LiveReqs -> RAcquire e (Word64, STM RespAct)
liveReq who vLiv = mkRAcquire ins del
where
ins = atomically (newLiveReq who vLiv)
del = atomically . rmLiveReq vLiv . fst
app
:: HasLogFunc e
=> e
-> Ship
-> TVar LiveReqs
-> (Word64 -> ReqInfo -> STM ())
-> (Word64 -> STM ())
-> W.Application
app env who liv inform cancel req respond =
runRIO env $ rwith (liveReq who liv) $ \(reqId, respApi) -> do
bod <- io (toStrict <$> W.strictRequestBody req)
met <- maybe (error "bad method") pure (cookMeth req)
let adr = reqAddr req
hdr = W.requestHeaders req
url = reqUrl req
atomically $ inform reqId $ ReqInfo adr met url hdr bod
try (sendResponse respond respApi) >>= \case
Right rr -> pure rr
Left exn -> do
atomically (cancel reqId)
logError $ display ("Exception during request" <> tshow exn)
throwIO (exn :: SomeException)