diff --git a/pkg/king/lib/Vere/Ames.hs b/pkg/king/lib/Vere/Ames.hs index 38e2e90b59..e6cc9f6727 100644 --- a/pkg/king/lib/Vere/Ames.hs +++ b/pkg/king/lib/Vere/Ames.hs @@ -7,9 +7,10 @@ import Network.Socket hiding (recvFrom, sendTo) import Network.Socket.ByteString import Vere.Pier.Types -import qualified Data.Map as M -import qualified Urbit.Ob as Ob -import qualified Urbit.Time as Time +import qualified Data.ByteString as BS +import qualified Data.Map as M +import qualified Urbit.Ob as Ob +import qualified Urbit.Time as Time -- Types ----------------------------------------------------------------------- @@ -17,7 +18,8 @@ data AmesDrv = AmesDrv { aIsLive :: IORef Bool , aTurfs :: TVar [Turf] , aGalaxies :: IORef (M.Map Galaxy (Async (), TQueue ByteString)) - , aSocket :: Socket + , aSocketSend :: Socket + , aSocketRecv :: Socket , aWakeTimer :: Async () , aListener :: Async () , aSendingQueue :: TQueue (SockAddr, ByteString) @@ -92,65 +94,74 @@ _turfText = intercalate "." . reverse . fmap unCord . unTurf TODO verify that the KingIds match on effects. -} -ames :: KingId -> Ship -> Maybe Port -> QueueEv - -> ([Ev], Acquire (EffCb e NewtEf)) +ames :: forall e. HasLogFunc e + => KingId -> Ship -> Maybe Port -> QueueEv + -> ([Ev], RAcquire e (EffCb e NewtEf)) ames inst who mPort enqueueEv = (initialEvents, runAmes) where initialEvents :: [Ev] initialEvents = [barnEv inst] - runAmes :: Acquire (EffCb e NewtEf) + runAmes :: RAcquire e (EffCb e NewtEf) runAmes = do - drv <- mkAcquire start stop - pure (io . handleEffect drv) + drv <- mkRAcquire start stop + pure (handleEffect drv) - start :: IO AmesDrv + start :: RIO e AmesDrv start = do - vLiv <- newIORef False - vTurf <- newTVarIO [] - vGalaxies <- newIORef mempty - time <- async runTimer - sock <- bindSock - hear <- async (waitPacket sock) - sendQueue <- newTQueueIO - sending <- async (sendingThread sendQueue sock) - pure $ AmesDrv vLiv vTurf vGalaxies sock time hear sendQueue sending + aIsLive <- newIORef False + aTurfs <- newTVarIO [] + aGalaxies <- newIORef mempty + aSocketSend <- io $ bindSendSock + aSocketRecv <- bindRecvSock + aWakeTimer <- async runTimer + aListener <- async (waitPacket aSocketRecv) + aSendingQueue <- newTQueueIO + aSendingThread <- async (sendingThread aSendingQueue aSocketSend) + pure $ AmesDrv{..} + -- TODO: This switch needs to reflect the network mode flag; are we a fake + -- network? Do we have all networking disabled? netMode :: NetworkMode netMode = Fake - stop :: AmesDrv -> IO () - stop (AmesDrv{..}) = do + stop :: AmesDrv -> RIO e () + stop AmesDrv{..} = do galaxies <- readIORef aGalaxies mapM_ (cancel . fst) galaxies cancel aSendingThread cancel aWakeTimer cancel aListener - close' aSocket + io $ close' aSocketRecv - runTimer :: IO () + runTimer :: RIO e () runTimer = forever $ do threadDelay (300 * 1000000) -- 300 seconds atomically (enqueueEv wakeEv) - bindSock :: IO Socket - bindSock = do + bindSendSock :: IO Socket + bindSendSock = socket AF_INET Datagram defaultProtocol + + bindRecvSock :: RIO e Socket + bindRecvSock = do let ourPort = maybe (listenPort netMode who) fromIntegral mPort - s <- socket AF_INET Datagram defaultProtocol - () <- bind s (SockAddrInet ourPort localhost) + s <- io $ socket AF_INET Datagram defaultProtocol + logTrace $ displayShow ("(ames) Binding to port ", ourPort) + () <- io $ bind s (SockAddrInet ourPort localhost) pure s - waitPacket :: Socket -> IO () + waitPacket :: Socket -> RIO e () waitPacket s = forever $ do - (bs, addr) <- recvFrom s 4096 - wen <- Time.now + (bs, addr) <- io $ recvFrom s 4096 + logTrace $ displayShow ("(ames) Received packet from ", addr) + wen <- io $ Time.now case addr of SockAddrInet p a -> atomically (enqueueEv $ hearEv wen p a bs) _ -> pure () - handleEffect :: AmesDrv -> NewtEf -> IO () + handleEffect :: AmesDrv -> NewtEf -> RIO e () handleEffect drv@AmesDrv{..} = \case NewtEfTurf (_id, ()) turfs -> do writeIORef aIsLive True @@ -159,33 +170,51 @@ ames inst who mPort enqueueEv = NewtEfSend (_id, ()) dest (MkBytes bs) -> do whenM (readIORef aIsLive) (sendPacket drv netMode dest bs) - sendPacket :: AmesDrv -> NetworkMode -> AmesDest -> ByteString -> IO () + sendPacket :: AmesDrv -> NetworkMode -> AmesDest -> ByteString -> RIO e () - sendPacket AmesDrv{..} Fake dest bs = + sendPacket AmesDrv{..} Fake dest bs = do + logTrace $ displayShow + ("(ames) sendPacket Fake ", dest, (destSockAddr Fake dest)) when (okayFakeAddr dest) $ do atomically $ writeTQueue aSendingQueue ((destSockAddr Fake dest), bs) sendPacket AmesDrv{..} Real (ADGala wen galaxy) bs = do + logTrace $ displayShow + ("(ames) sendPacket Real Galaxy ", galaxy) galaxies <- readIORef aGalaxies queue <- case M.lookup galaxy galaxies of Just (_, queue) -> pure queue Nothing -> do inQueue <- newTQueueIO - thread <- galaxyResolver galaxy aTurfs inQueue aSendingQueue + thread <- async $ galaxyResolver galaxy aTurfs inQueue aSendingQueue modifyIORef (aGalaxies) (M.insert galaxy (thread, inQueue)) pure inQueue atomically $ writeTQueue queue bs - sendPacket AmesDrv{..} Real ip@(ADIpv4 _ _ _) bs = + sendPacket AmesDrv{..} Real ip@(ADIpv4 _ _ _) bs = do + logTrace $ displayShow + ("(ames) sendPacket Real Other ", ip, (destSockAddr Real ip)) atomically $ writeTQueue aSendingQueue ((destSockAddr Real ip), bs) + -- Maybe the entire socket usage in this example is wrong. We're using the + -- receiving socket as the sending socket, too. Can we not do that? + + -- An outbound queue of messages. We can only write to a socket from one -- thread, so coalesce those writes here. - sendingThread :: TQueue (SockAddr, ByteString) -> Socket -> IO () + sendingThread :: TQueue (SockAddr, ByteString) -> Socket -> RIO e () sendingThread queue socket = forever $ do (dest, bs) <- atomically $ readTQueue queue - void $ sendTo socket bs dest + + logTrace $ displayShow ("(ames) Sending packet to ", socket, dest) + -- This line blocks! WTF! + bytesSent <- io $ sendTo socket bs dest + logTrace $ displayShow ("(ames) Packet sent! ", bytesSent) + let len = BS.length bs + when (bytesSent /= len) $ do + logDebug $ displayShow + ("(ames) Only sent ", bytesSent, " of ", (length bs)) -- Asynchronous thread per galaxy which handles domain resolution, and can -- block its own queue of ByteStrings to send. @@ -196,11 +225,11 @@ ames inst who mPort enqueueEv = -- TODO: Figure out how the real haskell time library works. galaxyResolver :: Galaxy -> TVar [Turf] -> TQueue ByteString -> TQueue (SockAddr, ByteString) - -> IO (Async ()) + -> RIO e () galaxyResolver galaxy turfVar incoming outgoing = - async $ loop Nothing Time.unixEpoch + loop Nothing Time.unixEpoch where - loop :: Maybe SockAddr -> Time.Wen -> IO () + loop :: Maybe SockAddr -> Time.Wen -> RIO e () loop lastGalaxyIP lastLookupTime = do packet <- atomically $ readTQueue incoming @@ -210,33 +239,42 @@ ames inst who mPort enqueueEv = -- We've failed to lookup the IP. Drop the outbound packet -- because we have no IP for our galaxy, including possible -- previous IPs. + logDebug $ displayShow + ("(ames) Dropping packet; no ip for galaxy ", galaxy) loop Nothing t (Just ip, t) -> do queueSendToGalaxy ip packet loop (Just ip) t - checkIP :: Maybe SockAddr -> Time.Wen -> IO (Maybe SockAddr, Time.Wen) + checkIP :: Maybe SockAddr -> Time.Wen + -> RIO e (Maybe SockAddr, Time.Wen) checkIP lastIP lastLookupTime = do - current <- Time.now + current <- io $ Time.now if (Time.gap current lastLookupTime ^. Time.secs) < 300 then pure (lastIP, lastLookupTime) else do toCheck <- atomically $ readTVar turfVar ip <- resolveFirstIP lastIP toCheck - timeAfterResolution <- Time.now + timeAfterResolution <- io $ Time.now pure (ip, timeAfterResolution) - resolveFirstIP :: Maybe SockAddr -> [Turf] -> IO (Maybe SockAddr) + resolveFirstIP :: Maybe SockAddr -> [Turf] -> RIO e (Maybe SockAddr) resolveFirstIP prevIP [] = do -- print ("ames: czar at %s: not found (b)\n") + logDebug $ displayShow + ("(ames) Failed to lookup IP for ", galaxy) pure prevIP resolveFirstIP prevIP (x:xs) = do let hostname = buildDNS galaxy x - listIPs <- getAddrInfo Nothing (Just hostname) Nothing + let portstr = show $ galaxyPort Real galaxy + listIPs <- io $ getAddrInfo Nothing (Just hostname) (Just portstr) case listIPs of [] -> resolveFirstIP prevIP xs - (y:ys) -> pure $ Just $ addrAddress y + (y:ys) -> do + logDebug $ displayShow + ("(ames) Looked up ", hostname, portstr, y) + pure $ Just $ addrAddress y buildDNS :: Galaxy -> Turf -> String buildDNS (Galaxy g) turf = name ++ "." ++ (unpack $ _turfText turf) @@ -246,6 +284,7 @@ ames inst who mPort enqueueEv = Nothing -> error "Urbit.ob didn't produce string with ~" Just x -> (unpack x) - queueSendToGalaxy :: SockAddr -> ByteString -> IO () - queueSendToGalaxy inet packet = + queueSendToGalaxy :: SockAddr -> ByteString -> RIO e () + queueSendToGalaxy inet packet = do + logTrace $ displayShow ("(ames) Sending galaxy packet to ", inet) atomically $ writeTQueue outgoing (inet, packet) diff --git a/pkg/king/lib/Vere/Pier.hs b/pkg/king/lib/Vere/Pier.hs index dc1edba881..51bc49ecc7 100644 --- a/pkg/king/lib/Vere/Pier.hs +++ b/pkg/king/lib/Vere/Pier.hs @@ -261,7 +261,7 @@ drivers pierPath inst who mPort plan shutdownSTM termSys = initialEvents = mconcat [behnBorn, clayBorn, amesBorn, httpBorn, termBorn, irisBorn] runDrivers = do - dNewt <- liftAcquire $ runAmes + dNewt <- runAmes dBehn <- liftAcquire $ runBehn dAmes <- pure $ const $ pure () dHttpClient <- runIris diff --git a/pkg/king/test/AmesTests.hs b/pkg/king/test/AmesTests.hs index 9fbd3955e6..f403ed2543 100644 --- a/pkg/king/test/AmesTests.hs +++ b/pkg/king/test/AmesTests.hs @@ -35,11 +35,12 @@ turfEf = NewtEfTurf (0, ()) [] sendEf :: Galaxy -> Wen -> Bytes -> NewtEf sendEf g w bs = NewtEfSend (0, ()) (ADGala w g) bs -runGala :: Word8 -> RAcquire e (TQueue Ev, EffCb e NewtEf) +runGala :: forall e. (HasLogFunc e) + => Word8 -> RAcquire e (TQueue Ev, EffCb e NewtEf) runGala point = do q <- newTQueueIO let (_, runAmes) = ames pid (fromIntegral point) Nothing (writeTQueue q) - cb ← liftAcquire runAmes + cb ← runAmes rio $ cb turfEf pure (q, cb) @@ -66,7 +67,7 @@ sendThread cb (to, val) = void $ mkRAcquire start cancel zodSelfMsg :: Property zodSelfMsg = forAll arbitrary (ioProperty . runApp . runTest) where - runTest :: Bytes -> RIO e Bool + runTest :: HasLogFunc e => Bytes -> RIO e Bool runTest val = runRAcquire $ do (zodQ, zod) <- runGala 0 () <- sendThread zod (0, val) @@ -75,13 +76,13 @@ zodSelfMsg = forAll arbitrary (ioProperty . runApp . runTest) twoTalk :: Property twoTalk = forAll arbitrary (ioProperty . runApp . runTest) where - runTest :: (Word8, Word8, Bytes) -> RIO e Bool + runTest :: HasLogFunc e => (Word8, Word8, Bytes) -> RIO e Bool runTest (aliceShip, bobShip, val) = if aliceShip == bobShip then pure True else go aliceShip bobShip val - go :: Word8 -> Word8 -> Bytes -> RIO e Bool + go :: HasLogFunc e => Word8 -> Word8 -> Bytes -> RIO e Bool go aliceShip bobShip val = runRAcquire $ do (aliceQ, alice) <- runGala aliceShip (bobQ, bob) <- runGala bobShip