Merge pull request #3147 from urbit/pp/king-bounded-ames

king: drop ames packets when >1k are unprocessed
This commit is contained in:
pilfer-pandex 2020-07-23 13:56:38 -07:00 committed by GitHub
commit 88ee19ae13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 14 deletions

View File

@ -2,7 +2,7 @@
Ames IO Driver Ames IO Driver
-} -}
module Urbit.Vere.Ames (ames, ames') where module Urbit.Vere.Ames (ames, ames', PacketOutcome(..)) where
import Urbit.Prelude import Urbit.Prelude
@ -17,15 +17,33 @@ import Urbit.Vere.Ames.DNS (galaxyPort, resolvServ)
import Urbit.Vere.Ames.UDP (UdpServ(..), fakeUdpServ, realUdpServ) import Urbit.Vere.Ames.UDP (UdpServ(..), fakeUdpServ, realUdpServ)
-- Constants -------------------------------------------------------------------
-- | How many unprocessed ames packets to allow in the queue before we start
-- dropping incoming packets.
queueBound :: Word
queueBound = 1000
-- | How often, measured in number of packets dropped, we should announce packet
-- loss.
packetsDroppedPerComplaint :: Word
packetsDroppedPerComplaint = 1000
-- Types ----------------------------------------------------------------------- -- Types -----------------------------------------------------------------------
data AmesDrv = AmesDrv data AmesDrv = AmesDrv
{ aTurfs :: TVar (Maybe [Turf]) { aTurfs :: TVar (Maybe [Turf])
, aDropped :: TVar Word
, aUdpServ :: UdpServ , aUdpServ :: UdpServ
, aResolvr :: ResolvServ , aResolvr :: ResolvServ
, aRecvTid :: Async () , aRecvTid :: Async ()
} }
data PacketOutcome
= Intake
| Ouster
-- Utils ----------------------------------------------------------------------- -- Utils -----------------------------------------------------------------------
@ -106,13 +124,34 @@ ames'
-> (Text -> RIO e ()) -> (Text -> RIO e ())
-> RIO e ([Ev], RAcquire e (DriverApi NewtEf)) -> RIO e ([Ev], RAcquire e (DriverApi NewtEf))
ames' who isFake stderr = do ames' who isFake stderr = do
-- Unfortunately, we cannot use TBQueue because the only behavior
-- provided for when full is to block the writer. The implementation
-- below uses materially the same data structures as TBQueue, however.
ventQ :: TQueue EvErr <- newTQueueIO ventQ :: TQueue EvErr <- newTQueueIO
avail :: TVar Word <- newTVarIO queueBound
let
enqueuePacket p = do
vail <- readTVar avail
if vail > 0
then do
modifyTVar avail (subtract 1)
writeTQueue ventQ p
pure Intake
else do
_ <- readTQueue ventQ
writeTQueue ventQ p
pure Ouster
dequeuePacket = do
pM <- tryReadTQueue ventQ
when (isJust pM) $ modifyTVar avail (+ 1)
pure pM
env <- ask env <- ask
let (bornEvs, startDriver) = ames env who isFake (writeTQueue ventQ) stderr let (bornEvs, startDriver) = ames env who isFake enqueuePacket stderr
let runDriver = do let runDriver = do
diOnEffect <- startDriver diOnEffect <- startDriver
let diEventSource = fmap RRWork <$> tryReadTQueue ventQ let diEventSource = fmap RRWork <$> dequeuePacket
pure (DriverApi {..}) pure (DriverApi {..})
pure (bornEvs, runDriver) pure (bornEvs, runDriver)
@ -135,7 +174,7 @@ ames
=> e => e
-> Ship -> Ship
-> Bool -> Bool
-> (EvErr -> STM ()) -> (EvErr -> STM PacketOutcome)
-> (Text -> RIO e ()) -> (Text -> RIO e ())
-> ([Ev], RAcquire e (NewtEf -> IO ())) -> ([Ev], RAcquire e (NewtEf -> IO ()))
ames env who isFake enqueueEv stderr = (initialEvents, runAmes) ames env who isFake enqueueEv stderr = (initialEvents, runAmes)
@ -151,20 +190,31 @@ ames env who isFake enqueueEv stderr = (initialEvents, runAmes)
drv <- mkRAcquire start stop drv <- mkRAcquire start stop
pure (handleEffect drv mode) pure (handleEffect drv mode)
start :: RIO e AmesDrv start :: HasLogFunc e => RIO e AmesDrv
start = do start = do
aTurfs <- newTVarIO Nothing aTurfs <- newTVarIO Nothing
aDropped <- newTVarIO 0
aUdpServ <- udpServ isFake who aUdpServ <- udpServ isFake who
aRecvTid <- queuePacketsThread aUdpServ aRecvTid <- queuePacketsThread aDropped aUdpServ
aResolvr <- resolvServ aTurfs (usSend aUdpServ) stderr aResolvr <- resolvServ aTurfs (usSend aUdpServ) stderr
pure (AmesDrv { .. }) pure (AmesDrv { .. })
hearFailed _ = pure () hearFailed _ = pure ()
queuePacketsThread :: UdpServ -> RIO e (Async ()) queuePacketsThread :: HasLogFunc e => TVar Word -> UdpServ -> RIO e (Async ())
queuePacketsThread UdpServ {..} = async $ forever $ atomically $ do queuePacketsThread dropCtr UdpServ {..} = async $ forever $ do
outcome <- atomically $ do
(p, a, b) <- usRecv (p, a, b) <- usRecv
enqueueEv (EvErr (hearEv p a b) hearFailed) enqueueEv (EvErr (hearEv p a b) hearFailed)
case outcome of
Intake -> pure ()
Ouster -> do
d <- atomically $ do
d <- readTVar dropCtr
writeTVar dropCtr (d + 1)
pure d
when (d `rem` packetsDroppedPerComplaint == 0) $
logWarn "ames: queue full; dropping inbound packets"
stop :: AmesDrv -> RIO e () stop :: AmesDrv -> RIO e ()
stop AmesDrv {..} = io $ do stop AmesDrv {..} = io $ do

View File

@ -80,8 +80,8 @@ runGala
runGala point = do runGala point = do
env <- ask env <- ask
que <- newTQueueIO que <- newTQueueIO
let (_, runAmes) = let enqueue = \p -> writeTQueue que p $> Intake
ames env (fromIntegral point) True (writeTQueue que) noStderr let (_, runAmes) = ames env (fromIntegral point) True enqueue noStderr
cb <- runAmes cb <- runAmes
io (cb turfEf) io (cb turfEf)
pure (que, cb) pure (que, cb)