From c05b5ecdc083c08ec18b32d847f31ac9c39a401d Mon Sep 17 00:00:00 2001 From: pilfer-pandex <47340789+pilfer-pandex@users.noreply.github.com> Date: Mon, 30 Aug 2021 20:01:24 -0400 Subject: [PATCH] king: fix unbounded queues in Pier.hs; how did I miss this --- pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs | 26 ++++++++++++++---------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs index 5600496792..494a22e910 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs @@ -267,11 +267,11 @@ pier (serf, log) vSlog startedSig injected = do -- TODO Instead of using a TMVar, pull directly from the IO driver -- event sources. - computeQ :: TMVar RunReq <- newEmptyTMVarIO - persistQ :: TQueue (Fact, FX) <- newTQueueIO - executeQ :: TQueue FX <- newTQueueIO - saveSig :: TMVar () <- newEmptyTMVarIO - kingApi :: King.King <- King.kingAPI + computeQ :: TMVar RunReq <- newEmptyTMVarIO + persistQ :: TBQueue (Fact, FX) <- newTBQueueIO 10 -- TODO tuning? + executeQ :: TBQueue FX <- newTBQueueIO 10 + saveSig :: TMVar () <- newEmptyTMVarIO + kingApi :: King.King <- King.kingAPI termApiQ :: TQueue TermConn <- atomically $ do q <- newTQueue @@ -299,8 +299,8 @@ pier (serf, log) vSlog startedSig injected = do -- the c serf code. Logging output from our haskell process must manually -- add them. let compute = putTMVar computeQ - let execute = writeTQueue executeQ - let persist = writeTQueue persistQ + let execute = writeTBQueue executeQ + let persist = writeTBQueue persistQ let sigint = Serf.sendSIGINT serf let scry = \g r -> do res <- newEmptyMVar @@ -367,7 +367,7 @@ pier (serf, log) vSlog startedSig injected = do fn (0, textToTank txt) drivz <- startDrivers - tExec <- acquireWorker "Effects" (router slog (readTQueue executeQ) drivz) + tExec <- acquireWorker "Effects" (router slog (readTBQueue executeQ) drivz) tDisk <- acquireWorkerBound "Persist" (runPersist log persistQ execute) -- Now that the Serf is configured, the IO drivers are hooked up, their @@ -656,12 +656,14 @@ runPersist :: forall e . HasPierEnv e => EventLog - -> TQueue (Fact, FX) + -> TBQueue (Fact, FX) -> (FX -> STM ()) -> RIO e () runPersist log inpQ out = do dryRun <- view dryRunL forever $ do + -- This is not a memory leak because eventually the TBQueue at out will + -- fill up, blocking the loop. writs <- atomically getBatchFromQueue events <- validateFactsAndGetBytes (fst <$> toNullable writs) unless dryRun (Log.appendEvents log events) @@ -679,9 +681,11 @@ runPersist log inpQ out = do pure $ buildLogEvent mug $ toNoun (wen, non) pure (fromList lis) + -- Read as much out of the queue as possible (i.e. the entire contents), + -- blocking if empty. getBatchFromQueue :: STM (NonNull [(Fact, FX)]) - getBatchFromQueue = readTQueue inpQ >>= go . singleton + getBatchFromQueue = readTBQueue inpQ >>= go . singleton where - go acc = tryReadTQueue inpQ >>= \case + go acc = tryReadTBQueue inpQ >>= \case Nothing -> pure (reverse acc) Just item -> go (item <| acc)