king: Factored out UDP flow from Ames driver.

This commit is contained in:
Benjamin Summers 2020-05-05 12:57:05 -07:00
parent 47bf14f0f2
commit 21dcddc65b
2 changed files with 360 additions and 167 deletions

View File

@ -1,32 +1,30 @@
{-|
Ames IO Driver -- UDP
Ames IO Driver
-}
module Urbit.Vere.Ames (ames) where
import Urbit.Prelude
import Control.Monad.Extra hiding (mapM_)
import Network.Socket hiding (recvFrom, sendTo)
import Network.Socket.ByteString
import Urbit.Arvo hiding (Fake)
import Urbit.King.Config
import Urbit.Vere.Pier.Types
import qualified Data.ByteString as BS
import Urbit.Vere.Ames.UDP (UdpServ(..), fakeUdpServ, realUdpServ)
import qualified Data.Map as M
import qualified Urbit.Ob as Ob
import qualified Urbit.Time as Time
-- Types -----------------------------------------------------------------------
data AmesDrv = AmesDrv
{ aTurfs :: TVar (Maybe [Turf])
, aGalaxies :: IORef (M.Map Galaxy (Async (), TQueue ByteString))
, aSocket :: TVar (Maybe Socket)
, aListener :: Async ()
, aSendingQueue :: TQueue (SockAddr, ByteString)
, aSendingThread :: Async ()
, aUdpServ :: UdpServ
, aRecvTid :: Async ()
}
data NetworkMode = Fake | Localhost | Real | NoNetwork
@ -51,6 +49,13 @@ localhost = tupleToHostAddress (127,0,0,1)
inaddrAny :: HostAddress
inaddrAny = tupleToHostAddress (0, 0, 0, 0)
modeAddress :: NetworkMode -> Maybe HostAddress
modeAddress = \case
Fake -> Just localhost
Localhost -> Just localhost
Real -> Just inaddrAny
NoNetwork -> Nothing
okayFakeAddr :: AmesDest -> Bool
okayFakeAddr = \case
EachYes _ -> True
@ -64,8 +69,7 @@ localhostSockAddr mode = \case
EachNo (Jammed (AAVoid v )) -> absurd v
bornEv :: KingId -> Ev
bornEv inst =
EvBlip $ BlipEvNewt $ NewtEvBorn (fromIntegral inst, ()) ()
bornEv inst = EvBlip $ BlipEvNewt $ NewtEvBorn (fromIntegral inst, ()) ()
hearEv :: PortNumber -> HostAddress -> ByteString -> Ev
hearEv p a bs =
@ -73,8 +77,8 @@ hearEv p a bs =
where
dest = EachNo $ Jammed $ AAIpv4 (Ipv4 a) (fromIntegral p)
_turfText :: Turf -> Text
_turfText = intercalate "." . reverse . fmap unCord . unTurf
turfText :: Turf -> Text
turfText = intercalate "." . reverse . fmap unCord . unTurf
renderGalaxy :: Galaxy -> Text
renderGalaxy = Ob.renderPatp . Ob.patp . fromIntegral . unPatp
@ -82,6 +86,31 @@ renderGalaxy = Ob.renderPatp . Ob.patp . fromIntegral . unPatp
--------------------------------------------------------------------------------
netMode :: HasNetworkConfig e => Bool -> RIO e NetworkMode
netMode True = pure Fake
netMode False = view (networkConfigL . ncNetMode . to cvt)
where
cvt :: NetMode -> NetworkMode
cvt = \case
NMNormal -> Real
NMLocalhost -> Localhost
NMNone -> NoNetwork
udpPort :: Bool -> Ship -> HasNetworkConfig e => RIO e PortNumber
udpPort isFake who = do
mode <- netMode isFake
mPort <- view (networkConfigL . ncAmesPort)
pure $ maybe (listenPort mode who) fromIntegral mPort
udpServ :: (HasLogFunc e, HasNetworkConfig e) => Bool -> Ship -> RIO e UdpServ
udpServ isFake who = do
mode <- netMode isFake
port <- udpPort isFake who
case modeAddress mode of
Nothing -> fakeUdpServ
Just host -> realUdpServ port host
{-|
inst -- Process instance number.
who -- Which ship are we?
@ -105,145 +134,38 @@ ames inst who isFake enqueueEv stderr =
runAmes :: RAcquire e (EffCb e NewtEf)
runAmes = do
mode <- rio (netMode isFake)
drv <- mkRAcquire start stop
pure (handleEffect drv)
pure (handleEffect drv mode)
start :: RIO e AmesDrv
start = do
aTurfs <- newTVarIO Nothing
aGalaxies <- newIORef mempty
aSocket <- newTVarIO Nothing
bindSock aSocket
aListener <- async (waitPacket aSocket)
aSendingQueue <- newTQueueIO
aSendingThread <- async (sendingThread aSendingQueue aSocket)
pure $ AmesDrv{..}
aUdpServ <- udpServ isFake who
aRecvTid <- queuePacketsThread aUdpServ
pure (AmesDrv{..})
netMode :: RIO e NetworkMode
netMode = do
if isFake
then pure Fake
else view (networkConfigL . ncNetMode) >>= \case
NMNormal -> pure Real
NMLocalhost -> pure Localhost
NMNone -> pure NoNetwork
queuePacketsThread :: UdpServ -> RIO e (Async ())
queuePacketsThread UdpServ{..} = async $ forever $ atomically $ do
(p, a, b) <- usRecv
enqueueEv (hearEv p a b)
stop :: AmesDrv -> RIO e ()
stop AmesDrv{..} = do
io (usKill aUdpServ)
cancel aRecvTid
readIORef aGalaxies >>= mapM_ (cancel . fst)
cancel aSendingThread
cancel aListener
socket <- atomically $ readTVar aSocket
io $ maybeM (pure ()) (close') (pure socket)
bindSock :: TVar (Maybe Socket) -> RIO e ()
bindSock socketVar = getBindAddr >>= doBindSocket
where
getBindAddr = netMode >>= \case
Fake -> pure $ Just localhost
Localhost -> pure $ Just localhost
Real -> pure $ Just inaddrAny
NoNetwork -> pure Nothing
doBindSocket :: Maybe HostAddress -> RIO e ()
doBindSocket Nothing = atomically $ writeTVar socketVar Nothing
doBindSocket (Just bindAddr) = do
mode <- netMode
mPort <- view (networkConfigL . ncAmesPort)
let ourPort = maybe (listenPort mode who) fromIntegral mPort
s <- io $ socket AF_INET Datagram defaultProtocol
logTrace $ displayShow ("(ames) Binding to port ", ourPort)
let addr = SockAddrInet ourPort bindAddr
() <- io $ bind s addr
atomically $ writeTVar socketVar (Just s)
waitPacket :: TVar (Maybe Socket) -> RIO e ()
waitPacket socketVar = do
(atomically $ readTVar socketVar) >>= \case
Nothing -> pure ()
Just s -> do
res <- io $ tryIOError $ recvFrom s 4096
case res of
Left exn -> do
-- When we have a socket exception, we need to rebuild the
-- socket.
logTrace $ displayShow ("(ames) Socket exception. Rebinding.")
bindSock socketVar
Right (bs, addr) -> do
logTrace $ displayShow ("(ames) Received packet from ", addr)
case addr of
SockAddrInet p a -> atomically (enqueueEv $ hearEv p a bs)
_ -> pure ()
waitPacket socketVar
handleEffect :: AmesDrv -> NewtEf -> RIO e ()
handleEffect drv@AmesDrv{..} = \case
handleEffect :: AmesDrv -> NetworkMode -> NewtEf -> RIO e ()
handleEffect drv@AmesDrv{..} mode = \case
NewtEfTurf (_id, ()) turfs -> do
atomically $ writeTVar aTurfs (Just turfs)
NewtEfSend (_id, ()) dest (MkBytes bs) -> do
atomically (readTVar aTurfs) >>= \case
Nothing -> pure ()
Just turfs -> do
mode <- netMode
(sendPacket drv mode dest bs)
sendPacket :: AmesDrv -> NetworkMode -> AmesDest -> ByteString -> RIO e ()
sendPacket AmesDrv{..} NoNetwork dest bs = pure ()
sendPacket AmesDrv{..} Fake dest bs = do
when (okayFakeAddr dest) $ atomically $
writeTQueue aSendingQueue ((localhostSockAddr Fake dest), bs)
-- In localhost only mode, regardless of the actual destination, send it to
-- localhost.
sendPacket AmesDrv{..} Localhost dest bs = atomically $
writeTQueue aSendingQueue ((localhostSockAddr Localhost dest), bs)
sendPacket AmesDrv{..} Real (EachYes galaxy) bs = do
galaxies <- readIORef aGalaxies
queue <- case M.lookup galaxy galaxies of
Just (_, queue) -> pure queue
Nothing -> do
inQueue <- newTQueueIO
thread <- async $ galaxyResolver galaxy aTurfs inQueue aSendingQueue
modifyIORef (aGalaxies) (M.insert galaxy (thread, inQueue))
pure inQueue
atomically $ writeTQueue queue bs
sendPacket AmesDrv{..} Real (EachNo (Jammed (AAIpv4 a p))) bs = do
let addr = SockAddrInet (fromIntegral p) (unIpv4 a)
atomically $ writeTQueue aSendingQueue (addr, bs)
sendPacket AmesDrv{..} Real (EachNo (Jammed (AAVoid v))) bs = do
pure (absurd v)
-- An outbound queue of messages. We can only write to a socket from one
-- thread, so coalesce those writes here.
sendingThread :: TQueue (SockAddr, ByteString)
-> TVar (Maybe Socket)
-> RIO e ()
sendingThread queue socketVar = forever $
do
(dest, bs) <- atomically $ readTQueue queue
logTrace $ displayShow ("(ames) Sending packet to ", dest)
sendAll bs dest
where
sendAll bs dest = do
mybSocket <- atomically $ readTVar socketVar
case mybSocket of
Nothing -> pure ()
Just socket -> do
bytesSent <- io $ sendTo socket bs dest
when (bytesSent /= BS.length bs) $ do
sendAll (drop bytesSent bs) dest
Just turfs -> sendPacket drv mode dest bs
-- Asynchronous thread per galaxy which handles domain resolution, and can
-- block its own queue of ByteStrings to send.
@ -253,9 +175,9 @@ ames inst who isFake enqueueEv stderr =
--
-- TODO: Figure out how the real haskell time library works.
galaxyResolver :: Galaxy -> TVar (Maybe [Turf]) -> TQueue ByteString
-> TQueue (SockAddr, ByteString)
-> (SockAddr -> ByteString -> RIO e ())
-> RIO e ()
galaxyResolver galaxy turfVar incoming outgoing =
galaxyResolver galaxy turfVar incoming queueSendToGalaxy =
loop Nothing Time.unixEpoch
where
loop :: Maybe SockAddr -> Time.Wen -> RIO e ()
@ -314,8 +236,38 @@ ames inst who isFake enqueueEv stderr =
name <- case stripPrefix "~" nameWithSig of
Nothing -> error "Urbit.ob didn't produce string with ~"
Just x -> pure (unpack x)
pure $ name ++ "." ++ (unpack $ _turfText turf)
pure $ name ++ "." ++ (unpack $ turfText turf)
queueSendToGalaxy :: SockAddr -> ByteString -> RIO e ()
queueSendToGalaxy inet packet = do
atomically $ writeTQueue outgoing (inet, packet)
sendPacket :: AmesDrv -> NetworkMode -> AmesDest -> ByteString -> RIO e ()
sendPacket AmesDrv{..} mode dest bs = do
let go adr byt = io (usSend aUdpServ adr byt)
case (mode, dest) of
(NoNetwork, _) -> do
pure ()
(Fake, _) | okayFakeAddr dest -> do
go (localhostSockAddr Fake dest) bs
(Fake, _) | otherwise -> do
pure ()
(Localhost, _) -> do
go (localhostSockAddr Localhost dest) bs
(Real, EachYes galaxy) -> do
galaxies <- readIORef aGalaxies
queue <- case M.lookup galaxy galaxies of
Just (_, queue) -> pure queue
Nothing -> do
inQueue <- newTQueueIO
thread <- async (galaxyResolver galaxy aTurfs inQueue go)
modifyIORef (aGalaxies) (M.insert galaxy (thread, inQueue))
pure inQueue
atomically $ writeTQueue queue bs
(Real, EachNo (Jammed (AAIpv4 a p))) -> do
go (SockAddrInet (fromIntegral p) (unIpv4 a)) bs
(Real, EachNo (Jammed (AAVoid v))) -> do
absurd v

View File

@ -0,0 +1,241 @@
{- |
Raw UDP Server used by Ames driver.
1. Opens a UDP socket and makes sure that it stays open.
- If can't open the port, wait and try again repeatedly.
- If there is an error reading or writting from the open socket,
close it and open another.
2. Receives packets from the socket.
- When packets come in from the socket, they go into a bounded queue.
- If the queue is full, the packet is dropped.
- If the socket is closed, wait and try again repeatedly.
- `usRecv` gets the first packet from the queue.
3. Sends packets to the socket.
- Packets sent to `usSend` enter a bounded queue.
- If that queue is full, the packet is dropped.
- Packets are taken off the queue one at a time.
- If the socket is closed (or broken), the packet is dropped.
4. Runs until `usKill` is run, then all threads are killed and the
socket is closed.
-}
module Urbit.Vere.Ames.UDP
( UdpServ(..)
, fakeUdpServ
, realUdpServ
)
where
import Urbit.Prelude
import Network.Socket hiding (recvFrom, sendTo)
import Control.Monad.STM (retry)
import Network.Socket.ByteString (recvFrom, sendTo)
-- Types -----------------------------------------------------------------------
data UdpServ = UdpServ
{ usSend :: SockAddr -> ByteString -> IO ()
, usRecv :: STM (PortNumber, HostAddress, ByteString)
, usKill :: IO ()
}
-- Utils -----------------------------------------------------------------------
{- |
Writes to queue and returns `True` unless the queue is full, then do
nothing and return `False`.
-}
tryWriteTBQueue :: TBQueue x -> x -> STM Bool
tryWriteTBQueue q x = do
isFullTBQueue q >>= \case
True -> pure False
False -> writeTBQueue q x $> True
{- |
Open a UDP socket and bind it to a port
-}
doBind :: PortNumber -> HostAddress -> IO (Either IOError Socket)
doBind por hos = tryIOError $ do
sok <- io $ socket AF_INET Datagram defaultProtocol
() <- io $ bind sok (SockAddrInet por hos)
pure sok
{- |
Open a UDP socket and bind it to a port.
If this fails, wait 250ms and repeat forever.
-}
forceBind :: HasLogFunc e => PortNumber -> HostAddress -> RIO e Socket
forceBind por hos = go
where
go = do
logTrace (display ("AMES: UDP: Opening socket on port " <> tshow por))
io (doBind por hos) >>= \case
Right sk -> do
logTrace (display ("AMES: UDP: Opened socket on port " <> tshow por))
pure sk
Left err -> do
logTrace (display ("AMES: UDP: " <> tshow err))
logTrace ("AMES: UDP: Failed to open UDP socket. Waiting")
threadDelay 250_000
go
{- |
Attempt to send a packet to a socket.
If it fails, return `False`. Otherwise, return `True`.
-}
sendPacket :: HasLogFunc e => ByteString -> SockAddr -> Socket -> RIO e Bool
sendPacket fullBytes adr sok = do
logTrace ("AMES: UDP: Sending packet")
res <- io $ tryIOError $ go fullBytes
case res of
Left err -> do
logError $ display ("AMES: UDP: " <> tshow err)
logError "AMES: UDP: Failed to send packet"
pure False
Right () -> do
logError "AMES: UDP: Packet sent"
pure True
where
go byt = do
sent <- sendTo sok byt adr
when (sent /= length byt) $ do
go (drop sent byt)
{- |
Attempt to receive a packet from a socket.
- If an exception is throw, return `Left exn`.
- If it wasn't an IPv4 packet, return `Right Nothing`.
- Otherwise, return `Right (Just packet)`.
-}
recvPacket
:: HasLogFunc e
=> Socket
-> RIO e (Either IOError (Maybe (ByteString, PortNumber, HostAddress)))
recvPacket sok = do
io (tryIOError $ recvFrom sok 4096) <&> \case
Left exn -> Left exn
Right (b, SockAddrInet p a) -> Right (Just (b, p, a))
Right (_, _ ) -> Right Nothing
-- Fake Server for No-Networking Mode ------------------------------------------
{- |
Fake UDP API for no-networking configurations.
-}
fakeUdpServ :: HasLogFunc e => RIO e UdpServ
fakeUdpServ = do
logTrace "AMES: UDP: \"Starting\" fake UDP server."
pure UdpServ { .. }
where
usSend = \_ _ -> pure ()
usRecv = retry
usKill = pure ()
-- Real Server -----------------------------------------------------------------
{- |
Real UDP server. See module-level docs.
-}
realUdpServ
:: forall e . HasLogFunc e => PortNumber -> HostAddress -> RIO e UdpServ
realUdpServ por hos = do
logTrace "AMES: UDP: Starting real UDP server."
env <- ask
vSock <- newTVarIO Nothing
vFail <- newEmptyTMVarIO
qSend <- newTBQueueIO 100 -- TODO Tuning
qRecv <- newTBQueueIO 100 -- TODO Tuning
{-
If reading or writing to a socket fails, unbind it and tell the
socket-open thread to close it and open another.
This is careful about edge-cases. In any of these cases, do nothing.
- If vSock isn't set to the socket we used, do nothing.
- If vFail is already set (another thread signaled failure already).
-}
let signalBrokenSocket :: Socket -> RIO e ()
signalBrokenSocket sock = do
logTrace "AMES: UDP: Socket broken. Requesting new socket"
atomically $ do
mSock <- readTVar vSock
mFail <- tryReadTMVar vFail
when (mSock == Just sock && mFail == Nothing) $ do
putTMVar vFail sock
writeTVar vSock Nothing
enqueueRecvPacket :: PortNumber -> HostAddress -> ByteString -> RIO e ()
enqueueRecvPacket p a b = do
did <- atomically (tryWriteTBQueue qRecv (p, a, b))
when (did == False) $ do
logWarn "AMES: UDP: Dropping inbound packet because queue is full."
enqueueSendPacket :: SockAddr -> ByteString -> RIO e ()
enqueueSendPacket a b = do
did <- atomically (tryWriteTBQueue qSend (a, b))
when (did == False) $ do
logWarn "AMES: UDP: Dropping outbound packet because queue is full."
tOpen <- async $ forever $ do
sk <- forceBind por hos
atomically (writeTVar vSock (Just sk))
broken <- atomically (takeTMVar vFail)
logTrace "AMES: UDP: Closing broken socket."
io (close broken)
tSend <- async $ forever $ join $ atomically $ do
(adr, byt) <- readTBQueue qSend
readTVar vSock <&> \case
Nothing -> pure ()
Just sk -> do
okay <- sendPacket byt adr sk
unless okay (signalBrokenSocket sk)
tRecv <- async $ forever $ do
atomically (readTVar vSock) >>= \case
Nothing -> threadDelay 100_000
Just sk -> do
recvPacket sk >>= \case
Left exn -> do
logError "AMES: UDP: Failed to receive packet"
signalBrokenSocket sk
Right Nothing -> do
logError "AMES: UDP: Dropping non-ipv4 packet"
pure ()
Right (Just (b, p, a)) -> do
logTrace "AMES: UDP: Received packet."
enqueueRecvPacket p a b
let shutdown = do
logTrace "AMES: UDP: Shutting down. (killing threads)"
cancel tOpen
cancel tSend
cancel tRecv
logTrace "AMES: UDP: Shutting down. (closing socket)"
io $ join $ atomically $ do
res <- readTVar vSock <&> maybe (pure ()) close
writeTVar vSock Nothing
pure res
pure $ UdpServ { usSend = \a b -> runRIO env (enqueueSendPacket a b)
, usRecv = readTBQueue qRecv
, usKill = runRIO env shutdown
}