king: stateless forwarding

This commit is contained in:
pilfer-pandex 2020-09-26 15:55:10 -07:00
parent dc7f9bd08c
commit 52b917ee71
8 changed files with 293 additions and 56 deletions

View File

@ -190,9 +190,9 @@ runHostEnv multi ports action = do
king <- ask
let hostEnv = HostEnv { _hostEnvKingEnv = king
, _hostEnvMultiEyreApi = multi
, _hostEnvPortControlApi = ports
}
, _hostEnvMultiEyreApi = multi
, _hostEnvPortControlApi = ports
}
io (runRIO hostEnv action)

View File

@ -9,14 +9,19 @@ import Urbit.Prelude
import Network.Socket hiding (recvFrom, sendTo)
import Urbit.Arvo hiding (Fake)
import Urbit.King.Config
import Urbit.Vere.Ames.LaneCache
import Urbit.Vere.Ames.Packet
import Urbit.Vere.Pier.Types
import Urbit.Vere.Ports
import Data.Serialize (decode, encode)
import Urbit.King.App (HasKingId(..), HasPierEnv(..))
import Urbit.Vere.Ames.DNS (NetworkMode(..), ResolvServ(..))
import Urbit.Vere.Ames.DNS (galaxyPort, resolvServ)
import Urbit.Vere.Ames.UDP (UdpServ(..), fakeUdpServ, realUdpServ)
import qualified Urbit.Noun.Time as Time
-- Constants -------------------------------------------------------------------
@ -33,11 +38,15 @@ packetsDroppedPerComplaint = 1000
-- Types -----------------------------------------------------------------------
type Version = Word8
data AmesDrv = AmesDrv
{ aTurfs :: TVar (Maybe [Turf])
, aDropped :: TVar Word
, aVersion :: TVar (Maybe Version)
, aUdpServ :: UdpServ
, aResolvr :: ResolvServ
, aVersTid :: Async ()
, aRecvTid :: Async ()
}
@ -82,9 +91,10 @@ bornEv inst = EvBlip $ BlipEvNewt $ NewtEvBorn (fromIntegral inst, ()) ()
hearEv :: PortNumber -> HostAddress -> ByteString -> Ev
hearEv p a bs =
EvBlip $ BlipEvAmes $ AmesEvHear () dest (MkBytes bs)
where
dest = EachNo $ Jammed $ AAIpv4 (Ipv4 a) (fromIntegral p)
EvBlip $ BlipEvAmes $ AmesEvHear () (ipDest p a) (MkBytes bs)
ipDest :: PortNumber -> HostAddress -> AmesDest
ipDest p a = EachNo $ Jammed $ AAIpv4 (Ipv4 a) (fromIntegral p)
--------------------------------------------------------------------------------
@ -125,9 +135,10 @@ ames'
:: HasPierEnv e
=> Ship
-> Bool
-> (Time.Wen -> Gang -> Path -> (Maybe (Term, Noun) -> IO ()) -> STM ())
-> (Text -> RIO e ())
-> RIO e ([Ev], RAcquire e (DriverApi NewtEf))
ames' who isFake stderr = do
ames' who isFake scry stderr = do
-- Unfortunately, we cannot use TBQueue because the only behavior
-- provided for when full is to block the writer. The implementation
-- below uses materially the same data structures as TBQueue, however.
@ -151,7 +162,7 @@ ames' who isFake stderr = do
pure pM
env <- ask
let (bornEvs, startDriver) = ames env who isFake enqueuePacket stderr
let (bornEvs, startDriver) = ames env who isFake scry enqueuePacket stderr
let runDriver = do
diOnEffect <- startDriver
@ -178,10 +189,11 @@ ames
=> e
-> Ship
-> Bool
-> (Time.Wen -> Gang -> Path -> (Maybe (Term, Noun) -> IO ()) -> STM ())
-> (EvErr -> STM PacketOutcome)
-> (Text -> RIO e ())
-> ([Ev], RAcquire e (NewtEf -> IO ()))
ames env who isFake enqueueEv stderr = (initialEvents, runAmes)
ames env who isFake scry enqueueEv stderr = (initialEvents, runAmes)
where
king = fromIntegral (env ^. kingIdL)
@ -194,36 +206,85 @@ ames env who isFake enqueueEv stderr = (initialEvents, runAmes)
drv <- mkRAcquire start stop
pure (handleEffect drv mode)
start :: HasLogFunc e => RIO e AmesDrv
start :: RIO e AmesDrv
start = do
mode <- rio (netMode isFake)
cache <- laneCache scryLane
aTurfs <- newTVarIO Nothing
aDropped <- newTVarIO 0
aVersion <- newTVarIO Nothing
aVersTid <- trackVersionThread aVersion
aUdpServ <- udpServ isFake who
aRecvTid <- queuePacketsThread aDropped aUdpServ
aResolvr <- resolvServ aTurfs (usSend aUdpServ) stderr
aRecvTid <- queuePacketsThread
aDropped
aVersion
(byCache cache)
(send aUdpServ aResolvr mode)
aUdpServ
pure (AmesDrv { .. })
hearFailed _ = pure ()
queuePacketsThread :: HasLogFunc e => TVar Word -> UdpServ -> RIO e (Async ())
queuePacketsThread dropCtr UdpServ {..} = async $ forever $ do
outcome <- atomically $ do
(p, a, b) <- usRecv
enqueueEv (EvErr (hearEv p a b) hearFailed)
case outcome of
Intake -> pure ()
Ouster -> do
d <- atomically $ do
d <- readTVar dropCtr
writeTVar dropCtr (d + 1)
pure d
when (d `rem` packetsDroppedPerComplaint == 0) $
logWarn "ames: queue full; dropping inbound packets"
trackVersionThread :: HasLogFunc e => TVar (Maybe Version) -> RIO e (Async ())
trackVersionThread versSlot = async $ forever do
env <- ask
stop :: AmesDrv -> RIO e ()
scryVersion \v -> do
v0 <- readTVarIO versSlot
atomically $ writeTVar versSlot (Just v)
if (v0 == Just v)
then logInfo $ displayShow ("ames: proto version unchanged at", v)
else stderr ("ames: protocol version now " <> tshow v)
threadDelay (10 * 60 * 1_000_000) -- 10m
queuePacketsThread :: HasLogFunc e
=> TVar Word
-> TVar (Maybe Version)
-> (Ship -> (Maybe [AmesDest] -> RIO e ()) -> RIO e ())
-> (AmesDest -> ByteString -> RIO e ())
-> UdpServ
-> RIO e (Async ())
queuePacketsThread dropCtr vers lan forward UdpServ{..} = async $ forever $ do
-- port number, host address, bytestring
(p, a, b) <- atomically usRecv
ver <- readTVarIO vers
case decode b of
Right (pkt@Packet {..}) | ver == Nothing || ver == Just pktVersion -> do
logDebug $ displayShow ("ames: bon packet", pkt, showUD $ bytesAtom b)
if pktRcvr == who
then serf'sUp p a b
else lan pktRcvr $ \case
Just (dest:_) -> forward dest $ encode pkt
{ pktOrigin = pktOrigin <|> Just (ipDest p a) }
_ -> logInfo $ displayShow ("ames: dropping unroutable", pkt)
Right pkt -> logInfo $ displayShow ("ames: dropping ill-versed", pkt, ver)
Left e -> logInfo $ displayShow ("ames: dropping malformed", e)
where
serf'sUp p a b =
atomically (enqueueEv (EvErr (hearEv p a b) hearFailed)) >>= \case
Intake -> pure ()
Ouster -> do
d <- atomically $ do
d <- readTVar dropCtr
writeTVar dropCtr (d + 1)
pure d
when (d `rem` packetsDroppedPerComplaint == 0) $
logWarn "ames: queue full; dropping inbound packets"
stop :: forall e. AmesDrv -> RIO e ()
stop AmesDrv {..} = io $ do
usKill aUdpServ
rsKill aResolvr
cancel aVersTid
cancel aRecvTid
handleEffect :: AmesDrv -> NetworkMode -> NewtEf -> IO ()
@ -234,19 +295,53 @@ ames env who isFake enqueueEv stderr = (initialEvents, runAmes)
NewtEfSend (_id, ()) dest (MkBytes bs) -> do
atomically (readTVar aTurfs) >>= \case
Nothing -> stderr "ames: send before turfs" >> pure ()
Just turfs -> sendPacket drv mode dest bs
Just turfs -> send aUdpServ aResolvr mode dest bs
sendPacket :: AmesDrv -> NetworkMode -> AmesDest -> ByteString -> RIO e ()
sendPacket AmesDrv {..} mode dest byt = do
let to adr = io (usSend aUdpServ adr byt)
send :: UdpServ
-> ResolvServ
-> NetworkMode
-> AmesDest
-> ByteString
-> RIO e ()
send udpServ resolvr mode dest byt = do
let to adr = io (usSend udpServ adr byt)
case (mode, dest) of
(NoNetwork, _ ) -> pure ()
(Fake , _ ) -> when (okFakeAddr dest) $ to (localAddr Fake dest)
(Localhost, _ ) -> to (localAddr Localhost dest)
(Real , ra) -> ra & \case
EachYes gala -> io (rsSend aResolvr gala byt)
EachYes gala -> io (rsSend resolvr gala byt)
EachNo addr -> to (ipv4Addr addr)
scryVersion :: HasLogFunc e => (Version -> RIO e ()) -> RIO e ()
scryVersion = scry' ["protocol", "version"]
. maybe (logError "ames: could not scry for version")
scryLane :: HasLogFunc e
=> Ship
-> (Maybe [AmesDest] -> RIO e ())
-> RIO e ()
scryLane ship = scry' ["peers", MkKnot $ tshow ship, "forward-lane"]
scry' :: forall e n
. (HasLogFunc e, FromNoun n)
=> [Knot]
-> (Maybe n -> RIO e ())
-> RIO e ()
scry' p k = do
env <- ask
wen <- io Time.now
let nkt = MkKnot $ tshow $ Time.MkDate wen
let pax = Path $ "ax" : MkKnot (tshow who) : "" : nkt : p
putStrLn ("scrying for " <> tshow pax)
let kon = runRIO env . \case
Just (_, fromNoun @n -> Just v) -> k (Just v)
Just (_, n) -> do
logError $ displayShow ("ames: uncanny scry result", pax, n)
k Nothing
Nothing -> k Nothing
atomically $ scry wen Nothing pax kon
ipv4Addr (Jammed (AAVoid v )) = absurd v
ipv4Addr (Jammed (AAIpv4 a p)) = SockAddrInet (fromIntegral p) (unIpv4 a)

View File

@ -0,0 +1,34 @@
module Urbit.Vere.Ames.LaneCache (LaneCache, laneCache, byCache) where
import Urbit.Prelude
import Urbit.Noun.Time
expiry :: Gap
expiry = (2 * 60) ^. from secs
data LaneCache m a b = LaneCache
{ lcCache :: TVar (Map a (Wen, b))
, lcAction :: a -> (b -> m ()) -> m ()
}
laneCache :: (Ord a, MonadIO n)
=> (a -> (b -> m ()) -> m ())
-> n (LaneCache m a b)
laneCache act = LaneCache <$> newTVarIO mempty <*> pure act
byCache :: (Ord a, MonadIO m)
=> LaneCache m a b
-> a -> (b -> m ()) -> m ()
byCache LaneCache {..} x f = lookup x <$> readTVarIO lcCache >>= \case
Nothing -> go
Just (t, v) -> do
t' <- io now
if gap t' t > expiry
then go
else f v
where
go = lcAction x $ \v -> do
t <- io now
atomically $ modifyTVar' lcCache (insertMap x (t, v))
f v

View File

@ -0,0 +1,102 @@
{-|
Parsing of Ames packets
-}
module Urbit.Vere.Ames.Packet where
import Urbit.Prelude
import Data.Bits
import Data.LargeWord
import Data.Serialize
import Urbit.Arvo (AmesDest)
import Urbit.Noun.Tree (mug)
data Packet = Packet
{ pktVersion :: Word8
, pktEncrypted :: Bool
--
, pktSndr :: Ship
, pktRcvr :: Ship
, pktOrigin :: Maybe AmesDest
, pktContent :: Bytes
}
instance Show Packet where
show Packet {..}
= "Packet {pktVersion = "
<> show pktVersion
<> ", pktEncrypted = "
<> show pktEncrypted
<> ", pktSndr = "
<> show pktSndr
<> ", pktRcvr = "
<> show pktRcvr
<> ", pktOrigin = "
<> show pktOrigin
<> ", pktContent = "
<> showUD (bytesAtom $ unBytes pktContent)
<> "}"
instance Serialize Packet where
get = do
-- header
head <- getWord32le
let pktVersion = head .&. 0b111 & fromIntegral
let checksum = shiftR head 3 .&. (2 ^ 20 - 1)
let sndrRank = shiftR head 23 .&. 0b11
let rcvrRank = shiftR head 25 .&. 0b11
let pktEncrypted = testBit head 27 & not -- loobean
-- verify checksum
lookAhead $ do
len <- remaining
body <- getBytes len
-- XX mug (marked "TODO") is implemented as "slowMug" in U.N.Tree. Ominous
-- Also, toNoun will copy the bytes into an atom. We probably want a mugBS
let chk = fromIntegral (mug $ toNoun $ MkBytes body) .&. (2 ^ 20 - 1)
when (checksum /= chk) $
fail ("checksum mismatch: expected " <> show checksum
<> "; got " <> show chk)
-- body
pktSndr <- getShip sndrRank
pktRcvr <- getShip rcvrRank
len <- remaining
payload <- getBytes len
-- data ("payload")
(pktOrigin, pktContent) <- case cueBS payload of
Left e -> fail (show e)
Right n -> case fromNounErr n of
Left e -> fail (show e)
Right c -> pure c
pure Packet {..}
where
getShip = fmap Ship . \case
0 -> fromIntegral <$> getWord16le -- galaxy / star
1 -> fromIntegral <$> getWord32le -- planet
2 -> fromIntegral <$> getWord64le -- moon
3 -> LargeKey <$> getWord64le <*> getWord64le -- comet
_ -> fail "impossibiru"
put Packet {..} = do
let load = jamBS $ toNoun (pktOrigin, pktContent)
let (sndR, putSndr) = putShipGetRank pktSndr
let (rcvR, putRcvr) = putShipGetRank pktRcvr
let body = runPut (putSndr <> putRcvr <> putByteString load)
-- XX again maybe mug can be made better here
let chek = fromIntegral (mug $ toNoun $ MkBytes body) .&. (2 ^ 20 - 1)
let encr = pktEncrypted
let vers = fromIntegral pktVersion .&. 0b111
let head = vers
.|. shiftL chek 3
.|. shiftL sndR 23
.|. shiftL rcvR 25
.|. if encr then 0 else bit 27
putWord32le head
putByteString body -- XX can we avoid copy?
where
putShipGetRank s@(Ship (LargeKey p q)) = case () of
_ | s < 2 ^ 16 -> (0, putWord16le $ fromIntegral s) -- gar
| s < 2 ^ 32 -> (1, putWord32le $ fromIntegral s) -- pan
| s < 2 ^ 64 -> (2, putWord64le $ fromIntegral s) -- mon
| otherwise -> (3, putWord64le p >> putWord64le q) -- com

View File

@ -305,6 +305,9 @@ pier (serf, log) vSlog startedSig = do
atomically $ Term.trace muxed txt
logOther "serf" (display $ T.strip txt)
scrySig <- newEmptyTMVarIO
onKill <- view onKillPierSigL
-- Our call above to set the logging function which echos errors from the
-- Serf doesn't have the appended \r\n because those \r\n s are added in
-- the c serf code. Logging output from our haskell process must manually
@ -312,6 +315,7 @@ pier (serf, log) vSlog startedSig = do
let compute = putTMVar computeQ
let execute = writeTQueue executeQ
let persist = writeTQueue persistQ
let scry = \w b g k -> putTMVar scrySig (w, b, g, k)
let sigint = Serf.sendSIGINT serf
(bootEvents, startDrivers) <- do
@ -319,10 +323,7 @@ pier (serf, log) vSlog startedSig = do
let err = atomically . Term.trace muxed . (<> "\r\n")
let siz = TermSize { tsWide = 80, tsTall = 24 }
let fak = isFake logId
drivers env ship fak compute (siz, muxed) err sigint
scrySig <- newEmptyTMVarIO
onKill <- view onKillPierSigL
drivers env ship fak compute scry (siz, muxed) err sigint
let computeConfig = ComputeConfig { ccOnWork = takeTMVar computeQ
, ccOnKill = onKill
@ -369,19 +370,6 @@ pier (serf, log) vSlog startedSig = do
threadDelay (snapshotEverySecs * 1_000_000)
void $ atomically $ tryPutTMVar saveSig ()
-- TODO bullshit scry tester
when False $ do
void $ acquireWorker "bullshit scry tester" $ do
env <- ask
forever $ do
threadDelay 15_000_000
wen <- io Time.now
let kal = \mTermNoun -> runRIO env $ do
logInfo $ displayShow ("scry result: ", mTermNoun)
let nkt = MkKnot $ tshow $ Time.MkDate wen
let pax = Path ["j", "~zod", "life", nkt, "~zod"]
atomically $ putTMVar scrySig (wen, Nothing, pax, kal)
putMVar startedSig ()
-- Wait for something to die.
@ -423,18 +411,21 @@ drivers
-> Ship
-> Bool
-> (RunReq -> STM ())
-> (Wen -> Gang -> Path -> (Maybe (Term, Noun) -> IO ()) -> STM ())
-> (TermSize, Term.Client)
-> (Text -> RIO e ())
-> IO ()
-> RAcquire e ([Ev], RAcquire e Drivers)
drivers env who isFake plan termSys stderr serfSIGINT = do
drivers env who isFake plan scry termSys stderr serfSIGINT = do
(behnBorn, runBehn) <- rio Behn.behn'
(termBorn, runTerm) <- rio (Term.term' termSys serfSIGINT)
(amesBorn, runAmes) <- rio (Ames.ames' who isFake stderr)
(amesBorn, runAmes) <- rio (Ames.ames' who isFake scry stderr)
(httpBorn, runEyre) <- rio (Eyre.eyre' who isFake stderr)
(clayBorn, runClay) <- rio Clay.clay'
(irisBorn, runIris) <- rio Iris.client'
putStrLn ("ship is " <> tshow who)
let initialEvents = mconcat [behnBorn,clayBorn,amesBorn,httpBorn,irisBorn,termBorn]
let runDrivers = do

View File

@ -31,6 +31,7 @@ dependencies:
- binary
- bytestring
- case-insensitive
- cereal
- classy-prelude
- conduit
- containers
@ -119,6 +120,7 @@ dependencies:
default-extensions:
- ApplicativeDo
- BangPatterns
- BinaryLiterals
- BlockArguments
- ConstraintKinds
- DataKinds
@ -145,6 +147,7 @@ default-extensions:
- OverloadedStrings
- PackageImports
- PartialTypeSignatures
- PatternGuards
- PatternSynonyms
- QuasiQuotes
- Rank2Types

View File

@ -12,6 +12,7 @@ module Urbit.Noun.Conversions
, UD(..), UV(..), UW(..), cordToUW
, Mug(..), Path(..), EvilPath(..), Ship(..)
, Lenient(..), pathToFilePath, filePathToPath
, showUD, tshowUD
) where
import ClassyPrelude hiding (hash)
@ -36,6 +37,7 @@ import RIO.FilePath (joinPath, splitDirectories, takeBaseName,
takeDirectory, takeExtension, (<.>))
import Urbit.Noun.Cue (cue)
import Urbit.Noun.Jam (jam)
import Urbit.Ob (patp)
import qualified Data.Char as C
import qualified Data.Text.Encoding as T
@ -97,22 +99,28 @@ instance FromNoun UD where
Nothing -> fail ("invalid decimal atom: " <> unpack (filter (/= '.') t))
Just vl -> pure (UD vl)
showUD :: (Show i, Integral i) => i -> String
showUD = uTypeAddDots 3 . show
tshowUD :: (Show i, Integral i) => i -> Text
tshowUD = pack . uTypeAddDots 3 . show
--------------------------------------------------------------------------------
uTypeAddDots :: String -> String
uTypeAddDots = reverse . go . reverse
uTypeAddDots :: Int -> String -> String
uTypeAddDots n = reverse . go . reverse
where
go s = if null tel then hed
else hed <> "." <> go tel
where
hed = take 5 s
tel = drop 5 s
hed = take n s
tel = drop n s
convertToU :: [Char] -> [Char] -> Atom -> String
convertToU baseMap prefix = go []
where
go acc 0 = "0" <> prefix <> uTypeAddDots acc
go acc 0 = "0" <> prefix <> uTypeAddDots 5 acc
go acc n = go (char n : acc) (n `div` len)
char n = baseMap !! (fromIntegral (n `mod` len))
@ -571,7 +579,10 @@ instance FromNoun Term where -- XX TODO
-- Ship ------------------------------------------------------------------------
newtype Ship = Ship Word128 -- @p
deriving newtype (Eq, Ord, Show, Enum, Real, Integral, Num, ToNoun, FromNoun)
deriving newtype (Eq, Ord, Enum, Real, Integral, Num, ToNoun, FromNoun)
instance Show Ship where
show = show . patp . fromIntegral
-- Path ------------------------------------------------------------------------

View File

@ -25,6 +25,7 @@ dependencies:
- text
- time
- urbit-atom
- urbit-hob
- urbit-noun-core
default-extensions: