king: fix unbounded queues in Pier.hs; how did I miss this

This commit is contained in:
pilfer-pandex 2021-08-30 20:01:24 -04:00
parent 32a4c8a375
commit c05b5ecdc0

View File

@ -267,11 +267,11 @@ pier (serf, log) vSlog startedSig injected = do
-- TODO Instead of using a TMVar, pull directly from the IO driver -- TODO Instead of using a TMVar, pull directly from the IO driver
-- event sources. -- event sources.
computeQ :: TMVar RunReq <- newEmptyTMVarIO computeQ :: TMVar RunReq <- newEmptyTMVarIO
persistQ :: TQueue (Fact, FX) <- newTQueueIO persistQ :: TBQueue (Fact, FX) <- newTBQueueIO 10 -- TODO tuning?
executeQ :: TQueue FX <- newTQueueIO executeQ :: TBQueue FX <- newTBQueueIO 10
saveSig :: TMVar () <- newEmptyTMVarIO saveSig :: TMVar () <- newEmptyTMVarIO
kingApi :: King.King <- King.kingAPI kingApi :: King.King <- King.kingAPI
termApiQ :: TQueue TermConn <- atomically $ do termApiQ :: TQueue TermConn <- atomically $ do
q <- newTQueue q <- newTQueue
@ -299,8 +299,8 @@ pier (serf, log) vSlog startedSig injected = do
-- the c serf code. Logging output from our haskell process must manually -- the c serf code. Logging output from our haskell process must manually
-- add them. -- add them.
let compute = putTMVar computeQ let compute = putTMVar computeQ
let execute = writeTQueue executeQ let execute = writeTBQueue executeQ
let persist = writeTQueue persistQ let persist = writeTBQueue persistQ
let sigint = Serf.sendSIGINT serf let sigint = Serf.sendSIGINT serf
let scry = \g r -> do let scry = \g r -> do
res <- newEmptyMVar res <- newEmptyMVar
@ -367,7 +367,7 @@ pier (serf, log) vSlog startedSig injected = do
fn (0, textToTank txt) fn (0, textToTank txt)
drivz <- startDrivers drivz <- startDrivers
tExec <- acquireWorker "Effects" (router slog (readTQueue executeQ) drivz) tExec <- acquireWorker "Effects" (router slog (readTBQueue executeQ) drivz)
tDisk <- acquireWorkerBound "Persist" (runPersist log persistQ execute) tDisk <- acquireWorkerBound "Persist" (runPersist log persistQ execute)
-- Now that the Serf is configured, the IO drivers are hooked up, their -- Now that the Serf is configured, the IO drivers are hooked up, their
@ -656,12 +656,14 @@ runPersist
:: forall e :: forall e
. HasPierEnv e . HasPierEnv e
=> EventLog => EventLog
-> TQueue (Fact, FX) -> TBQueue (Fact, FX)
-> (FX -> STM ()) -> (FX -> STM ())
-> RIO e () -> RIO e ()
runPersist log inpQ out = do runPersist log inpQ out = do
dryRun <- view dryRunL dryRun <- view dryRunL
forever $ do forever $ do
-- This is not a memory leak because eventually the TBQueue at out will
-- fill up, blocking the loop.
writs <- atomically getBatchFromQueue writs <- atomically getBatchFromQueue
events <- validateFactsAndGetBytes (fst <$> toNullable writs) events <- validateFactsAndGetBytes (fst <$> toNullable writs)
unless dryRun (Log.appendEvents log events) unless dryRun (Log.appendEvents log events)
@ -679,9 +681,11 @@ runPersist log inpQ out = do
pure $ buildLogEvent mug $ toNoun (wen, non) pure $ buildLogEvent mug $ toNoun (wen, non)
pure (fromList lis) pure (fromList lis)
-- Read as much out of the queue as possible (i.e. the entire contents),
-- blocking if empty.
getBatchFromQueue :: STM (NonNull [(Fact, FX)]) getBatchFromQueue :: STM (NonNull [(Fact, FX)])
getBatchFromQueue = readTQueue inpQ >>= go . singleton getBatchFromQueue = readTBQueue inpQ >>= go . singleton
where where
go acc = tryReadTQueue inpQ >>= \case go acc = tryReadTBQueue inpQ >>= \case
Nothing -> pure (reverse acc) Nothing -> pure (reverse acc)
Just item -> go (item <| acc) Just item -> go (item <| acc)