mirror of
https://github.com/urbit/shrub.git
synced 2024-12-25 13:04:17 +03:00
275 lines
8.4 KiB
Haskell
275 lines
8.4 KiB
Haskell
{- |
|
|
Raw UDP Server used by Ames driver.
|
|
|
|
1. Opens a UDP socket and makes sure that it stays open.
|
|
|
|
- If can't open the port, wait and try again repeatedly.
|
|
- If there is an error reading to or writing from the open socket,
|
|
close it and open another, making sure, however, to reuse the
|
|
same port
|
|
NOTE: It's not clear what, if anything, closing and reopening
|
|
the socket does. We're keeping this behavior out of conservatism
|
|
until we understand it better.
|
|
|
|
2. Receives packets from the socket.
|
|
|
|
- When packets come in from the socket, they go into a bounded queue.
|
|
- If the queue is full, the packet is dropped.
|
|
- If the socket is closed, wait and try again repeatedly.
|
|
- `usRecv` gets the first packet from the queue.
|
|
|
|
3. Sends packets to the socket.
|
|
|
|
- Packets sent to `usSend` enter a bounded queue.
|
|
- If that queue is full, the packet is dropped.
|
|
- Packets are taken off the queue one at a time.
|
|
- If the socket is closed (or broken), the packet is dropped.
|
|
|
|
4. Runs until `usKill` is run, then all threads are killed and the
|
|
socket is closed.
|
|
-}
|
|
|
|
module Urbit.Vere.Ames.UDP
|
|
( UdpServ(..)
|
|
, fakeUdpServ
|
|
, realUdpServ
|
|
)
|
|
where
|
|
|
|
import Urbit.Prelude
|
|
import Urbit.Vere.Ports
|
|
|
|
import Network.Socket
|
|
|
|
import Control.Monad.STM (retry)
|
|
import Network.Socket.ByteString (recvFrom, sendTo)
|
|
import Urbit.Vere.Stat (AmesStat(..), bump)
|
|
|
|
-- Types -----------------------------------------------------------------------
|
|
|
|
data UdpServ = UdpServ
|
|
{ usSend :: SockAddr -> ByteString -> IO ()
|
|
, usRecv :: STM (PortNumber, HostAddress, ByteString)
|
|
, usKill :: IO ()
|
|
}
|
|
|
|
|
|
-- Utils -----------------------------------------------------------------------
|
|
|
|
{- |
|
|
Writes to queue and returns `True` unless the queue is full, then do
|
|
nothing and return `False`.
|
|
-}
|
|
tryWriteTBQueue :: TBQueue x -> x -> STM Bool
|
|
tryWriteTBQueue q x = do
|
|
isFullTBQueue q >>= \case
|
|
True -> pure False
|
|
False -> writeTBQueue q x $> True
|
|
|
|
{- |
|
|
Open a UDP socket and bind it to a port
|
|
-}
|
|
doBind :: PortNumber -> HostAddress -> IO (Either IOError Socket)
|
|
doBind por hos = tryIOError $ do
|
|
sok <- io $ socket AF_INET Datagram defaultProtocol
|
|
() <- io $ bind sok (SockAddrInet por hos)
|
|
pure sok
|
|
|
|
{- |
|
|
Open a UDP socket and bind it to a port.
|
|
|
|
If this fails, wait 250ms and repeat forever.
|
|
-}
|
|
forceBind :: HasLogFunc e => PortNumber -> HostAddress -> RIO e Socket
|
|
forceBind por hos = go
|
|
where
|
|
go = do
|
|
logInfo (display ("AMES: UDP: Opening socket on port " <> tshow por))
|
|
io (doBind por hos) >>= \case
|
|
Right sk -> do
|
|
logInfo (display ("AMES: UDP: Opened socket on port " <> tshow por))
|
|
pure sk
|
|
Left err -> do
|
|
logInfo (display ("AMES: UDP: " <> tshow err))
|
|
logInfo ("AMES: UDP: Failed to open UDP socket. Waiting")
|
|
threadDelay 250_000
|
|
go
|
|
|
|
{- |
|
|
Attempt to send a packet to a socket.
|
|
|
|
If it fails, return `False`. Otherwise, return `True`.
|
|
-}
|
|
sendPacket :: HasLogFunc e => ByteString -> SockAddr -> Socket -> RIO e Bool
|
|
sendPacket fullBytes adr sok = do
|
|
logDebug $ displayShow ("AMES", "UDP", "Sending packet.")
|
|
res <- io $ tryIOError $ go fullBytes
|
|
case res of
|
|
Left err -> do
|
|
logError $ displayShow ("AMES", "UDP", "Failed to send packet", err)
|
|
pure False
|
|
Right () -> do
|
|
logDebug $ displayShow ("AMES", "UDP", "Packet sent.")
|
|
pure True
|
|
where
|
|
go byt = do
|
|
sent <- sendTo sok byt adr
|
|
when (sent /= length byt) $ do
|
|
go (drop sent byt)
|
|
|
|
{- |
|
|
Attempt to receive a packet from a socket.
|
|
|
|
- If an exception is throw, return `Left exn`.
|
|
- If it wasn't an IPv4 packet, return `Right Nothing`.
|
|
- Otherwise, return `Right (Just packet)`.
|
|
-}
|
|
recvPacket
|
|
:: HasLogFunc e
|
|
=> Socket
|
|
-> RIO e (Either IOError (Maybe (ByteString, PortNumber, HostAddress)))
|
|
recvPacket sok = do
|
|
io (tryIOError $ recvFrom sok 4096) <&> \case
|
|
Left exn -> Left exn
|
|
Right (b, SockAddrInet p a) -> Right (Just (b, p, a))
|
|
Right (_, _ ) -> Right Nothing
|
|
|
|
|
|
-- Fake Server for No-Networking Mode ------------------------------------------
|
|
|
|
{- |
|
|
Fake UDP API for no-networking configurations.
|
|
-}
|
|
fakeUdpServ :: HasLogFunc e => RIO e UdpServ
|
|
fakeUdpServ = do
|
|
logInfo $ displayShow ("AMES", "UDP", "\"Starting\" fake UDP server.")
|
|
pure UdpServ { .. }
|
|
where
|
|
usSend = \_ _ -> pure ()
|
|
usRecv = retry
|
|
usKill = pure ()
|
|
|
|
|
|
-- Real Server -----------------------------------------------------------------
|
|
|
|
{- |
|
|
Real UDP server. See module-level docs.
|
|
-}
|
|
realUdpServ
|
|
:: forall e
|
|
. (HasLogFunc e, HasPortControlApi e)
|
|
=> PortNumber
|
|
-> HostAddress
|
|
-> AmesStat
|
|
-> RIO e UdpServ
|
|
realUdpServ startPort hos sat = do
|
|
logInfo $ displayShow ("AMES", "UDP", "Starting real UDP server.")
|
|
|
|
env <- ask
|
|
|
|
vSock <- newTVarIO Nothing
|
|
vFail <- newEmptyTMVarIO
|
|
qSend <- newTBQueueIO 100 -- TODO Tuning
|
|
qRecv <- newTBQueueIO 100 -- TODO Tuning
|
|
|
|
{-
|
|
If reading or writing to a socket fails, unbind it and tell the
|
|
socket-open thread to close it and open another.
|
|
|
|
This is careful about edge-cases. In any of these cases, do nothing.
|
|
|
|
- If vSock isn't set to the socket we used, do nothing.
|
|
- If vFail is already set (another thread signaled failure already).
|
|
-}
|
|
let signalBrokenSocket :: Socket -> RIO e ()
|
|
signalBrokenSocket sock = do
|
|
logInfo $ displayShow ("AMES", "UDP"
|
|
, "Socket broken. Requesting new socket"
|
|
)
|
|
atomically $ do
|
|
mSock <- readTVar vSock
|
|
mFail <- tryReadTMVar vFail
|
|
when (mSock == Just sock && mFail == Nothing) $ do
|
|
putTMVar vFail sock
|
|
writeTVar vSock Nothing
|
|
|
|
enqueueRecvPacket :: PortNumber -> HostAddress -> ByteString -> RIO e ()
|
|
enqueueRecvPacket p a b = do
|
|
did <- atomically (tryWriteTBQueue qRecv (p, a, b))
|
|
when (did == False) $ do
|
|
bump (asUqf sat)
|
|
logWarn $ displayShow $ ("AMES", "UDP",)
|
|
"Dropping inbound packet because queue is full."
|
|
|
|
enqueueSendPacket :: SockAddr -> ByteString -> RIO e ()
|
|
enqueueSendPacket a b = do
|
|
did <- atomically (tryWriteTBQueue qSend (a, b))
|
|
when (did == False) $ do
|
|
logWarn "AMES: UDP: Dropping outbound packet because queue is full."
|
|
let opener por = do
|
|
logInfo $ displayShow $ ("AMES", "UDP", "Trying to open socket, port",)
|
|
por
|
|
sk <- forceBind por hos
|
|
sn <- io $ getSocketName sk
|
|
sp <- io $ socketPort sk
|
|
logInfo $ displayShow $ ("AMES", "UDP", "Got socket", sn, sp)
|
|
|
|
let waitForRelease = do
|
|
atomically (writeTVar vSock (Just sk))
|
|
broken <- atomically (takeTMVar vFail)
|
|
logWarn "AMES: UDP: Closing broken socket."
|
|
io (close broken)
|
|
|
|
case sn of
|
|
(SockAddrInet boundPort _) ->
|
|
-- When we're on IPv4, maybe port forward at the NAT.
|
|
rwith (requestPortAccess $ fromIntegral boundPort) $
|
|
\() -> waitForRelease
|
|
_ -> waitForRelease
|
|
|
|
opener sp
|
|
|
|
tOpen <- async $ opener startPort
|
|
|
|
tSend <- async $ forever $ join $ atomically $ do
|
|
(adr, byt) <- readTBQueue qSend
|
|
readTVar vSock <&> \case
|
|
Nothing -> pure ()
|
|
Just sk -> do
|
|
okay <- sendPacket byt adr sk
|
|
unless okay (signalBrokenSocket sk)
|
|
|
|
tRecv <- async $ forever $ do
|
|
atomically (readTVar vSock) >>= \case
|
|
Nothing -> threadDelay 100_000
|
|
Just sk -> do
|
|
recvPacket sk >>= \case
|
|
Left exn -> do
|
|
bump (asUdf sat)
|
|
logError "AMES: UDP: Failed to receive packet"
|
|
signalBrokenSocket sk
|
|
Right Nothing -> do
|
|
bump (asUi6 sat)
|
|
logError "AMES: UDP: Dropping non-ipv4 packet"
|
|
pure ()
|
|
Right (Just (b, p, a)) -> do
|
|
logDebug "AMES: UDP: Received packet."
|
|
bump (asUdp sat)
|
|
enqueueRecvPacket p a b
|
|
|
|
let shutdown = do
|
|
logInfo "AMES: UDP: Shutting down. (killing threads)"
|
|
cancel tOpen
|
|
cancel tSend
|
|
cancel tRecv
|
|
logInfo "AMES: UDP: Shutting down. (closing socket)"
|
|
io $ join $ atomically $ do
|
|
res <- readTVar vSock <&> maybe (pure ()) close
|
|
writeTVar vSock Nothing
|
|
pure res
|
|
|
|
pure $ UdpServ { usSend = \a b -> runRIO env (enqueueSendPacket a b)
|
|
, usRecv = readTBQueue qRecv
|
|
, usKill = runRIO env shutdown
|
|
}
|