mirror of
https://github.com/ilyakooo0/urbit.git
synced 2024-12-26 00:12:28 +03:00
Merge pull request #5195 from urbit/pp/leaks2
king: fix several more space leaks
This commit is contained in:
commit
eeac4e5489
@ -2,7 +2,6 @@ resolver: lts-16.15
|
||||
|
||||
packages:
|
||||
- natpmp-static
|
||||
- proto
|
||||
- racquire
|
||||
- terminal-progress-bar
|
||||
- urbit-atom
|
||||
|
@ -6,12 +6,10 @@
|
||||
-}
|
||||
module Urbit.King.CLI where
|
||||
|
||||
import ClassyPrelude hiding (log)
|
||||
import Urbit.Prelude hiding (log, Parser)
|
||||
import Options.Applicative
|
||||
import Options.Applicative.Help.Pretty
|
||||
|
||||
import Data.Word (Word16)
|
||||
import RIO (LogLevel(..))
|
||||
import System.Environment (getProgName)
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
@ -3,9 +3,8 @@
|
||||
-}
|
||||
module Urbit.King.TryJamPill where
|
||||
|
||||
import ClassyPrelude
|
||||
import Control.Lens
|
||||
import Urbit.Noun
|
||||
import Urbit.Prelude
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
|
@ -2,31 +2,63 @@ module Urbit.Vere.Ames.LaneCache (cache) where
|
||||
|
||||
import Urbit.Prelude
|
||||
|
||||
import qualified Data.HashPSQ as P
|
||||
|
||||
import Urbit.Noun.Time
|
||||
|
||||
expiry :: Gap
|
||||
expiry = (2 * 60) ^. from secs
|
||||
|
||||
bound :: Int
|
||||
bound = 1_000
|
||||
|
||||
-- | An "upside down" time for use as a priority.
|
||||
newtype New = New Wen
|
||||
deriving newtype (Eq, Ord)
|
||||
|
||||
new :: Wen -> New
|
||||
new = New . negate
|
||||
|
||||
wen :: New -> Wen
|
||||
wen (New w) = negate w
|
||||
|
||||
-- | Given a new, find an older new corresponding to `expiry` ago.
|
||||
lag :: New -> New
|
||||
lag n = new (addGap (wen n) expiry)
|
||||
|
||||
trim :: (Hashable a, Ord a, Ord b) => P.HashPSQ a b c -> P.HashPSQ a b c
|
||||
trim p = if P.size p > bound then P.deleteMin p else p
|
||||
|
||||
cache :: forall a b m n
|
||||
. (Ord a, MonadIO m, MonadIO n)
|
||||
. (Ord a, Hashable a, MonadIO m, MonadIO n)
|
||||
=> (a -> m b)
|
||||
-> n (a -> m b)
|
||||
cache act = do
|
||||
cas <- newTVarIO (mempty :: Map a (Wen, b))
|
||||
cas <- newTVarIO (P.empty :: P.HashPSQ a New b)
|
||||
|
||||
let fun x = lookup x <$> readTVarIO cas >>= \case
|
||||
let fun x = P.lookup x <$> readTVarIO cas >>= \case
|
||||
Nothing -> thru
|
||||
Just (t, v) -> do
|
||||
Just (n, v) -> do
|
||||
let t = wen n
|
||||
t' <- io now
|
||||
if gap t' t > expiry
|
||||
then thru
|
||||
else pure v
|
||||
where
|
||||
-- Insert a key into the map, simultaneously removing *all* stale
|
||||
-- entries. Since insertion is linear in the size of the map,
|
||||
-- presumably it's not horrible to do it this way. The alternative
|
||||
-- would be to have a thread doing a purge every 10s or something, and
|
||||
-- then we'd have to be in RAcquire.
|
||||
up :: a -> New -> b -> P.HashPSQ a New b -> P.HashPSQ a New b
|
||||
up k n v ps = trim
|
||||
$ P.insert k n v
|
||||
$ snd $ P.atMostView (lag n) ps
|
||||
thru :: m b
|
||||
thru = do
|
||||
t <- io now
|
||||
n <- new <$> io now
|
||||
v <- act x
|
||||
atomically $ modifyTVar' cas (insertMap x (t, v))
|
||||
atomically $ modifyTVar' cas $ up x n v
|
||||
pure v
|
||||
|
||||
pure fun
|
||||
|
@ -158,8 +158,8 @@ instance Serialize Packet where
|
||||
putByteString body
|
||||
|
||||
where
|
||||
putShipGetRank s@(Ship (LargeKey p q)) = case () of
|
||||
_ | s < 2 ^ 16 -> (0, putWord16le $ fromIntegral s) -- lord
|
||||
| s < 2 ^ 32 -> (1, putWord32le $ fromIntegral s) -- planet
|
||||
| s < 2 ^ 64 -> (2, putWord64le $ fromIntegral s) -- moon
|
||||
| otherwise -> (3, putWord64le p >> putWord64le q) -- comet
|
||||
putShipGetRank (Ship (LargeKey p q)) = case q of
|
||||
0 | p < 2 ^ 16 -> (0, putWord16le $ fromIntegral p) -- lord
|
||||
| p < 2 ^ 32 -> (1, putWord32le $ fromIntegral p) -- planet
|
||||
| otherwise -> (2, putWord64le $ fromIntegral p) -- moon
|
||||
_ -> (3, putWord64le p >> putWord64le q) -- comet
|
||||
|
@ -4,9 +4,8 @@
|
||||
|
||||
module Urbit.Vere.Http where
|
||||
|
||||
import ClassyPrelude
|
||||
import Urbit.Prelude
|
||||
import Urbit.Arvo
|
||||
import Urbit.Noun
|
||||
|
||||
import qualified Data.CaseInsensitive as CI
|
||||
import qualified Network.HTTP.Types as HT
|
||||
|
@ -279,8 +279,8 @@ pier (serf, log) vSlog startedSig injected = do
|
||||
-- TODO Instead of using a TMVar, pull directly from the IO driver
|
||||
-- event sources.
|
||||
computeQ :: TMVar RunReq <- newEmptyTMVarIO
|
||||
persistQ :: TQueue (Fact, FX) <- newTQueueIO
|
||||
executeQ :: TQueue FX <- newTQueueIO
|
||||
persistQ :: TBQueue (Fact, FX) <- newTBQueueIO 10 -- TODO tuning?
|
||||
executeQ :: TBQueue FX <- newTBQueueIO 10
|
||||
saveSig :: TMVar () <- newEmptyTMVarIO
|
||||
kingApi :: King.King <- King.kingAPI
|
||||
|
||||
@ -310,8 +310,8 @@ pier (serf, log) vSlog startedSig injected = do
|
||||
-- the c serf code. Logging output from our haskell process must manually
|
||||
-- add them.
|
||||
let compute = putTMVar computeQ
|
||||
let execute = writeTQueue executeQ
|
||||
let persist = writeTQueue persistQ
|
||||
let execute = writeTBQueue executeQ
|
||||
let persist = writeTBQueue persistQ
|
||||
let sigint = Serf.sendSIGINT serf
|
||||
let scry = \g r -> do
|
||||
res <- newEmptyMVar
|
||||
@ -378,7 +378,7 @@ pier (serf, log) vSlog startedSig injected = do
|
||||
fn (0, textToTank txt)
|
||||
|
||||
drivz <- startDrivers
|
||||
tExec <- acquireWorker "Effects" (router slog (readTQueue executeQ) drivz)
|
||||
tExec <- acquireWorker "Effects" (router slog (readTBQueue executeQ) drivz)
|
||||
tDisk <- acquireWorkerBound "Persist" (runPersist log persistQ execute)
|
||||
|
||||
-- Now that the Serf is configured, the IO drivers are hooked up, their
|
||||
@ -667,12 +667,14 @@ runPersist
|
||||
:: forall e
|
||||
. HasPierEnv e
|
||||
=> EventLog
|
||||
-> TQueue (Fact, FX)
|
||||
-> TBQueue (Fact, FX)
|
||||
-> (FX -> STM ())
|
||||
-> RIO e ()
|
||||
runPersist log inpQ out = do
|
||||
dryRun <- view dryRunL
|
||||
forever $ do
|
||||
-- This is not a memory leak because eventually the TBQueue at out will
|
||||
-- fill up, blocking the loop.
|
||||
writs <- atomically getBatchFromQueue
|
||||
events <- validateFactsAndGetBytes (fst <$> toNullable writs)
|
||||
unless dryRun (Log.appendEvents log events)
|
||||
@ -690,9 +692,11 @@ runPersist log inpQ out = do
|
||||
pure $ buildLogEvent mug $ toNoun (wen, non)
|
||||
pure (fromList lis)
|
||||
|
||||
-- Read as much out of the queue as possible (i.e. the entire contents),
|
||||
-- blocking if empty.
|
||||
getBatchFromQueue :: STM (NonNull [(Fact, FX)])
|
||||
getBatchFromQueue = readTQueue inpQ >>= go . singleton
|
||||
getBatchFromQueue = readTBQueue inpQ >>= go . singleton
|
||||
where
|
||||
go acc = tryReadTQueue inpQ >>= \case
|
||||
go acc = tryReadTBQueue inpQ >>= \case
|
||||
Nothing -> pure (reverse acc)
|
||||
Just item -> go (item <| acc)
|
||||
|
@ -9,7 +9,7 @@ module Urbit.Vere.Term.Render
|
||||
, soundBell
|
||||
) where
|
||||
|
||||
import ClassyPrelude
|
||||
import Urbit.Prelude
|
||||
|
||||
import qualified System.Console.ANSI as ANSI
|
||||
|
||||
|
@ -79,6 +79,7 @@ dependencies:
|
||||
- pretty-show
|
||||
- primitive
|
||||
- process
|
||||
- psqueues
|
||||
- QuickCheck
|
||||
- racquire
|
||||
- random
|
||||
|
@ -30,6 +30,7 @@ newtype Parser a = Parser {
|
||||
runParser :: forall r. ParseStack -> Failure r -> Success a r -> r
|
||||
}
|
||||
|
||||
{-# INLINE named #-} -- keep out of the cost centers
|
||||
named :: Text -> Parser a -> Parser a
|
||||
named nm (Parser cb) =
|
||||
Parser $ \path kf ks -> cb (nm:path) kf ks
|
||||
|
@ -84,15 +84,15 @@ flush = Put $ \tbl s@S{..} -> do
|
||||
|
||||
{-# INLINE update #-}
|
||||
update :: (S -> S) -> Put ()
|
||||
update f = Put $ \tbl s@S{..} -> pure (PutResult (f s) ())
|
||||
update f = Put $ \tbl s@S{} -> pure (PutResult (f s) ())
|
||||
|
||||
{-# INLINE setRegOff #-}
|
||||
setRegOff :: Word -> Int -> Put ()
|
||||
setRegOff r o = update $ \s@S{..} -> (s {reg=r, off=o})
|
||||
setRegOff r o = update $ \s@S{} -> (s {reg=r, off=o})
|
||||
|
||||
{-# INLINE setReg #-}
|
||||
setReg :: Word -> Put ()
|
||||
setReg r = update $ \s@S{..} -> (s { reg=r })
|
||||
setReg r = update $ \s@S{} -> (s { reg=r })
|
||||
|
||||
{-# INLINE getS #-}
|
||||
getS :: Put S
|
||||
|
@ -110,9 +110,22 @@ deriveToNounFunc tyName = do
|
||||
|
||||
addErrTag :: String -> Exp -> Exp
|
||||
addErrTag tag exp =
|
||||
InfixE (Just $ AppE (VarE 'named) str) (VarE (mkName ".")) (Just exp)
|
||||
-- This spurious let is inserted so we can get better cost center data
|
||||
-- during heap profiling.
|
||||
LetE [ValD (VarP nom) (NormalB nam) []]
|
||||
$ InfixE (Just $ VarE nom) (VarE (mkName ".")) (Just exp)
|
||||
where
|
||||
-- XX arguably we should use newName rather than mkName here
|
||||
nom = mkName $ "named_" ++ filter C.isAlphaNum tag
|
||||
str = LitE $ StringL tag
|
||||
nam = LamE [VarP $ mkName "x"] $ AppE (AppE (VarE 'named) str)
|
||||
$ VarE (mkName "x")
|
||||
|
||||
addCostCenter :: String -> Exp -> Exp
|
||||
addCostCenter tag exp =
|
||||
LetE [ValD (VarP nom) (NormalB exp) []] (VarE nom)
|
||||
where
|
||||
nom = mkName $ "scc_" ++ filter C.isAlphaNum tag
|
||||
|
||||
deriveFromNoun :: Name -> Q [Dec]
|
||||
deriveFromNoun tyName = do
|
||||
@ -124,7 +137,8 @@ deriveFromNoun tyName = do
|
||||
let ty = foldl' (\acc v -> AppT acc (VarT v)) (ConT tyName) params
|
||||
|
||||
let overlap = Nothing
|
||||
body = NormalB (addErrTag (nameStr tyName) exp)
|
||||
body = NormalB (addCostCenter nom $ addErrTag nom exp)
|
||||
nom = nameStr tyName
|
||||
ctx = params <&> \t -> AppT (ConT ''FromNoun) (VarT t)
|
||||
inst = AppT (ConT ''FromNoun) ty
|
||||
|
||||
@ -214,12 +228,13 @@ taggedFromNoun cons = LamE [VarP n] (DoE [getHead, getTag, examine])
|
||||
|
||||
matches = mkMatch <$> cons
|
||||
mkMatch = \(tag, (n, tys)) ->
|
||||
let body = AppE (addErrTag ('%':tag) (tupFromNoun (n, tys)))
|
||||
let body = addCostCenter tag
|
||||
$ AppE (addErrTag ('%':tag) (tupFromNoun (n, tys)))
|
||||
(VarE t)
|
||||
in Match (LitP $ StringL tag) (NormalB body) []
|
||||
|
||||
fallback = Match WildP (NormalB $ AppE (VarE 'fail) matchFail) []
|
||||
matchFail = unexpectedTag (fst <$> cons) (VarE c)
|
||||
matchFail = addCostCenter "matchFail" $ unexpectedTag (fst <$> cons) (VarE c)
|
||||
|
||||
tagFail = LitE $ StringL (intercalate " " (('%':) <$> (fst <$> cons)))
|
||||
|
||||
|
@ -1,3 +1,5 @@
|
||||
{-# LANGUAGE StrictData #-}
|
||||
|
||||
{-|
|
||||
Large Library of conversion between various types and Nouns.
|
||||
-}
|
||||
@ -29,7 +31,7 @@ import Urbit.Noun.Convert
|
||||
import Urbit.Noun.Core
|
||||
import Urbit.Noun.TH
|
||||
|
||||
import Data.LargeWord (LargeKey, Word128, Word256)
|
||||
import Data.LargeWord (LargeKey(..), Word128, Word256)
|
||||
import GHC.Exts (chr#, isTrue#, leWord#, word2Int#)
|
||||
import GHC.Natural (Natural)
|
||||
import GHC.Types (Char(C#))
|
||||
@ -600,6 +602,9 @@ newtype Ship = Ship Word128 -- @p
|
||||
instance Show Ship where
|
||||
show = show . patp . fromIntegral
|
||||
|
||||
instance Hashable Ship where
|
||||
hashWithSalt s (Ship (LargeKey a b)) = s `hashWithSalt` a `hashWithSalt` b
|
||||
|
||||
|
||||
-- Path ------------------------------------------------------------------------
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user