urbit/pkg/hs/urbit-king/lib/Urbit/Vere/Ames.hs

347 lines
11 KiB
Haskell
Raw Normal View History

2020-01-23 07:16:09 +03:00
{-|
Ames IO Driver
2020-01-23 07:16:09 +03:00
-}
2020-07-23 23:17:02 +03:00
module Urbit.Vere.Ames (ames, ames', PacketOutcome(..)) where
2019-07-03 02:37:10 +03:00
import Urbit.Prelude
2019-08-01 03:27:13 +03:00
import Network.Socket hiding (recvFrom, sendTo)
import Urbit.Arvo hiding (Fake)
import Urbit.King.Config
2020-09-27 01:55:10 +03:00
import Urbit.Vere.Ames.LaneCache
import Urbit.Vere.Ames.Packet
import Urbit.Vere.Pier.Types
import Urbit.Vere.Ports
2019-08-01 03:27:13 +03:00
2020-09-27 01:55:10 +03:00
import Data.Serialize (decode, encode)
import Urbit.King.App (HasKingId(..), HasPierEnv(..))
import Urbit.Vere.Ames.DNS (NetworkMode(..), ResolvServ(..))
import Urbit.Vere.Ames.DNS (galaxyPort, resolvServ)
import Urbit.Vere.Ames.UDP (UdpServ(..), fakeUdpServ, realUdpServ)
2020-09-27 01:55:10 +03:00
import qualified Urbit.Noun.Time as Time
2019-07-03 02:37:10 +03:00
-- Constants -------------------------------------------------------------------
2020-07-22 07:22:08 +03:00
-- | Upper limit on the number of ames packets that may sit unprocessed in
-- the event queue. Once this many are waiting, packets start being dropped.
queueBound :: Word
queueBound = 1000
-- | Packet loss is announced once per this many dropped packets, so the log
-- is not flooded while the queue stays full.
packetsDroppedPerComplaint :: Word
packetsDroppedPerComplaint = 1000
2019-08-01 03:27:13 +03:00
-- Types -----------------------------------------------------------------------
2020-09-27 01:55:10 +03:00
-- | Ames protocol version, as obtained by scrying @protocol/version@ and as
-- compared against the @pktVersion@ of each decoded packet.
type Version = Word8
2019-08-01 03:27:13 +03:00
-- | Runtime state of a started Ames driver.
data AmesDrv = AmesDrv
  { aTurfs   :: TVar (Maybe [Turf])
    -- ^ Turfs from the latest @NewtEfTurf@ effect; 'Nothing' until one arrives.
  , aDropped :: TVar Word
    -- ^ Running count of packets dropped because the event queue was full.
  , aVersion :: TVar (Maybe Version)
    -- ^ Most recently scried protocol version; 'Nothing' until a scry succeeds.
  , aUdpServ :: UdpServ
    -- ^ UDP server used for receiving and sending packets.
  , aResolvr :: ResolvServ
    -- ^ Resolver service used for sends addressed to galaxies.
  , aVersTid :: Async ()
    -- ^ Thread that periodically refreshes 'aVersion'.
  , aRecvTid :: Async ()
    -- ^ Thread that receives packets and queues or forwards them.
  }
-- | What happened when an inbound packet was offered to the event queue.
data PacketOutcome
  = Intake  -- ^ The queue had room and the packet was added.
  | Ouster  -- ^ The queue was full; the oldest entry was evicted to make room.
2020-01-23 07:16:09 +03:00
2019-08-01 03:27:13 +03:00
-- Utils -----------------------------------------------------------------------
-- | Default UDP port to listen on. Ships numbered below 256 get the port
-- assigned by 'galaxyPort'; every other ship gets 0, i.e. any free port.
listenPort :: NetworkMode -> Ship -> PortNumber
listenPort mode ship
  | ship < 256 = galaxyPort mode (fromIntegral ship)
  | otherwise  = 0 -- I don't care, just give me any port.
2019-08-01 03:27:13 +03:00
-- | The IPv4 loopback address, 127.0.0.1.
localhost :: HostAddress
localhost = tupleToHostAddress loopback
 where
  loopback = (127, 0, 0, 1)
2019-08-01 03:27:13 +03:00
-- | The IPv4 wildcard address, 0.0.0.0.
inaddrAny :: HostAddress
inaddrAny = tupleToHostAddress wildcard
 where
  wildcard = (0, 0, 0, 0)
-- | Address to bind the UDP server to for a given network mode, or 'Nothing'
-- when networking is disabled.
modeAddress :: NetworkMode -> Maybe HostAddress
modeAddress Fake      = Just localhost
modeAddress Localhost = Just localhost
modeAddress Real      = Just inaddrAny
modeAddress NoNetwork = Nothing
-- | Whether a destination may be sent to in fake mode: any galaxy is
-- accepted, and an explicit IPv4 address only if it is the loopback address.
okFakeAddr :: AmesDest -> Bool
okFakeAddr (EachYes _)            = True
okFakeAddr (EachNo (Jammed addr)) = case addr of
  AAIpv4 (Ipv4 ip) _ -> ip == localhost
  AAVoid v           -> absurd v
2019-08-01 03:27:13 +03:00
-- | Rewrite a destination so that it points at the loopback address. Galaxies
-- use their 'galaxyPort' for the given mode; explicit addresses keep their
-- port and have their host replaced.
localAddr :: NetworkMode -> AmesDest -> SockAddr
localAddr mode dest = SockAddrInet port localhost
 where
  port = case dest of
    EachYes g                    -> galaxyPort mode g
    EachNo (Jammed (AAIpv4 _ p)) -> fromIntegral p
    EachNo (Jammed (AAVoid v))   -> absurd v
2019-08-01 03:27:13 +03:00
2019-12-10 05:45:19 +03:00
-- | The newt @born@ event for the given king instance.
bornEv :: KingId -> Ev
bornEv inst = EvBlip (BlipEvNewt born)
 where
  born = NewtEvBorn (fromIntegral inst, ()) ()
2019-08-01 03:27:13 +03:00
2019-12-10 05:45:19 +03:00
-- | The ames @hear@ event for a packet received from the given port and host.
hearEv :: PortNumber -> HostAddress -> ByteString -> Ev
hearEv port host bytes = EvBlip (BlipEvAmes heard)
 where
  heard = AmesEvHear () (ipDest port host) (MkBytes bytes)
-- | Wrap a port and IPv4 host address as an explicit-address 'AmesDest'.
ipDest :: PortNumber -> HostAddress -> AmesDest
ipDest port host = EachNo (Jammed address)
 where
  address = AAIpv4 (Ipv4 host) (fromIntegral port)
2019-08-01 03:27:13 +03:00
2020-01-23 07:16:09 +03:00
2019-08-01 03:27:13 +03:00
--------------------------------------------------------------------------------
-- | Work out the effective network mode from the network configuration and
-- whether the ship is fake. Disabling ames (or a configured mode of
-- 'NMNone') wins over everything; otherwise a fake ship is always 'Fake'.
netMode :: HasNetworkConfig e => Bool -> RIO e NetworkMode
netMode isFake = do
  configured <- view (networkConfigL . ncNetMode)
  disabled   <- view (networkConfigL . ncNoAmes)
  pure (choose disabled configured)
 where
  choose True _           = NoNetwork
  choose _    NMNone      = NoNetwork
  choose _    _ | isFake  = Fake
  choose _    NMNormal    = Real
  choose _    NMLocalhost = Localhost
2020-05-30 01:57:35 +03:00
-- | The UDP port to bind: the configured ames port if there is one,
-- otherwise the default from 'listenPort'.
udpPort :: HasNetworkConfig e => Bool -> Ship -> RIO e PortNumber
udpPort isFake who = do
  mode     <- netMode isFake
  override <- view (networkConfigL . ncAmesPort)
  pure $ case override of
    Just explicit -> fromIntegral explicit
    Nothing       -> listenPort mode who
-- | Start the UDP server appropriate for the current network mode: a real
-- one bound to the mode's address, or a fake one when the mode has no
-- address (networking disabled).
udpServ
  :: (HasLogFunc e, HasNetworkConfig e, HasPortControlApi e)
  => Bool
  -> Ship
  -> RIO e UdpServ
udpServ isFake who = do
  mode <- netMode isFake
  port <- udpPort isFake who
  maybe fakeUdpServ (realUdpServ port) (modeAddress mode)
-- | Failure callback for the born event. Currently a no-op.
_bornFailed :: e -> WorkError -> IO ()
_bornFailed env _err = runRIO env (pure ()) -- TODO What can we do?
{-|
  Set up the Ames driver together with its bounded inbound-packet queue.

  Returns the initial events and an acquire action that starts the driver
  and yields a 'DriverApi' whose event source drains the queue.

  When the queue already holds 'queueBound' packets, enqueueing evicts the
  oldest queued packet to make room and reports 'Ouster'; otherwise it
  reports 'Intake'.
-}
ames'
  :: HasPierEnv e
  => Ship
  -> Bool
  -> (Time.Wen -> Gang -> Path -> (Maybe (Term, Noun) -> IO ()) -> STM ())
  -> (Text -> RIO e ())
  -> RIO e ([Ev], RAcquire e (DriverApi NewtEf))
ames' who isFake scry stderr = do
  -- Unfortunately, we cannot use TBQueue because the only behavior
  -- provided for when full is to block the writer. The implementation
  -- below uses materially the same data structures as TBQueue, however.
  ventQ :: TQueue EvErr <- newTQueueIO
  avail :: TVar Word    <- newTVarIO queueBound
  let
    enqueuePacket p = do
      vail <- readTVar avail
      if vail > 0
        then do
          -- Strict update so the counter never accumulates thunks.
          modifyTVar' avail (subtract 1)
          writeTQueue ventQ p
          pure Intake
        else do
          -- Full: discard the oldest packet. 'avail' stays at zero since
          -- one entry leaves and one enters.
          _ <- readTQueue ventQ
          writeTQueue ventQ p
          pure Ouster
    dequeuePacket = do
      pM <- tryReadTQueue ventQ
      when (isJust pM) $ modifyTVar' avail (+ 1)
      pure pM

  env <- ask
  let (bornEvs, startDriver) = ames env who isFake scry enqueuePacket stderr

  let runDriver = do
        diOnEffect <- startDriver
        let diEventSource = fmap RRWork <$> dequeuePacket
        pure (DriverApi {..})

  pure (bornEvs, runDriver)
2020-01-23 07:16:09 +03:00
{-|
  The Ames IO driver.

    env       -- Environment used to run effect handlers and read the king id.
    who       -- Which ship are we?
    isFake    -- Is this a fake ship? Feeds the network-mode decision.
    scry      -- Scry action, used for the protocol version and forward lanes.
    enqueueEv -- Queue-event action; reports whether a packet was displaced.
    stderr    -- Prints a message for the user.

  Returns the initial events (a single born event) and an acquire action
  that starts the driver and yields its effect handler.

  TODO verify that the KingIds match on effects.
-}
ames
  :: forall e
   . (HasLogFunc e, HasNetworkConfig e, HasPortControlApi e, HasKingId e)
  => e
  -> Ship
  -> Bool
  -> (Time.Wen -> Gang -> Path -> (Maybe (Term, Noun) -> IO ()) -> STM ())
  -> (EvErr -> STM PacketOutcome)
  -> (Text -> RIO e ())
  -> ([Ev], RAcquire e (NewtEf -> IO ()))
ames env who isFake scry enqueueEv stderr = (initialEvents, runAmes)
 where
  initialEvents :: [Ev]
  initialEvents = [bornEv kingId]
   where
    kingId = fromIntegral (env ^. kingIdL)
2020-06-07 02:34:27 +03:00
-- | (Helper of 'ames'.) Acquire the running driver and hand back its effect
-- handler; releasing the resource runs 'stop'.
runAmes :: RAcquire e (NewtEf -> IO ())
runAmes = do
  netw   <- rio (netMode isFake)
  driver <- mkRAcquire start stop
  pure $ \effect -> handleEffect driver netw effect
2020-09-27 01:55:10 +03:00
-- | (Helper of 'ames'.) Bring up the driver: allocate its state, then start
-- the version tracker, the UDP server, the resolver, and finally the thread
-- that receives packets.
start :: RIO e AmesDrv
start = do
  netw     <- rio (netMode isFake)
  lanes    <- laneCache scryLane
  turfs    <- newTVarIO Nothing
  dropped  <- newTVarIO 0
  version  <- newTVarIO Nothing
  versTid  <- trackVersionThread version
  server   <- udpServ isFake who
  resolver <- resolvServ turfs (usSend server) stderr
  recvTid  <- queuePacketsThread
                dropped
                version
                (byCache lanes)
                (send server resolver netw)
                server
  pure AmesDrv
    { aTurfs   = turfs
    , aDropped = dropped
    , aVersion = version
    , aUdpServ = server
    , aResolvr = resolver
    , aVersTid = versTid
    , aRecvTid = recvTid
    }
-- (Helper of 'ames'.) Failure callback attached to hear events; ignores the
-- error and does nothing.
hearFailed _err = pure ()
2020-09-27 01:55:10 +03:00
-- | (Helper of 'ames'.) Spawn a thread that scries for the protocol version
-- every ten minutes and stores the answer in the given slot. A change of
-- version is reported through @stderr@; an unchanged version is only logged.
-- If the scry yields nothing, 'scryVersion' logs an error and the slot is
-- left untouched.
trackVersionThread :: HasLogFunc e => TVar (Maybe Version) -> RIO e (Async ())
trackVersionThread versSlot = async $ forever do
  scryVersion \v -> do
    -- Read the previous value and store the new one in a single transaction.
    v0 <- atomically $ do
      old <- readTVar versSlot
      writeTVar versSlot (Just v)
      pure old
    if v0 == Just v
      then logInfo $ displayShow ("ames: proto version unchanged at", v)
      else stderr ("ames: protocol version now " <> tshow v)

  threadDelay (10 * 60 * 1_000_000)  -- 10m
2020-09-27 01:55:10 +03:00
-- | (Helper of 'ames'.) Spawn the thread that receives packets forever.
--
-- Each datagram is decoded. Malformed packets, and packets whose version
-- differs from the known protocol version (when one is known), are logged
-- and dropped. Packets addressed to this ship are queued as hear events;
-- all others are forwarded to the first lane found for their receiver, with
-- the origin filled in if it was absent.
queuePacketsThread
  :: HasLogFunc e
  => TVar Word
  -> TVar (Maybe Version)
  -> (Ship -> (Maybe [AmesDest] -> RIO e ()) -> RIO e ())
  -> (AmesDest -> ByteString -> RIO e ())
  -> UdpServ
  -> RIO e (Async ())
queuePacketsThread dropCtr vers lan forward UdpServ{..} = async $ forever $ do
  -- port number, host address, bytestring
  (port, host, raw) <- atomically usRecv
  expected          <- readTVarIO vers
  case decode raw of
    Left err ->
      logInfo $ displayShow ("ames: dropping malformed", err)

    Right pkt@Packet {..}
      | not (maybe True (== pktVersion) expected) ->
          logInfo $ displayShow ("ames: dropping ill-versed", pkt, expected)

      | otherwise -> do
          logDebug $ displayShow ("ames: bon packet", pkt, showUD $ bytesAtom raw)
          if pktRcvr == who
            then deliver port host raw
            else do
              let stamped = pkt { pktOrigin = pktOrigin <|> Just (ipDest port host) }
              lan pktRcvr $ \case
                Just (dest:_) -> forward dest (encode stamped)
                _ -> logInfo $ displayShow ("ames: dropping unroutable", pkt)
 where
  -- Queue a packet meant for us; complain periodically when the queue had to
  -- evict something to make room.
  deliver port host raw = do
    outcome <- atomically $ enqueueEv (EvErr (hearEv port host raw) hearFailed)
    case outcome of
      Intake -> pure ()
      Ouster -> do
        prior <- atomically $ do
          n <- readTVar dropCtr
          writeTVar dropCtr (n + 1)
          pure n
        when (prior `rem` packetsDroppedPerComplaint == 0) $
          logWarn "ames: queue full; dropping inbound packets"
-- | (Helper of 'ames'.) Shut the driver down: kill the UDP server and the
-- resolver, then cancel the version-tracking and receiving threads.
stop :: forall e. AmesDrv -> RIO e ()
stop drv = io $ do
  usKill (aUdpServ drv)
  rsKill (aResolvr drv)
  cancel (aVersTid drv)
  cancel (aRecvTid drv)
2020-06-07 02:34:27 +03:00
-- | (Helper of 'ames'.) Handle one effect.
--
-- A turf effect records the new turfs. A send effect transmits the packet,
-- unless no turfs have been recorded yet, in which case the packet is not
-- sent and a message is printed instead.
handleEffect :: AmesDrv -> NetworkMode -> NewtEf -> IO ()
handleEffect AmesDrv {..} mode = runRIO env . \case
  NewtEfTurf (_id, ()) turfs ->
    atomically $ writeTVar aTurfs (Just turfs)

  NewtEfSend (_id, ()) dest (MkBytes bs) ->
    readTVarIO aTurfs >>= \case
      Nothing -> stderr "ames: send before turfs"
      Just _  -> send aUdpServ aResolvr mode dest bs
2020-09-27 01:55:10 +03:00
-- | (Helper of 'ames'.) Transmit a packet according to the network mode.
--
-- With no network nothing is sent. In fake mode only destinations accepted
-- by 'okFakeAddr' are sent, rewritten to loopback. In localhost mode every
-- destination is rewritten to loopback. In real mode galaxies go through the
-- resolver and explicit addresses go straight to the UDP server.
send
  :: UdpServ
  -> ResolvServ
  -> NetworkMode
  -> AmesDest
  -> ByteString
  -> RIO e ()
send udpServ resolvr mode dest byt = case mode of
  NoNetwork -> pure ()
  Fake      -> when (okFakeAddr dest) $ transmit (localAddr Fake dest)
  Localhost -> transmit (localAddr Localhost dest)
  Real      -> case dest of
    EachYes gala -> io (rsSend resolvr gala byt)
    EachNo  addr -> transmit (ipv4Addr addr)
 where
  transmit adr = io (usSend udpServ adr byt)
2020-09-27 01:55:10 +03:00
-- | (Helper of 'ames'.) Scry for the protocol version and pass it to the
-- continuation. If no version comes back, log an error instead.
scryVersion :: HasLogFunc e => (Version -> RIO e ()) -> RIO e ()
scryVersion onVersion = scry' ["protocol", "version"] handle
 where
  handle = maybe (logError "ames: could not scry for version") onVersion
-- | (Helper of 'ames'.) Scry for the forward lanes of the given ship and
-- pass the result (if any) to the continuation.
scryLane
  :: HasLogFunc e
  => Ship
  -> (Maybe [AmesDest] -> RIO e ())
  -> RIO e ()
scryLane ship onLanes = scry' lanePath onLanes
 where
  lanePath = ["peers", MkKnot $ tshow ship, "forward-lane"]
-- | (Helper of 'ames'.) Issue a scry under the @ax@ prefix for our ship at
-- the current time, with the given path appended, and pass the decoded
-- result to the continuation. A missing result, or one that does not decode
-- to the expected type (which is also logged as an error), is passed on as
-- 'Nothing'.
scry'
  :: forall e n
   . (HasLogFunc e, FromNoun n)
  => [Knot]
  -> (Maybe n -> RIO e ())
  -> RIO e ()
scry' p k = do
  ctx <- ask
  wen <- io Time.now
  let nkt = MkKnot (tshow (Time.MkDate wen))
      pax = Path (["ax", MkKnot (tshow who), "", nkt] <> p)
      kon res = runRIO ctx $ case res of
        Nothing        -> k Nothing
        Just (_, noun) -> case fromNoun @n noun of
          Just v  -> k (Just v)
          Nothing -> do
            logError $ displayShow ("ames: uncanny scry result", pax, noun)
            k Nothing
  atomically $ scry wen Nothing pax kon
-- (Helper of 'ames'.) Convert a jammed explicit address into a socket
-- address.
ipv4Addr (Jammed addr) = case addr of
  AAIpv4 a p -> SockAddrInet (fromIntegral p) (unIpv4 a)
  AAVoid v   -> absurd v