urbit/pkg/hs/urbit-king/lib/Urbit/Vere/Ames.hs

253 lines
7.5 KiB
Haskell
Raw Normal View History

2020-01-23 07:16:09 +03:00
{-|
  Ames IO Driver
-}
2020-07-23 23:17:02 +03:00
module Urbit.Vere.Ames (ames, ames', PacketOutcome(..)) where
2019-07-03 02:37:10 +03:00
import Urbit.Prelude
2019-08-01 03:27:13 +03:00
import Network.Socket hiding (recvFrom, sendTo)
import Urbit.Arvo hiding (Fake)
import Urbit.King.Config
import Urbit.Vere.Pier.Types
import Urbit.Vere.Ports
2019-08-01 03:27:13 +03:00
import Urbit.King.App (HasKingId(..), HasPierEnv(..))
import Urbit.Vere.Ames.DNS (NetworkMode(..), ResolvServ(..))
import Urbit.Vere.Ames.DNS (galaxyPort, resolvServ)
import Urbit.Vere.Ames.UDP (UdpServ(..), fakeUdpServ, realUdpServ)
2019-07-03 02:37:10 +03:00
-- Constants -------------------------------------------------------------------
2020-07-22 07:22:08 +03:00
-- | Capacity of the queue of unprocessed ames packets. Once this many
-- packets are waiting, packets start getting dropped.
queueBound :: Word
queueBound = 1000
-- | Packet loss is announced once per this many dropped packets, so that
-- the log is not flooded while the queue stays full.
packetsDroppedPerComplaint :: Word
packetsDroppedPerComplaint = 1000
2019-08-01 03:27:13 +03:00
-- Types -----------------------------------------------------------------------
-- | Runtime state of a running Ames driver.
data AmesDrv = AmesDrv
  { aTurfs   :: TVar (Maybe [Turf])
    -- ^ Turfs from the latest turf effect; 'Nothing' until one arrives.
  , aDropped :: TVar Word
    -- ^ Running count of inbound packets dropped because the queue was full.
  , aUdpServ :: UdpServ
    -- ^ UDP server used to receive and send packets.
  , aResolvr :: ResolvServ
    -- ^ Resolver service, used for sends addressed to galaxies.
  , aRecvTid :: Async ()
    -- ^ Thread that moves received packets into the event queue.
  }
-- | What happened when an inbound packet was offered to the event queue.
data PacketOutcome
  = Intake -- ^ The packet was queued without displacing anything.
  | Ouster -- ^ The queue was full, so a packet was dropped.
2020-01-23 07:16:09 +03:00
2019-08-01 03:27:13 +03:00
-- Utils -----------------------------------------------------------------------
-- | Port a ship should listen on: ships numbered below 256 get the port
-- 'galaxyPort' assigns them for the given mode; every other ship asks for
-- port 0.
listenPort :: NetworkMode -> Ship -> PortNumber
listenPort mode ship
  | ship < 256 = galaxyPort mode (fromIntegral ship)
  | otherwise  = 0 -- I don't care, just give me any port.
2019-08-01 03:27:13 +03:00
-- | The IPv4 loopback address, 127.0.0.1.
localhost :: HostAddress
localhost = tupleToHostAddress (127, 0, 0, 1)
2019-08-01 03:27:13 +03:00
-- | The IPv4 wildcard address, 0.0.0.0.
inaddrAny :: HostAddress
inaddrAny = tupleToHostAddress (0, 0, 0, 0)
-- | Address to bind for a network mode, or 'Nothing' when the mode has no
-- networking at all.
modeAddress :: NetworkMode -> Maybe HostAddress
modeAddress Fake      = Just localhost
modeAddress Localhost = Just localhost
modeAddress Real      = Just inaddrAny
modeAddress NoNetwork = Nothing
-- | Is this destination acceptable in fake mode? 'EachYes' destinations
-- always are; IPv4 destinations only when the address is 'localhost'.
okFakeAddr :: AmesDest -> Bool
okFakeAddr (EachYes _)                            = True
okFakeAddr (EachNo (Jammed (AAIpv4 (Ipv4 ip) _))) = ip == localhost
okFakeAddr (EachNo (Jammed (AAVoid v)))           = absurd v
2019-08-01 03:27:13 +03:00
-- | Rewrite a destination to a socket address on 'localhost'. The port is
-- the one 'galaxyPort' gives for an 'EachYes' destination, or the port
-- carried by an IPv4 destination; the destination's own address is ignored.
localAddr :: NetworkMode -> AmesDest -> SockAddr
localAddr mode dest = SockAddrInet port localhost
 where
  port = case dest of
    EachYes galaxy               -> galaxyPort mode galaxy
    EachNo (Jammed (AAIpv4 _ p)) -> fromIntegral p
    EachNo (Jammed (AAVoid v))   -> absurd v
2019-08-01 03:27:13 +03:00
2019-12-10 05:45:19 +03:00
-- | The newt "born" event, tagged with the given king id.
bornEv :: KingId -> Ev
bornEv king = EvBlip (BlipEvNewt (NewtEvBorn (fromIntegral king, ()) ()))
2019-08-01 03:27:13 +03:00
2019-12-10 05:45:19 +03:00
-- | The ames "hear" event for a packet payload, tagged with the IPv4
-- address and port passed in.
hearEv :: PortNumber -> HostAddress -> ByteString -> Ev
hearEv port addr payload =
  let sender = AAIpv4 (Ipv4 addr) (fromIntegral port)
  in  EvBlip (BlipEvAmes (AmesEvHear () (EachNo (Jammed sender)) (MkBytes payload)))
2019-08-01 03:27:13 +03:00
2020-01-23 07:16:09 +03:00
2019-08-01 03:27:13 +03:00
--------------------------------------------------------------------------------
-- | Work out the effective network mode from the network config and whether
-- the ship is fake. In order of precedence: the no-ames flag or a configured
-- mode of 'NMNone' disables networking; otherwise a fake ship runs in 'Fake'
-- mode; otherwise the configured mode decides between 'Real' and
-- 'Localhost'.
netMode :: HasNetworkConfig e => Bool -> RIO e NetworkMode
netMode isFake = do
  cfgMode <- view (networkConfigL . ncNetMode)
  noAmes  <- view (networkConfigL . ncNoAmes)
  pure $ case cfgMode of
    _ | noAmes  -> NoNetwork
    NMNone      -> NoNetwork
    _ | isFake  -> Fake
    NMNormal    -> Real
    NMLocalhost -> Localhost
2020-05-30 01:57:35 +03:00
-- | The UDP port to use: the ames port from the network config when one is
-- set, otherwise whatever 'listenPort' picks for this ship and mode.
udpPort :: HasNetworkConfig e => Bool -> Ship -> RIO e PortNumber
udpPort isFake who = do
  mode     <- netMode isFake
  override <- view (networkConfigL . ncAmesPort)
  pure $ case override of
    Nothing -> listenPort mode who
    Just p  -> fromIntegral p
-- | Start the UDP server for this ship: a real one bound to the mode's
-- address and the chosen port, or a fake one when the mode has no address.
udpServ
  :: (HasLogFunc e, HasNetworkConfig e, HasPortControlApi e)
  => Bool
  -> Ship
  -> RIO e UdpServ
udpServ isFake who = do
  mode <- netMode isFake
  port <- udpPort isFake who
  maybe fakeUdpServ (realUdpServ port) (modeAddress mode)
-- | Failure callback for the born event. Currently a no-op.
_bornFailed :: e -> WorkError -> IO ()
_bornFailed env _err = runRIO env (pure ()) -- TODO What can we do?
{-|
  Wrap 'ames' with a bounded queue of inbound packet events.

    who    -- Which ship are we?
    isFake -- Is this a fake ship?
    stderr -- Action used to report messages.

  Returns the initial events from 'ames' together with an acquire action
  yielding a 'DriverApi' whose effect handler is the one from 'ames' and
  whose event source drains the packet queue.

  The queue holds at most 'queueBound' packets. When it is full, the oldest
  queued packet is discarded to make room for the new one, and the enqueue
  reports 'Ouster' instead of 'Intake'.
-}
ames'
  :: HasPierEnv e
  => Ship
  -> Bool
  -> (Text -> RIO e ())
  -> RIO e ([Ev], RAcquire e (DriverApi NewtEf))
ames' who isFake stderr = do
  -- Unfortunately, we cannot use TBQueue because the only behavior
  -- provided for when full is to block the writer. The implementation
  -- below uses materially the same data structures as TBQueue, however.
  ventQ :: TQueue EvErr <- newTQueueIO
  -- Number of free slots left in the queue.
  avail :: TVar Word    <- newTVarIO queueBound
  let
    enqueuePacket p = do
      vail <- readTVar avail
      if vail > 0
        then do
          -- Store the counter evaluated so no thunk chain builds up in
          -- the TVar.
          writeTVar avail $! vail - 1
          writeTQueue ventQ p
          pure Intake
        else do
          -- Full: evict the oldest packet; the slot count is unchanged.
          _ <- readTQueue ventQ
          writeTQueue ventQ p
          pure Ouster
    dequeuePacket = do
      pM <- tryReadTQueue ventQ
      when (isJust pM) $ do
        vail <- readTVar avail
        writeTVar avail $! vail + 1
      pure pM

  env <- ask
  let (bornEvs, startDriver) = ames env who isFake enqueuePacket stderr
  let runDriver = do
        diOnEffect <- startDriver
        let diEventSource = fmap RRWork <$> dequeuePacket
        pure (DriverApi {..})
  pure (bornEvs, runDriver)
2020-01-23 07:16:09 +03:00
{-|
  Build the Ames driver: the initial events to inject, and an acquire action
  that starts the driver and yields its effect handler.

    env       -- Environment; supplies the king id and runs the effects.
    who       -- Which ship are we?
    isFake    -- Is this a fake ship? (Feeds 'netMode'.)
    enqueueEv -- Queue-event action.
    stderr    -- Action used to report messages.

  The explicit port override is not a parameter here; 'udpPort' reads it
  from the network config.

  NOTE(review): an earlier revision of this comment said "4096 is a
  reasonable number for recvFrom. Packets of that size are not possible on
  the internet." No recvFrom call is visible in this module; presumably it
  now lives in the UDP server module -- confirm and move the remark there.

  NOTE(review): if 'start' throws part-way (e.g. in 'resolvServ'), the UDP
  server and receive thread already started are not torn down, since 'stop'
  only runs for a fully built 'AmesDrv'.

  TODO verify that the KingIds match on effects.
-}
ames
  :: forall e
   . (HasLogFunc e, HasNetworkConfig e, HasPortControlApi e, HasKingId e)
  => e
  -> Ship
  -> Bool
  -> (EvErr -> STM PacketOutcome)
  -> (Text -> RIO e ())
  -> ([Ev], RAcquire e (NewtEf -> IO ()))
ames env who isFake enqueueEv stderr = (initialEvents, runAmes)
 where
  king = fromIntegral (env ^. kingIdL)

  initialEvents :: [Ev]
  initialEvents = [bornEv king]

  runAmes :: RAcquire e (NewtEf -> IO ())
  runAmes = do
    mode <- rio (netMode isFake)
    drv  <- mkRAcquire start stop
    pure (handleEffect drv mode)

  -- Start the UDP server, the receive thread and the resolver.
  start :: HasLogFunc e => RIO e AmesDrv
  start = do
    aTurfs   <- newTVarIO Nothing
    aDropped <- newTVarIO 0
    aUdpServ <- udpServ isFake who
    aRecvTid <- queuePacketsThread aDropped aUdpServ
    aResolvr <- resolvServ aTurfs (usSend aUdpServ) stderr
    pure (AmesDrv { .. })

  -- Failure callback attached to every hear event; currently a no-op.
  hearFailed _ = pure ()

  -- Forever: receive a packet, offer it to the queue, and on 'Ouster' bump
  -- the drop counter, warning once per 'packetsDroppedPerComplaint' drops
  -- (including the very first one).
  queuePacketsThread :: HasLogFunc e => TVar Word -> UdpServ -> RIO e (Async ())
  queuePacketsThread dropCtr UdpServ {..} = async $ forever $ do
    outcome <- atomically $ do
      (p, a, b) <- usRecv
      enqueueEv (EvErr (hearEv p a b) hearFailed)
    case outcome of
      Intake -> pure ()
      Ouster -> do
        -- The count *before* this drop is what gets tested below.
        d <- atomically $ do
          prior <- readTVar dropCtr
          writeTVar dropCtr (prior + 1)
          pure prior
        when (d `rem` packetsDroppedPerComplaint == 0) $
          logWarn "ames: queue full; dropping inbound packets"

  -- Shut down the UDP server and the resolver, then cancel the receive
  -- thread.
  stop :: AmesDrv -> RIO e ()
  stop AmesDrv {..} = io $ do
    usKill aUdpServ
    rsKill aResolvr
    cancel aRecvTid

  -- Turf effects record the turfs; send effects are refused until the first
  -- turf effect has arrived.
  handleEffect :: AmesDrv -> NetworkMode -> NewtEf -> IO ()
  handleEffect drv@AmesDrv {..} mode = runRIO env . \case
    NewtEfTurf (_id, ()) turfs -> do
      atomically $ writeTVar aTurfs (Just turfs)

    NewtEfSend (_id, ()) dest (MkBytes bs) -> do
      atomically (readTVar aTurfs) >>= \case
        Nothing -> stderr "ames: send before turfs"
        Just _  -> sendPacket drv mode dest bs

  -- Route an outbound packet according to the network mode.
  sendPacket :: AmesDrv -> NetworkMode -> AmesDest -> ByteString -> RIO e ()
  sendPacket AmesDrv {..} mode dest byt = do
    let to adr = io (usSend aUdpServ adr byt)

    case mode of
      NoNetwork -> pure ()
      Fake      -> when (okFakeAddr dest) $ to (localAddr Fake dest)
      Localhost -> to (localAddr Localhost dest)
      Real      -> case dest of
        EachYes gala -> io (rsSend aResolvr gala byt)
        EachNo  addr -> to (ipv4Addr addr)

  ipv4Addr (Jammed (AAVoid v  )) = absurd v
  ipv4Addr (Jammed (AAIpv4 a p)) = SockAddrInet (fromIntegral p) (unIpv4 a)