mirror of
https://github.com/urbit/shrub.git
synced 2024-12-21 01:41:37 +03:00
king: stat
This commit is contained in:
parent
42b776fd19
commit
80540c2142
@ -16,6 +16,7 @@ module Urbit.Prelude
|
||||
, io, rio
|
||||
, logTrace
|
||||
, acquireWorker, acquireWorkerBound
|
||||
, hark
|
||||
) where
|
||||
|
||||
import ClassyPrelude
|
||||
@ -38,6 +39,8 @@ import RIO (HasLogFunc, LogFunc, LogLevel(..), logDebug, logError, logFuncL,
|
||||
logInfo, logOptionsHandle, logOther, logWarn, mkLogFunc,
|
||||
setLogMinLevel, setLogUseLoc, setLogUseTime, withLogFunc)
|
||||
|
||||
import qualified RIO
|
||||
|
||||
io :: MonadIO m => IO a -> m a
|
||||
io = liftIO
|
||||
|
||||
@ -47,6 +50,9 @@ rio = liftRIO
|
||||
logTrace :: HasLogFunc e => Utf8Builder -> RIO e ()
|
||||
logTrace = logOther "trace"
|
||||
|
||||
-- | Composes a log message out of textual components.
|
||||
hark :: [Text] -> Utf8Builder
|
||||
hark = RIO.displayBytesUtf8 . foldMap encodeUtf8
|
||||
|
||||
-- Utils for Spawning Worker Threads -------------------------------------------
|
||||
|
||||
|
@ -24,6 +24,7 @@ import Urbit.King.App (HasKingId(..), HasPierEnv(..))
|
||||
import Urbit.Vere.Ames.DNS (NetworkMode(..), ResolvServ(..))
|
||||
import Urbit.Vere.Ames.DNS (galaxyPort, resolvServ)
|
||||
import Urbit.Vere.Ames.UDP (UdpServ(..), fakeUdpServ, realUdpServ)
|
||||
import Urbit.Vere.Stat (AmesStat(..))
|
||||
|
||||
import qualified Urbit.Noun.Time as Time
|
||||
|
||||
@ -125,13 +126,14 @@ udpPort isFake who = do
|
||||
udpServ :: (HasLogFunc e, HasNetworkConfig e, HasPortControlApi e)
|
||||
=> Bool
|
||||
-> Ship
|
||||
-> AmesStat
|
||||
-> RIO e UdpServ
|
||||
udpServ isFake who = do
|
||||
udpServ isFake who stat = do
|
||||
mode <- netMode isFake
|
||||
port <- udpPort isFake who
|
||||
case modeAddress mode of
|
||||
Nothing -> fakeUdpServ
|
||||
Just host -> realUdpServ port host
|
||||
Just host -> realUdpServ port host stat
|
||||
|
||||
_bornFailed :: e -> WorkError -> IO ()
|
||||
_bornFailed env _ = runRIO env $ do
|
||||
@ -141,10 +143,11 @@ ames'
|
||||
:: HasPierEnv e
|
||||
=> Ship
|
||||
-> Bool
|
||||
-> AmesStat
|
||||
-> (Time.Wen -> Gang -> Path -> IO (Maybe (Term, Noun)))
|
||||
-> (Text -> RIO e ())
|
||||
-> RIO e ([Ev], RAcquire e (DriverApi NewtEf))
|
||||
ames' who isFake scry stderr = do
|
||||
ames' who isFake stat scry stderr = do
|
||||
-- Unfortunately, we cannot use TBQueue because the only behavior
|
||||
-- provided for when full is to block the writer. The implementation
|
||||
-- below uses materially the same data structures as TBQueue, however.
|
||||
@ -168,7 +171,7 @@ ames' who isFake scry stderr = do
|
||||
pure pM
|
||||
|
||||
env <- ask
|
||||
let (bornEvs, startDriver) = ames env who isFake scry enqueuePacket stderr
|
||||
let (bornEvs, startDriver) = ames env who isFake stat scry enqueuePacket stderr
|
||||
|
||||
let runDriver = do
|
||||
diOnEffect <- startDriver
|
||||
@ -195,11 +198,12 @@ ames
|
||||
=> e
|
||||
-> Ship
|
||||
-> Bool
|
||||
-> AmesStat
|
||||
-> (Time.Wen -> Gang -> Path -> IO (Maybe (Term, Noun)))
|
||||
-> (EvErr -> STM PacketOutcome)
|
||||
-> (Text -> RIO e ())
|
||||
-> ([Ev], RAcquire e (NewtEf -> IO ()))
|
||||
ames env who isFake scry enqueueEv stderr = (initialEvents, runAmes)
|
||||
ames env who isFake stat scry enqueueEv stderr = (initialEvents, runAmes)
|
||||
where
|
||||
king = fromIntegral (env ^. kingIdL)
|
||||
|
||||
@ -221,7 +225,7 @@ ames env who isFake scry enqueueEv stderr = (initialEvents, runAmes)
|
||||
aDropped <- newTVarIO 0
|
||||
aVersion <- newTVarIO Nothing
|
||||
aVersTid <- trackVersionThread aVersion
|
||||
aUdpServ <- udpServ isFake who
|
||||
aUdpServ <- udpServ isFake who stat
|
||||
aResolvr <- resolvServ aTurfs (usSend aUdpServ) stderr
|
||||
aRecvTid <- queuePacketsThread
|
||||
aDropped
|
||||
@ -229,10 +233,19 @@ ames env who isFake scry enqueueEv stderr = (initialEvents, runAmes)
|
||||
cachedScryLane
|
||||
(send aUdpServ aResolvr mode)
|
||||
aUdpServ
|
||||
stat
|
||||
|
||||
pure (AmesDrv { .. })
|
||||
|
||||
hearFailed _ = pure ()
|
||||
hearFailed AmesStat {..} = runRIO env . \case
|
||||
RunSwap{} -> atomically $ modifyTVar' asSwp (+ 1)
|
||||
RunBail gs -> do
|
||||
for gs \(t, es) ->
|
||||
for es \e ->
|
||||
logWarn $ hark
|
||||
["ames: goof: ", unTerm t, ": ", tankToText e]
|
||||
atomically $ modifyTVar' asBal (+ 1)
|
||||
RunOkay{} -> atomically $ modifyTVar' asOky (+ 1)
|
||||
|
||||
trackVersionThread :: HasLogFunc e => TVar (Maybe Version) -> RIO e (Async ())
|
||||
trackVersionThread versSlot = async $ forever do
|
||||
@ -254,12 +267,15 @@ ames env who isFake scry enqueueEv stderr = (initialEvents, runAmes)
|
||||
-> (Ship -> RIO e (Maybe [AmesDest]))
|
||||
-> (AmesDest -> ByteString -> RIO e ())
|
||||
-> UdpServ
|
||||
-> AmesStat
|
||||
-> RIO e (Async ())
|
||||
queuePacketsThread dropCtr vers lan forward UdpServ{..} = async $ forever $ do
|
||||
queuePacketsThread dropCtr vers lan forward UdpServ{..} s@(AmesStat{..}) = async $ forever $ do
|
||||
-- port number, host address, bytestring
|
||||
(p, a, b) <- atomically usRecv
|
||||
(p, a, b) <- atomically (modifyTVar' asRcv (+ 1) >> usRecv)
|
||||
ver <- readTVarIO vers
|
||||
|
||||
serfsUp p a b
|
||||
-- TODO make this make sense with stats
|
||||
{-
|
||||
case decode b of
|
||||
Right (pkt@Packet {..}) | ver == Nothing || ver == Just pktVersion -> do
|
||||
logDebug $ displayShow ("ames: bon packet", pkt, showUD $ bytesAtom b)
|
||||
@ -294,10 +310,11 @@ ames env who isFake scry enqueueEv stderr = (initialEvents, runAmes)
|
||||
-- they cannot be filtered, as we do not know their semantics
|
||||
--
|
||||
Left e -> logInfo $ displayShow ("ames: dropping malformed", e)
|
||||
-}
|
||||
|
||||
where
|
||||
serfsUp p a b =
|
||||
atomically (enqueueEv (EvErr (hearEv p a b) hearFailed)) >>= \case
|
||||
atomically (enqueueEv (EvErr (hearEv p a b) (hearFailed s))) >>= \case
|
||||
Intake -> pure ()
|
||||
Ouster -> do
|
||||
d <- atomically $ do
|
||||
|
@ -34,6 +34,7 @@ where
|
||||
|
||||
import Urbit.Prelude
|
||||
import Urbit.Vere.Ports
|
||||
import Urbit.Vere.Stat
|
||||
|
||||
import Network.Socket
|
||||
|
||||
@ -156,8 +157,9 @@ realUdpServ
|
||||
. (HasLogFunc e, HasPortControlApi e)
|
||||
=> PortNumber
|
||||
-> HostAddress
|
||||
-> AmesStat
|
||||
-> RIO e UdpServ
|
||||
realUdpServ por hos = do
|
||||
realUdpServ por hos sat = do
|
||||
logInfo $ displayShow ("AMES", "UDP", "Starting real UDP server.")
|
||||
|
||||
env <- ask
|
||||
@ -239,6 +241,7 @@ realUdpServ por hos = do
|
||||
pure ()
|
||||
Right (Just (b, p, a)) -> do
|
||||
logDebug "AMES: UDP: Received packet."
|
||||
atomically $ modifyTVar' (asUdp sat) (+ 1)
|
||||
enqueueRecvPacket p a b
|
||||
|
||||
let shutdown = do
|
||||
|
@ -23,6 +23,7 @@ import RIO.Directory
|
||||
import Urbit.Arvo
|
||||
import Urbit.King.App
|
||||
import Urbit.Vere.Pier.Types
|
||||
import Urbit.Vere.Stat
|
||||
|
||||
import Control.Monad.STM (retry)
|
||||
import System.Environment (getExecutablePath)
|
||||
@ -429,9 +430,11 @@ drivers
|
||||
-> Site.KingSubsite
|
||||
-> RAcquire e ([Ev], RAcquire e Drivers)
|
||||
drivers env who isFake plan scry termSys stderr serfSIGINT sub = do
|
||||
stat@Stat{..} <- newStat
|
||||
|
||||
(behnBorn, runBehn) <- rio Behn.behn'
|
||||
(termBorn, runTerm) <- rio (Term.term' termSys serfSIGINT)
|
||||
(amesBorn, runAmes) <- rio (Ames.ames' who isFake scry stderr)
|
||||
(termBorn, runTerm) <- rio (Term.term' termSys (renderStat stat) serfSIGINT)
|
||||
(amesBorn, runAmes) <- rio (Ames.ames' who isFake statAmes scry stderr)
|
||||
(httpBorn, runEyre) <- rio (Eyre.eyre' who isFake stderr sub)
|
||||
(clayBorn, runClay) <- rio Clay.clay'
|
||||
(irisBorn, runIris) <- rio Iris.client'
|
||||
|
39
pkg/hs/urbit-king/lib/Urbit/Vere/Stat.hs
Normal file
39
pkg/hs/urbit-king/lib/Urbit/Vere/Stat.hs
Normal file
@ -0,0 +1,39 @@
|
||||
module Urbit.Vere.Stat where
|
||||
|
||||
import Urbit.Prelude
|
||||
|
||||
data Stat = Stat
|
||||
{ statAmes :: AmesStat
|
||||
}
|
||||
|
||||
data AmesStat = AmesStat
|
||||
{ asUdp :: TVar Word
|
||||
, asRcv :: TVar Word
|
||||
, asSwp :: TVar Word
|
||||
, asBal :: TVar Word
|
||||
, asOky :: TVar Word
|
||||
}
|
||||
|
||||
newStat :: MonadIO m => m Stat
|
||||
newStat = do
|
||||
asUdp <- newTVarIO 0
|
||||
asRcv <- newTVarIO 0
|
||||
asSwp <- newTVarIO 0
|
||||
asBal <- newTVarIO 0
|
||||
asOky <- newTVarIO 0
|
||||
pure Stat{statAmes = AmesStat{..}}
|
||||
|
||||
type RenderedStat = [Text]
|
||||
|
||||
renderStat :: MonadIO m => Stat -> m RenderedStat
|
||||
renderStat Stat{statAmes = AmesStat{..}} =
|
||||
sequence
|
||||
[ pure "stat:"
|
||||
, pure " ames:"
|
||||
, (" udp: " <>) <$> tshow <$> readTVarIO asUdp
|
||||
, (" rcv: " <>) <$> tshow <$> readTVarIO asRcv
|
||||
, (" swp: " <>) <$> tshow <$> readTVarIO asSwp
|
||||
, (" bal: " <>) <$> tshow <$> readTVarIO asBal
|
||||
, (" oky: " <>) <$> tshow <$> readTVarIO asOky
|
||||
]
|
||||
|
@ -27,6 +27,7 @@ import Urbit.Vere.Pier.Types
|
||||
import Data.List ((!!))
|
||||
import RIO.Directory (createDirectoryIfMissing)
|
||||
import Urbit.King.API (readPortsFile)
|
||||
import Urbit.Vere.Stat (RenderedStat)
|
||||
import Urbit.TermSize (TermSize(TermSize))
|
||||
import Urbit.Vere.Term.API (Client(Client), ClientTake(..))
|
||||
|
||||
@ -556,11 +557,15 @@ localClient doneSignal = fst <$> mkRAcquire start stop
|
||||
loop rd
|
||||
else if w == 3 then do
|
||||
-- ETX (^C)
|
||||
logInfo $ displayShow "Ctrl-c interrupt"
|
||||
logInfo $ "Ctrl-c interrupt"
|
||||
atomically $ do
|
||||
writeTQueue wq [Term.Trace "interrupt\r\n"]
|
||||
writeTQueue rq $ Ctl $ Cord "c"
|
||||
loop rd
|
||||
else if w == 20 then do
|
||||
-- DC4 (^T)
|
||||
atomically $ writeTQueue wq [Term.Trace "<stat>\r\n"]
|
||||
loop rd
|
||||
else if w <= 26 then do
|
||||
case pack [BS.w2c (w + 97 - 1)] of
|
||||
"d" -> atomically doneSignal
|
||||
@ -597,9 +602,10 @@ localClient doneSignal = fst <$> mkRAcquire start stop
|
||||
term'
|
||||
:: HasPierEnv e
|
||||
=> (TermSize, Client)
|
||||
-> IO RenderedStat
|
||||
-> IO ()
|
||||
-> RIO e ([Ev], RAcquire e (DriverApi TermEf))
|
||||
term' (tsize, client) serfSIGINT = do
|
||||
term' (tsize, client) stat serfSIGINT = do
|
||||
let TermSize wi hi = tsize
|
||||
initEv = [blewEvent wi hi, initialHail]
|
||||
|
||||
@ -608,7 +614,7 @@ term' (tsize, client) serfSIGINT = do
|
||||
runDriver = do
|
||||
env <- ask
|
||||
ventQ :: TQueue EvErr <- newTQueueIO
|
||||
diOnEffect <- term env (tsize, client) (writeTQueue ventQ) serfSIGINT
|
||||
diOnEffect <- term env (tsize, client) (writeTQueue ventQ) stat serfSIGINT
|
||||
|
||||
let diEventSource = fmap RRWork <$> tryReadTQueue ventQ
|
||||
|
||||
@ -621,9 +627,10 @@ term :: forall e. (HasPierEnv e)
|
||||
=> e
|
||||
-> (TermSize, Client)
|
||||
-> (EvErr -> STM ())
|
||||
-> IO RenderedStat
|
||||
-> IO ()
|
||||
-> RAcquire e (TermEf -> IO ())
|
||||
term env (tsize, Client{..}) plan serfSIGINT = runTerm
|
||||
term env (tsize, Client{..}) plan stat serfSIGINT = runTerm
|
||||
where
|
||||
runTerm :: RAcquire e (TermEf -> IO ())
|
||||
runTerm = do
|
||||
|
Loading…
Reference in New Issue
Block a user