diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Ames.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Ames.hs index 9d8d09846..7f0b5a36f 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Ames.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Ames.hs @@ -1,32 +1,30 @@ {-| - Ames IO Driver -- UDP + Ames IO Driver -} module Urbit.Vere.Ames (ames) where import Urbit.Prelude -import Control.Monad.Extra hiding (mapM_) -import Network.Socket hiding (recvFrom, sendTo) -import Network.Socket.ByteString -import Urbit.Arvo hiding (Fake) +import Network.Socket hiding (recvFrom, sendTo) +import Urbit.Arvo hiding (Fake) import Urbit.King.Config import Urbit.Vere.Pier.Types -import qualified Data.ByteString as BS -import qualified Data.Map as M -import qualified Urbit.Ob as Ob -import qualified Urbit.Time as Time +import Urbit.Vere.Ames.UDP (UdpServ(..), fakeUdpServ, realUdpServ) + +import qualified Data.Map as M +import qualified Urbit.Ob as Ob +import qualified Urbit.Time as Time + -- Types ----------------------------------------------------------------------- data AmesDrv = AmesDrv - { aTurfs :: TVar (Maybe [Turf]) - , aGalaxies :: IORef (M.Map Galaxy (Async (), TQueue ByteString)) - , aSocket :: TVar (Maybe Socket) - , aListener :: Async () - , aSendingQueue :: TQueue (SockAddr, ByteString) - , aSendingThread :: Async () + { aTurfs :: TVar (Maybe [Turf]) + , aGalaxies :: IORef (M.Map Galaxy (Async (), TQueue ByteString)) + , aUdpServ :: UdpServ + , aRecvTid :: Async () } data NetworkMode = Fake | Localhost | Real | NoNetwork @@ -36,45 +34,51 @@ data NetworkMode = Fake | Localhost | Real | NoNetwork -- Utils ----------------------------------------------------------------------- galaxyPort :: NetworkMode -> Galaxy -> PortNumber -galaxyPort Fake (Patp g) = fromIntegral g + 31337 +galaxyPort Fake (Patp g) = fromIntegral g + 31337 galaxyPort Localhost (Patp g) = fromIntegral g + 13337 -galaxyPort Real (Patp g) = fromIntegral g + 13337 +galaxyPort Real (Patp g) = fromIntegral g + 13337 galaxyPort NoNetwork _ = fromIntegral 0 listenPort :: NetworkMode -> Ship -> PortNumber listenPort m s | s < 256 = galaxyPort m (fromIntegral s) -listenPort m _ = 0 +listenPort m _ = 0 localhost :: HostAddress -localhost = tupleToHostAddress (127,0,0,1) +localhost = tupleToHostAddress (127, 0, 0, 1) inaddrAny :: HostAddress -inaddrAny = tupleToHostAddress (0,0,0,0) +inaddrAny = tupleToHostAddress (0, 0, 0, 0) + +modeAddress :: NetworkMode -> Maybe HostAddress +modeAddress = \case + Fake -> Just localhost + Localhost -> Just localhost + Real -> Just inaddrAny + NoNetwork -> Nothing okayFakeAddr :: AmesDest -> Bool okayFakeAddr = \case - EachYes _ -> True - EachNo (Jammed (AAIpv4 (Ipv4 a) _)) -> a == localhost - EachNo (Jammed (AAVoid v)) -> absurd v + EachYes _ -> True + EachNo (Jammed (AAIpv4 (Ipv4 a) _)) -> a == localhost + EachNo (Jammed (AAVoid v )) -> absurd v localhostSockAddr :: NetworkMode -> AmesDest -> SockAddr localhostSockAddr mode = \case - EachYes g -> SockAddrInet (galaxyPort mode g) localhost - EachNo (Jammed (AAIpv4 _ p)) -> SockAddrInet (fromIntegral p) localhost - EachNo (Jammed (AAVoid v)) -> absurd v + EachYes g -> SockAddrInet (galaxyPort mode g) localhost + EachNo (Jammed (AAIpv4 _ p)) -> SockAddrInet (fromIntegral p) localhost + EachNo (Jammed (AAVoid v )) -> absurd v bornEv :: KingId -> Ev -bornEv inst = - EvBlip $ BlipEvNewt $ NewtEvBorn (fromIntegral inst, ()) () +bornEv inst = EvBlip $ BlipEvNewt $ NewtEvBorn (fromIntegral inst, ()) () hearEv :: PortNumber -> HostAddress -> ByteString -> Ev hearEv p a bs = - EvBlip $ BlipEvAmes $ AmesEvHear () dest (MkBytes bs) - where - dest = EachNo $ Jammed $ AAIpv4 (Ipv4 a) (fromIntegral p) + EvBlip $ BlipEvAmes $ AmesEvHear () dest (MkBytes bs) + where + dest = EachNo $ Jammed $ AAIpv4 (Ipv4 a) (fromIntegral p) -_turfText :: Turf -> Text -_turfText = intercalate "." . reverse . fmap unCord . unTurf +turfText :: Turf -> Text +turfText = intercalate "." . reverse . fmap unCord . unTurf renderGalaxy :: Galaxy -> Text renderGalaxy = Ob.renderPatp . Ob.patp . fromIntegral . unPatp @@ -82,6 +86,31 @@ renderGalaxy = Ob.renderPatp . Ob.patp . fromIntegral . unPatp -------------------------------------------------------------------------------- +netMode :: HasNetworkConfig e => Bool -> RIO e NetworkMode +netMode True = pure Fake +netMode False = view (networkConfigL . ncNetMode . to cvt) + where + cvt :: NetMode -> NetworkMode + cvt = \case + NMNormal -> Real + NMLocalhost -> Localhost + NMNone -> NoNetwork + +udpPort :: Bool -> Ship -> HasNetworkConfig e => RIO e PortNumber +udpPort isFake who = do + mode <- netMode isFake + mPort <- view (networkConfigL . ncAmesPort) + pure $ maybe (listenPort mode who) fromIntegral mPort + +udpServ :: (HasLogFunc e, HasNetworkConfig e) => Bool -> Ship -> RIO e UdpServ +udpServ isFake who = do + mode <- netMode isFake + port <- udpPort isFake who + case modeAddress mode of + Nothing -> fakeUdpServ + Just host -> realUdpServ port host + + {-| inst -- Process instance number. who -- Which ship are we? @@ -105,145 +134,38 @@ ames inst who isFake enqueueEv stderr = runAmes :: RAcquire e (EffCb e NewtEf) runAmes = do - drv <- mkRAcquire start stop - pure (handleEffect drv) + mode <- rio (netMode isFake) + drv <- mkRAcquire start stop + pure (handleEffect drv mode) start :: RIO e AmesDrv start = do - aTurfs <- newTVarIO Nothing - aGalaxies <- newIORef mempty - aSocket <- newTVarIO Nothing - bindSock aSocket - aListener <- async (waitPacket aSocket) - aSendingQueue <- newTQueueIO - aSendingThread <- async (sendingThread aSendingQueue aSocket) - pure $ AmesDrv{..} + aTurfs <- newTVarIO Nothing + aGalaxies <- newIORef mempty + aUdpServ <- udpServ isFake who + aRecvTid <- queuePacketsThread aUdpServ + pure (AmesDrv{..}) - netMode :: RIO e NetworkMode - netMode = do - if isFake - then pure Fake - else view (networkConfigL . ncNetMode) >>= \case - NMNormal -> pure Real - NMLocalhost -> pure Localhost - NMNone -> pure NoNetwork + queuePacketsThread :: UdpServ -> RIO e (Async ()) + queuePacketsThread UdpServ{..} = async $ forever $ atomically $ do + (p, a, b) <- usRecv + enqueueEv (hearEv p a b) stop :: AmesDrv -> RIO e () stop AmesDrv{..} = do - readIORef aGalaxies >>= mapM_ (cancel . fst) + io (usKill aUdpServ) + cancel aRecvTid + readIORef aGalaxies >>= mapM_ (cancel . fst) - cancel aSendingThread - cancel aListener - socket <- atomically $ readTVar aSocket - io $ maybeM (pure ()) (close') (pure socket) - - bindSock :: TVar (Maybe Socket) -> RIO e () - bindSock socketVar = getBindAddr >>= doBindSocket - where - getBindAddr = netMode >>= \case - Fake -> pure $ Just localhost - Localhost -> pure $ Just localhost - Real -> pure $ Just inaddrAny - NoNetwork -> pure Nothing - - doBindSocket :: Maybe HostAddress -> RIO e () - doBindSocket Nothing = atomically $ writeTVar socketVar Nothing - doBindSocket (Just bindAddr) = do - mode <- netMode - mPort <- view (networkConfigL . ncAmesPort) - let ourPort = maybe (listenPort mode who) fromIntegral mPort - s <- io $ socket AF_INET Datagram defaultProtocol - - logTrace $ displayShow ("(ames) Binding to port ", ourPort) - let addr = SockAddrInet ourPort bindAddr - () <- io $ bind s addr - - atomically $ writeTVar socketVar (Just s) - - waitPacket :: TVar (Maybe Socket) -> RIO e () - waitPacket socketVar = do - (atomically $ readTVar socketVar) >>= \case - Nothing -> pure () - Just s -> do - res <- io $ tryIOError $ recvFrom s 4096 - case res of - Left exn -> do - -- When we have a socket exception, we need to rebuild the - -- socket. - logTrace $ displayShow ("(ames) Socket exception. Rebinding.") - bindSock socketVar - Right (bs, addr) -> do - logTrace $ displayShow ("(ames) Received packet from ", addr) - case addr of - SockAddrInet p a -> atomically (enqueueEv $ hearEv p a bs) - _ -> pure () - - waitPacket socketVar - - - handleEffect :: AmesDrv -> NewtEf -> RIO e () - handleEffect drv@AmesDrv{..} = \case + handleEffect :: AmesDrv -> NetworkMode -> NewtEf -> RIO e () + handleEffect drv@AmesDrv{..} mode = \case NewtEfTurf (_id, ()) turfs -> do - atomically $ writeTVar aTurfs (Just turfs) + atomically $ writeTVar aTurfs (Just turfs) NewtEfSend (_id, ()) dest (MkBytes bs) -> do - atomically (readTVar aTurfs) >>= \case - Nothing -> pure () - Just turfs -> do - mode <- netMode - (sendPacket drv mode dest bs) - - sendPacket :: AmesDrv -> NetworkMode -> AmesDest -> ByteString -> RIO e () - - sendPacket AmesDrv{..} NoNetwork dest bs = pure () - - sendPacket AmesDrv{..} Fake dest bs = do - when (okayFakeAddr dest) $ atomically $ - writeTQueue aSendingQueue ((localhostSockAddr Fake dest), bs) - - -- In localhost only mode, regardless of the actual destination, send it to - -- localhost. - sendPacket AmesDrv{..} Localhost dest bs = atomically $ - writeTQueue aSendingQueue ((localhostSockAddr Localhost dest), bs) - - sendPacket AmesDrv{..} Real (EachYes galaxy) bs = do - galaxies <- readIORef aGalaxies - queue <- case M.lookup galaxy galaxies of - Just (_, queue) -> pure queue - Nothing -> do - inQueue <- newTQueueIO - thread <- async $ galaxyResolver galaxy aTurfs inQueue aSendingQueue - modifyIORef (aGalaxies) (M.insert galaxy (thread, inQueue)) - pure inQueue - - atomically $ writeTQueue queue bs - - sendPacket AmesDrv{..} Real (EachNo (Jammed (AAIpv4 a p))) bs = do - let addr = SockAddrInet (fromIntegral p) (unIpv4 a) - atomically $ writeTQueue aSendingQueue (addr, bs) - - sendPacket AmesDrv{..} Real (EachNo (Jammed (AAVoid v))) bs = do - pure (absurd v) - - -- An outbound queue of messages. We can only write to a socket from one - -- thread, so coalesce those writes here. - sendingThread :: TQueue (SockAddr, ByteString) - -> TVar (Maybe Socket) - -> RIO e () - sendingThread queue socketVar = forever $ - do - (dest, bs) <- atomically $ readTQueue queue - logTrace $ displayShow ("(ames) Sending packet to ", dest) - sendAll bs dest - where - sendAll bs dest = do - mybSocket <- atomically $ readTVar socketVar - case mybSocket of - Nothing -> pure () - Just socket -> do - bytesSent <- io $ sendTo socket bs dest - when (bytesSent /= BS.length bs) $ do - sendAll (drop bytesSent bs) dest + atomically (readTVar aTurfs) >>= \case + Nothing -> pure () + Just turfs -> sendPacket drv mode dest bs -- Asynchronous thread per galaxy which handles domain resolution, and can -- block its own queue of ByteStrings to send. @@ -253,9 +175,9 @@ ames inst who isFake enqueueEv stderr = -- -- TODO: Figure out how the real haskell time library works. galaxyResolver :: Galaxy -> TVar (Maybe [Turf]) -> TQueue ByteString - -> TQueue (SockAddr, ByteString) + -> (SockAddr -> ByteString -> RIO e ()) -> RIO e () - galaxyResolver galaxy turfVar incoming outgoing = + galaxyResolver galaxy turfVar incoming queueSendToGalaxy = loop Nothing Time.unixEpoch where loop :: Maybe SockAddr -> Time.Wen -> RIO e () @@ -314,8 +236,38 @@ ames inst who isFake enqueueEv stderr = name <- case stripPrefix "~" nameWithSig of Nothing -> error "Urbit.ob didn't produce string with ~" Just x -> pure (unpack x) - pure $ name ++ "." ++ (unpack $ _turfText turf) + pure $ name ++ "." ++ (unpack $ turfText turf) - queueSendToGalaxy :: SockAddr -> ByteString -> RIO e () - queueSendToGalaxy inet packet = do - atomically $ writeTQueue outgoing (inet, packet) + sendPacket :: AmesDrv -> NetworkMode -> AmesDest -> ByteString -> RIO e () + sendPacket AmesDrv{..} mode dest bs = do + let go adr byt = io (usSend aUdpServ adr byt) + + case (mode, dest) of + (NoNetwork, _) -> do + pure () + + (Fake, _) | okayFakeAddr dest -> do + go (localhostSockAddr Fake dest) bs + + (Fake, _) | otherwise -> do + pure () + + (Localhost, _) -> do + go (localhostSockAddr Localhost dest) bs + + (Real, EachYes galaxy) -> do + galaxies <- readIORef aGalaxies + queue <- case M.lookup galaxy galaxies of + Just (_, queue) -> pure queue + Nothing -> do + inQueue <- newTQueueIO + thread <- async (galaxyResolver galaxy aTurfs inQueue go) + modifyIORef (aGalaxies) (M.insert galaxy (thread, inQueue)) + pure inQueue + atomically $ writeTQueue queue bs + + (Real, EachNo (Jammed (AAIpv4 a p))) -> do + go (SockAddrInet (fromIntegral p) (unIpv4 a)) bs + + (Real, EachNo (Jammed (AAVoid v))) -> do + absurd v diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Ames/UDP.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Ames/UDP.hs new file mode 100644 index 000000000..c22df009f --- /dev/null +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Ames/UDP.hs @@ -0,0 +1,241 @@ +{- | + Raw UDP Server used by Ames driver. + + 1. Opens a UDP socket and makes sure that it stays open. + + - If can't open the port, wait and try again repeatedly. + - If there is an error reading or writting from the open socket, + close it and open another. + + 2. Receives packets from the socket. + + - When packets come in from the socket, they go into a bounded queue. + - If the queue is full, the packet is dropped. + - If the socket is closed, wait and try again repeatedly. + - `usRecv` gets the first packet from the queue. + + 3. Sends packets to the socket. + + - Packets sent to `usSend` enter a bounded queue. + - If that queue is full, the packet is dropped. + - Packets are taken off the queue one at a time. + - If the socket is closed (or broken), the packet is dropped. + + 4. Runs until `usKill` is run, then all threads are killed and the + socket is closed. +-} + +module Urbit.Vere.Ames.UDP + ( UdpServ(..) + , fakeUdpServ + , realUdpServ + ) +where + +import Urbit.Prelude + +import Network.Socket hiding (recvFrom, sendTo) + +import Control.Monad.STM (retry) +import Network.Socket.ByteString (recvFrom, sendTo) + + +-- Types ----------------------------------------------------------------------- + +data UdpServ = UdpServ + { usSend :: SockAddr -> ByteString -> IO () + , usRecv :: STM (PortNumber, HostAddress, ByteString) + , usKill :: IO () + } + + +-- Utils ----------------------------------------------------------------------- + +{- | + Writes to queue and returns `True` unless the queue is full, then do + nothing and return `False`. +-} +tryWriteTBQueue :: TBQueue x -> x -> STM Bool +tryWriteTBQueue q x = do + isFullTBQueue q >>= \case + True -> pure False + False -> writeTBQueue q x $> True + +{- | + Open a UDP socket and bind it to a port +-} +doBind :: PortNumber -> HostAddress -> IO (Either IOError Socket) +doBind por hos = tryIOError $ do + sok <- io $ socket AF_INET Datagram defaultProtocol + () <- io $ bind sok (SockAddrInet por hos) + pure sok + +{- | + Open a UDP socket and bind it to a port. + + If this fails, wait 250ms and repeat forever. +-} +forceBind :: HasLogFunc e => PortNumber -> HostAddress -> RIO e Socket +forceBind por hos = go + where + go = do + logTrace (display ("AMES: UDP: Opening socket on port " <> tshow por)) + io (doBind por hos) >>= \case + Right sk -> do + logTrace (display ("AMES: UDP: Opened socket on port " <> tshow por)) + pure sk + Left err -> do + logTrace (display ("AMES: UDP: " <> tshow err)) + logTrace ("AMES: UDP: Failed to open UDP socket. Waiting") + threadDelay 250_000 + go + +{- | + Attempt to send a packet to a socket. + + If it fails, return `False`. Otherwise, return `True`. +-} +sendPacket :: HasLogFunc e => ByteString -> SockAddr -> Socket -> RIO e Bool +sendPacket fullBytes adr sok = do + logTrace ("AMES: UDP: Sending packet") + res <- io $ tryIOError $ go fullBytes + case res of + Left err -> do + logError $ display ("AMES: UDP: " <> tshow err) + logError "AMES: UDP: Failed to send packet" + pure False + Right () -> do + logError "AMES: UDP: Packet sent" + pure True + where + go byt = do + sent <- sendTo sok byt adr + when (sent /= length byt) $ do + go (drop sent byt) + +{- | + Attempt to receive a packet from a socket. + + - If an exception is throw, return `Left exn`. + - If it wasn't an IPv4 packet, return `Right Nothing`. + - Otherwise, return `Right (Just packet)`. +-} +recvPacket + :: HasLogFunc e + => Socket + -> RIO e (Either IOError (Maybe (ByteString, PortNumber, HostAddress))) +recvPacket sok = do + io (tryIOError $ recvFrom sok 4096) <&> \case + Left exn -> Left exn + Right (b, SockAddrInet p a) -> Right (Just (b, p, a)) + Right (_, _ ) -> Right Nothing + + +-- Fake Server for No-Networking Mode ------------------------------------------ + +{- | + Fake UDP API for no-networking configurations. +-} +fakeUdpServ :: HasLogFunc e => RIO e UdpServ +fakeUdpServ = do + logTrace "AMES: UDP: \"Starting\" fake UDP server." + pure UdpServ { .. } + where + usSend = \_ _ -> pure () + usRecv = retry + usKill = pure () + + +-- Real Server ----------------------------------------------------------------- + +{- | + Real UDP server. See module-level docs. +-} +realUdpServ + :: forall e . HasLogFunc e => PortNumber -> HostAddress -> RIO e UdpServ +realUdpServ por hos = do + logTrace "AMES: UDP: Starting real UDP server." + + env <- ask + + vSock <- newTVarIO Nothing + vFail <- newEmptyTMVarIO + qSend <- newTBQueueIO 100 -- TODO Tuning + qRecv <- newTBQueueIO 100 -- TODO Tuning + + {- + If reading or writing to a socket fails, unbind it and tell the + socket-open thread to close it and open another. + + This is careful about edge-cases. In any of these cases, do nothing. + + - If vSock isn't set to the socket we used, do nothing. + - If vFail is already set (another thread signaled failure already). + -} + let signalBrokenSocket :: Socket -> RIO e () + signalBrokenSocket sock = do + logTrace "AMES: UDP: Socket broken. Requesting new socket" + atomically $ do + mSock <- readTVar vSock + mFail <- tryReadTMVar vFail + when (mSock == Just sock && mFail == Nothing) $ do + putTMVar vFail sock + writeTVar vSock Nothing + + enqueueRecvPacket :: PortNumber -> HostAddress -> ByteString -> RIO e () + enqueueRecvPacket p a b = do + did <- atomically (tryWriteTBQueue qRecv (p, a, b)) + when (did == False) $ do + logWarn "AMES: UDP: Dropping inbound packet because queue is full." + + enqueueSendPacket :: SockAddr -> ByteString -> RIO e () + enqueueSendPacket a b = do + did <- atomically (tryWriteTBQueue qSend (a, b)) + when (did == False) $ do + logWarn "AMES: UDP: Dropping outbound packet because queue is full." + + tOpen <- async $ forever $ do + sk <- forceBind por hos + atomically (writeTVar vSock (Just sk)) + broken <- atomically (takeTMVar vFail) + logTrace "AMES: UDP: Closing broken socket." + io (close broken) + + tSend <- async $ forever $ join $ atomically $ do + (adr, byt) <- readTBQueue qSend + readTVar vSock <&> \case + Nothing -> pure () + Just sk -> do + okay <- sendPacket byt adr sk + unless okay (signalBrokenSocket sk) + + tRecv <- async $ forever $ do + atomically (readTVar vSock) >>= \case + Nothing -> threadDelay 100_000 + Just sk -> do + recvPacket sk >>= \case + Left exn -> do + logError "AMES: UDP: Failed to receive packet" + signalBrokenSocket sk + Right Nothing -> do + logError "AMES: UDP: Dropping non-ipv4 packet" + pure () + Right (Just (b, p, a)) -> do + logTrace "AMES: UDP: Received packet." + enqueueRecvPacket p a b + + let shutdown = do + logTrace "AMES: UDP: Shutting down. (killing threads)" + cancel tOpen + cancel tSend + cancel tRecv + logTrace "AMES: UDP: Shutting down. (closing socket)" + io $ join $ atomically $ do + res <- readTVar vSock <&> maybe (pure ()) close + writeTVar vSock Nothing + pure res + + pure $ UdpServ { usSend = \a b -> runRIO env (enqueueSendPacket a b) + , usRecv = readTBQueue qRecv + , usKill = runRIO env shutdown + }