diff --git a/pkg/hs/urbit-king/lib/Urbit/King/App.hs b/pkg/hs/urbit-king/lib/Urbit/King/App.hs index 107708d16a..6be577a653 100644 --- a/pkg/hs/urbit-king/lib/Urbit/King/App.hs +++ b/pkg/hs/urbit-king/lib/Urbit/King/App.hs @@ -190,9 +190,9 @@ runHostEnv multi ports action = do king <- ask let hostEnv = HostEnv { _hostEnvKingEnv = king - , _hostEnvMultiEyreApi = multi - , _hostEnvPortControlApi = ports - } + , _hostEnvMultiEyreApi = multi + , _hostEnvPortControlApi = ports + } io (runRIO hostEnv action) diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Ames.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Ames.hs index 5065cb4ace..b71551e027 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Ames.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Ames.hs @@ -9,14 +9,19 @@ import Urbit.Prelude import Network.Socket hiding (recvFrom, sendTo) import Urbit.Arvo hiding (Fake) import Urbit.King.Config +import Urbit.Vere.Ames.LaneCache +import Urbit.Vere.Ames.Packet import Urbit.Vere.Pier.Types import Urbit.Vere.Ports +import Data.Serialize (decode, encode) import Urbit.King.App (HasKingId(..), HasPierEnv(..)) import Urbit.Vere.Ames.DNS (NetworkMode(..), ResolvServ(..)) import Urbit.Vere.Ames.DNS (galaxyPort, resolvServ) import Urbit.Vere.Ames.UDP (UdpServ(..), fakeUdpServ, realUdpServ) +import qualified Urbit.Noun.Time as Time + -- Constants ------------------------------------------------------------------- @@ -33,11 +38,15 @@ packetsDroppedPerComplaint = 1000 -- Types ----------------------------------------------------------------------- +type Version = Word8 + data AmesDrv = AmesDrv { aTurfs :: TVar (Maybe [Turf]) , aDropped :: TVar Word + , aVersion :: TVar (Maybe Version) , aUdpServ :: UdpServ , aResolvr :: ResolvServ + , aVersTid :: Async () , aRecvTid :: Async () } @@ -82,9 +91,10 @@ bornEv inst = EvBlip $ BlipEvNewt $ NewtEvBorn (fromIntegral inst, ()) () hearEv :: PortNumber -> HostAddress -> ByteString -> Ev hearEv p a bs = - EvBlip $ BlipEvAmes $ AmesEvHear () dest (MkBytes bs) - where - dest = EachNo $ Jammed $ AAIpv4 (Ipv4 a) (fromIntegral p) + EvBlip $ BlipEvAmes $ AmesEvHear () (ipDest p a) (MkBytes bs) + +ipDest :: PortNumber -> HostAddress -> AmesDest +ipDest p a = EachNo $ Jammed $ AAIpv4 (Ipv4 a) (fromIntegral p) -------------------------------------------------------------------------------- @@ -125,9 +135,10 @@ ames' :: HasPierEnv e => Ship -> Bool + -> (Time.Wen -> Gang -> Path -> (Maybe (Term, Noun) -> IO ()) -> STM ()) -> (Text -> RIO e ()) -> RIO e ([Ev], RAcquire e (DriverApi NewtEf)) -ames' who isFake stderr = do +ames' who isFake scry stderr = do -- Unfortunately, we cannot use TBQueue because the only behavior -- provided for when full is to block the writer. The implementation -- below uses materially the same data structures as TBQueue, however. @@ -151,7 +162,7 @@ ames' who isFake stderr = do pure pM env <- ask - let (bornEvs, startDriver) = ames env who isFake enqueuePacket stderr + let (bornEvs, startDriver) = ames env who isFake scry enqueuePacket stderr let runDriver = do diOnEffect <- startDriver @@ -178,10 +189,11 @@ ames => e -> Ship -> Bool + -> (Time.Wen -> Gang -> Path -> (Maybe (Term, Noun) -> IO ()) -> STM ()) -> (EvErr -> STM PacketOutcome) -> (Text -> RIO e ()) -> ([Ev], RAcquire e (NewtEf -> IO ())) -ames env who isFake enqueueEv stderr = (initialEvents, runAmes) +ames env who isFake scry enqueueEv stderr = (initialEvents, runAmes) where king = fromIntegral (env ^. kingIdL) @@ -194,36 +206,85 @@ ames env who isFake enqueueEv stderr = (initialEvents, runAmes) drv <- mkRAcquire start stop pure (handleEffect drv mode) - start :: HasLogFunc e => RIO e AmesDrv + start :: RIO e AmesDrv start = do + mode <- rio (netMode isFake) + cache <- laneCache scryLane + aTurfs <- newTVarIO Nothing aDropped <- newTVarIO 0 + aVersion <- newTVarIO Nothing + aVersTid <- trackVersionThread aVersion aUdpServ <- udpServ isFake who - aRecvTid <- queuePacketsThread aDropped aUdpServ aResolvr <- resolvServ aTurfs (usSend aUdpServ) stderr + aRecvTid <- queuePacketsThread + aDropped + aVersion + (byCache cache) + (send aUdpServ aResolvr mode) + aUdpServ + pure (AmesDrv { .. }) hearFailed _ = pure () - queuePacketsThread :: HasLogFunc e => TVar Word -> UdpServ -> RIO e (Async ()) - queuePacketsThread dropCtr UdpServ {..} = async $ forever $ do - outcome <- atomically $ do - (p, a, b) <- usRecv - enqueueEv (EvErr (hearEv p a b) hearFailed) - case outcome of - Intake -> pure () - Ouster -> do - d <- atomically $ do - d <- readTVar dropCtr - writeTVar dropCtr (d + 1) - pure d - when (d `rem` packetsDroppedPerComplaint == 0) $ - logWarn "ames: queue full; dropping inbound packets" + trackVersionThread :: HasLogFunc e => TVar (Maybe Version) -> RIO e (Async ()) + trackVersionThread versSlot = async $ forever do + env <- ask - stop :: AmesDrv -> RIO e () + scryVersion \v -> do + v0 <- readTVarIO versSlot + atomically $ writeTVar versSlot (Just v) + if (v0 == Just v) + then logInfo $ displayShow ("ames: proto version unchanged at", v) + else stderr ("ames: protocol version now " <> tshow v) + + threadDelay (10 * 60 * 1_000_000) -- 10m + + queuePacketsThread :: HasLogFunc e + => TVar Word + -> TVar (Maybe Version) + -> (Ship -> (Maybe [AmesDest] -> RIO e ()) -> RIO e ()) + -> (AmesDest -> ByteString -> RIO e ()) + -> UdpServ + -> RIO e (Async ()) + queuePacketsThread dropCtr vers lan forward UdpServ{..} = async $ forever $ do + -- port number, host address, bytestring + (p, a, b) <- atomically usRecv + ver <- readTVarIO vers + + case decode b of + Right (pkt@Packet {..}) | ver == Nothing || ver == Just pktVersion -> do + logDebug $ displayShow ("ames: bon packet", pkt, showUD $ bytesAtom b) + + if pktRcvr == who + then serf'sUp p a b + else lan pktRcvr $ \case + Just (dest:_) -> forward dest $ encode pkt + { pktOrigin = pktOrigin <|> Just (ipDest p a) } + _ -> logInfo $ displayShow ("ames: dropping unroutable", pkt) + + Right pkt -> logInfo $ displayShow ("ames: dropping ill-versed", pkt, ver) + + Left e -> logInfo $ displayShow ("ames: dropping malformed", e) + + where + serf'sUp p a b = + atomically (enqueueEv (EvErr (hearEv p a b) hearFailed)) >>= \case + Intake -> pure () + Ouster -> do + d <- atomically $ do + d <- readTVar dropCtr + writeTVar dropCtr (d + 1) + pure d + when (d `rem` packetsDroppedPerComplaint == 0) $ + logWarn "ames: queue full; dropping inbound packets" + + stop :: forall e. AmesDrv -> RIO e () stop AmesDrv {..} = io $ do usKill aUdpServ rsKill aResolvr + cancel aVersTid cancel aRecvTid handleEffect :: AmesDrv -> NetworkMode -> NewtEf -> IO () @@ -234,19 +295,53 @@ ames env who isFake enqueueEv stderr = (initialEvents, runAmes) NewtEfSend (_id, ()) dest (MkBytes bs) -> do atomically (readTVar aTurfs) >>= \case Nothing -> stderr "ames: send before turfs" >> pure () - Just turfs -> sendPacket drv mode dest bs + Just turfs -> send aUdpServ aResolvr mode dest bs - sendPacket :: AmesDrv -> NetworkMode -> AmesDest -> ByteString -> RIO e () - sendPacket AmesDrv {..} mode dest byt = do - let to adr = io (usSend aUdpServ adr byt) + send :: UdpServ + -> ResolvServ + -> NetworkMode + -> AmesDest + -> ByteString + -> RIO e () + send udpServ resolvr mode dest byt = do + let to adr = io (usSend udpServ adr byt) case (mode, dest) of (NoNetwork, _ ) -> pure () (Fake , _ ) -> when (okFakeAddr dest) $ to (localAddr Fake dest) (Localhost, _ ) -> to (localAddr Localhost dest) (Real , ra) -> ra & \case - EachYes gala -> io (rsSend aResolvr gala byt) + EachYes gala -> io (rsSend resolvr gala byt) EachNo addr -> to (ipv4Addr addr) + scryVersion :: HasLogFunc e => (Version -> RIO e ()) -> RIO e () + scryVersion = scry' ["protocol", "version"] + . maybe (logError "ames: could not scry for version") + + scryLane :: HasLogFunc e + => Ship + -> (Maybe [AmesDest] -> RIO e ()) + -> RIO e () + scryLane ship = scry' ["peers", MkKnot $ tshow ship, "forward-lane"] + + scry' :: forall e n + . (HasLogFunc e, FromNoun n) + => [Knot] + -> (Maybe n -> RIO e ()) + -> RIO e () + scry' p k = do + env <- ask + wen <- io Time.now + let nkt = MkKnot $ tshow $ Time.MkDate wen + let pax = Path $ "ax" : MkKnot (tshow who) : "" : nkt : p + putStrLn ("scrying for " <> tshow pax) + let kon = runRIO env . \case + Just (_, fromNoun @n -> Just v) -> k (Just v) + Just (_, n) -> do + logError $ displayShow ("ames: uncanny scry result", pax, n) + k Nothing + Nothing -> k Nothing + atomically $ scry wen Nothing pax kon + ipv4Addr (Jammed (AAVoid v )) = absurd v ipv4Addr (Jammed (AAIpv4 a p)) = SockAddrInet (fromIntegral p) (unIpv4 a) diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Ames/LaneCache.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Ames/LaneCache.hs new file mode 100644 index 0000000000..8c8e235c4c --- /dev/null +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Ames/LaneCache.hs @@ -0,0 +1,34 @@ +module Urbit.Vere.Ames.LaneCache (LaneCache, laneCache, byCache) where + +import Urbit.Prelude + +import Urbit.Noun.Time + +expiry :: Gap +expiry = (2 * 60) ^. from secs + +data LaneCache m a b = LaneCache + { lcCache :: TVar (Map a (Wen, b)) + , lcAction :: a -> (b -> m ()) -> m () + } + +laneCache :: (Ord a, MonadIO n) + => (a -> (b -> m ()) -> m ()) + -> n (LaneCache m a b) +laneCache act = LaneCache <$> newTVarIO mempty <*> pure act + +byCache :: (Ord a, MonadIO m) + => LaneCache m a b + -> a -> (b -> m ()) -> m () +byCache LaneCache {..} x f = lookup x <$> readTVarIO lcCache >>= \case + Nothing -> go + Just (t, v) -> do + t' <- io now + if gap t' t > expiry + then go + else f v + where + go = lcAction x $ \v -> do + t <- io now + atomically $ modifyTVar' lcCache (insertMap x (t, v)) + f v diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Ames/Packet.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Ames/Packet.hs new file mode 100644 index 0000000000..9566e10599 --- /dev/null +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Ames/Packet.hs @@ -0,0 +1,102 @@ +{-| + Parsing of Ames packets +-} + +module Urbit.Vere.Ames.Packet where + +import Urbit.Prelude + +import Data.Bits +import Data.LargeWord +import Data.Serialize + +import Urbit.Arvo (AmesDest) +import Urbit.Noun.Tree (mug) + +data Packet = Packet + { pktVersion :: Word8 + , pktEncrypted :: Bool + -- + , pktSndr :: Ship + , pktRcvr :: Ship + , pktOrigin :: Maybe AmesDest + , pktContent :: Bytes + } + +instance Show Packet where + show Packet {..} + = "Packet {pktVersion = " + <> show pktVersion + <> ", pktEncrypted = " + <> show pktEncrypted + <> ", pktSndr = " + <> show pktSndr + <> ", pktRcvr = " + <> show pktRcvr + <> ", pktOrigin = " + <> show pktOrigin + <> ", pktContent = " + <> showUD (bytesAtom $ unBytes pktContent) + <> "}" + +instance Serialize Packet where + get = do + -- header + head <- getWord32le + let pktVersion = head .&. 0b111 & fromIntegral + let checksum = shiftR head 3 .&. (2 ^ 20 - 1) + let sndrRank = shiftR head 23 .&. 0b11 + let rcvrRank = shiftR head 25 .&. 0b11 + let pktEncrypted = testBit head 27 & not -- loobean + -- verify checksum + lookAhead $ do + len <- remaining + body <- getBytes len + -- XX mug (marked "TODO") is implemented as "slowMug" in U.N.Tree. Ominous + -- Also, toNoun will copy the bytes into an atom. We probably want a mugBS + let chk = fromIntegral (mug $ toNoun $ MkBytes body) .&. (2 ^ 20 - 1) + when (checksum /= chk) $ + fail ("checksum mismatch: expected " <> show checksum + <> "; got " <> show chk) + -- body + pktSndr <- getShip sndrRank + pktRcvr <- getShip rcvrRank + len <- remaining + payload <- getBytes len + -- data ("payload") + (pktOrigin, pktContent) <- case cueBS payload of + Left e -> fail (show e) + Right n -> case fromNounErr n of + Left e -> fail (show e) + Right c -> pure c + pure Packet {..} + where + getShip = fmap Ship . \case + 0 -> fromIntegral <$> getWord16le -- galaxy / star + 1 -> fromIntegral <$> getWord32le -- planet + 2 -> fromIntegral <$> getWord64le -- moon + 3 -> LargeKey <$> getWord64le <*> getWord64le -- comet + _ -> fail "impossibiru" + + put Packet {..} = do + let load = jamBS $ toNoun (pktOrigin, pktContent) + let (sndR, putSndr) = putShipGetRank pktSndr + let (rcvR, putRcvr) = putShipGetRank pktRcvr + let body = runPut (putSndr <> putRcvr <> putByteString load) + -- XX again maybe mug can be made better here + let chek = fromIntegral (mug $ toNoun $ MkBytes body) .&. (2 ^ 20 - 1) + let encr = pktEncrypted + let vers = fromIntegral pktVersion .&. 0b111 + let head = vers + .|. shiftL chek 3 + .|. shiftL sndR 23 + .|. shiftL rcvR 25 + .|. if encr then 0 else bit 27 + putWord32le head + putByteString body -- XX can we avoid copy? + where + putShipGetRank s@(Ship (LargeKey p q)) = case () of + _ | s < 2 ^ 16 -> (0, putWord16le $ fromIntegral s) -- gar + | s < 2 ^ 32 -> (1, putWord32le $ fromIntegral s) -- pan + | s < 2 ^ 64 -> (2, putWord64le $ fromIntegral s) -- mon + | otherwise -> (3, putWord64le p >> putWord64le q) -- com diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs index 82da7d0df9..997e37ce26 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs @@ -305,6 +305,9 @@ pier (serf, log) vSlog startedSig = do atomically $ Term.trace muxed txt logOther "serf" (display $ T.strip txt) + scrySig <- newEmptyTMVarIO + onKill <- view onKillPierSigL + -- Our call above to set the logging function which echos errors from the -- Serf doesn't have the appended \r\n because those \r\n s are added in -- the c serf code. Logging output from our haskell process must manually @@ -312,6 +315,7 @@ pier (serf, log) vSlog startedSig = do let compute = putTMVar computeQ let execute = writeTQueue executeQ let persist = writeTQueue persistQ + let scry = \w b g k -> putTMVar scrySig (w, b, g, k) let sigint = Serf.sendSIGINT serf (bootEvents, startDrivers) <- do @@ -319,10 +323,7 @@ pier (serf, log) vSlog startedSig = do let err = atomically . Term.trace muxed . (<> "\r\n") let siz = TermSize { tsWide = 80, tsTall = 24 } let fak = isFake logId - drivers env ship fak compute (siz, muxed) err sigint - - scrySig <- newEmptyTMVarIO - onKill <- view onKillPierSigL + drivers env ship fak compute scry (siz, muxed) err sigint let computeConfig = ComputeConfig { ccOnWork = takeTMVar computeQ , ccOnKill = onKill @@ -369,19 +370,6 @@ pier (serf, log) vSlog startedSig = do threadDelay (snapshotEverySecs * 1_000_000) void $ atomically $ tryPutTMVar saveSig () - -- TODO bullshit scry tester - when False $ do - void $ acquireWorker "bullshit scry tester" $ do - env <- ask - forever $ do - threadDelay 15_000_000 - wen <- io Time.now - let kal = \mTermNoun -> runRIO env $ do - logInfo $ displayShow ("scry result: ", mTermNoun) - let nkt = MkKnot $ tshow $ Time.MkDate wen - let pax = Path ["j", "~zod", "life", nkt, "~zod"] - atomically $ putTMVar scrySig (wen, Nothing, pax, kal) - putMVar startedSig () -- Wait for something to die. @@ -423,18 +411,21 @@ drivers -> Ship -> Bool -> (RunReq -> STM ()) + -> (Wen -> Gang -> Path -> (Maybe (Term, Noun) -> IO ()) -> STM ()) -> (TermSize, Term.Client) -> (Text -> RIO e ()) -> IO () -> RAcquire e ([Ev], RAcquire e Drivers) -drivers env who isFake plan termSys stderr serfSIGINT = do +drivers env who isFake plan scry termSys stderr serfSIGINT = do (behnBorn, runBehn) <- rio Behn.behn' (termBorn, runTerm) <- rio (Term.term' termSys serfSIGINT) - (amesBorn, runAmes) <- rio (Ames.ames' who isFake stderr) + (amesBorn, runAmes) <- rio (Ames.ames' who isFake scry stderr) (httpBorn, runEyre) <- rio (Eyre.eyre' who isFake stderr) (clayBorn, runClay) <- rio Clay.clay' (irisBorn, runIris) <- rio Iris.client' + putStrLn ("ship is " <> tshow who) + let initialEvents = mconcat [behnBorn,clayBorn,amesBorn,httpBorn,irisBorn,termBorn] let runDrivers = do diff --git a/pkg/hs/urbit-king/package.yaml b/pkg/hs/urbit-king/package.yaml index 583cd6b6cd..e309fd73cf 100644 --- a/pkg/hs/urbit-king/package.yaml +++ b/pkg/hs/urbit-king/package.yaml @@ -31,6 +31,7 @@ dependencies: - binary - bytestring - case-insensitive + - cereal - classy-prelude - conduit - containers @@ -119,6 +120,7 @@ dependencies: default-extensions: - ApplicativeDo - BangPatterns + - BinaryLiterals - BlockArguments - ConstraintKinds - DataKinds @@ -145,6 +147,7 @@ default-extensions: - OverloadedStrings - PackageImports - PartialTypeSignatures + - PatternGuards - PatternSynonyms - QuasiQuotes - Rank2Types diff --git a/pkg/hs/urbit-noun/lib/Urbit/Noun/Conversions.hs b/pkg/hs/urbit-noun/lib/Urbit/Noun/Conversions.hs index 6a8b5fddfe..d1f1f94dac 100644 --- a/pkg/hs/urbit-noun/lib/Urbit/Noun/Conversions.hs +++ b/pkg/hs/urbit-noun/lib/Urbit/Noun/Conversions.hs @@ -12,6 +12,7 @@ module Urbit.Noun.Conversions , UD(..), UV(..), UW(..), cordToUW , Mug(..), Path(..), EvilPath(..), Ship(..) , Lenient(..), pathToFilePath, filePathToPath + , showUD, tshowUD ) where import ClassyPrelude hiding (hash) @@ -36,6 +37,7 @@ import RIO.FilePath (joinPath, splitDirectories, takeBaseName, takeDirectory, takeExtension, (<.>)) import Urbit.Noun.Cue (cue) import Urbit.Noun.Jam (jam) +import Urbit.Ob (patp) import qualified Data.Char as C import qualified Data.Text.Encoding as T @@ -97,22 +99,28 @@ instance FromNoun UD where Nothing -> fail ("invalid decimal atom: " <> unpack (filter (/= '.') t)) Just vl -> pure (UD vl) +showUD :: (Show i, Integral i) => i -> String +showUD = uTypeAddDots 3 . show + +tshowUD :: (Show i, Integral i) => i -> Text +tshowUD = pack . uTypeAddDots 3 . show + -------------------------------------------------------------------------------- -uTypeAddDots :: String -> String -uTypeAddDots = reverse . go . reverse +uTypeAddDots :: Int -> String -> String +uTypeAddDots n = reverse . go . reverse where go s = if null tel then hed else hed <> "." <> go tel where - hed = take 5 s - tel = drop 5 s + hed = take n s + tel = drop n s convertToU :: [Char] -> [Char] -> Atom -> String convertToU baseMap prefix = go [] where - go acc 0 = "0" <> prefix <> uTypeAddDots acc + go acc 0 = "0" <> prefix <> uTypeAddDots 5 acc go acc n = go (char n : acc) (n `div` len) char n = baseMap !! (fromIntegral (n `mod` len)) @@ -571,7 +579,10 @@ instance FromNoun Term where -- XX TODO -- Ship ------------------------------------------------------------------------ newtype Ship = Ship Word128 -- @p - deriving newtype (Eq, Ord, Show, Enum, Real, Integral, Num, ToNoun, FromNoun) + deriving newtype (Eq, Ord, Enum, Real, Integral, Num, ToNoun, FromNoun) + +instance Show Ship where + show = show . patp . fromIntegral -- Path ------------------------------------------------------------------------ diff --git a/pkg/hs/urbit-noun/package.yaml b/pkg/hs/urbit-noun/package.yaml index d94be31f67..74dfe2dddb 100644 --- a/pkg/hs/urbit-noun/package.yaml +++ b/pkg/hs/urbit-noun/package.yaml @@ -25,6 +25,7 @@ dependencies: - text - time - urbit-atom + - urbit-hob - urbit-noun-core default-extensions: