diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs index 2d7b6e3b82..bf47ed113e 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs @@ -26,10 +26,12 @@ import Urbit.Arvo import Urbit.King.Config import Urbit.Vere.Pier.Types import Control.Monad.Trans.Maybe +import Data.Conduit import Data.Text (append) import System.Posix.Files (ownerModes, setFileMode) import Urbit.King.App (HasConfigDir(..), HasStderrLogFunc(..)) +import Urbit.Time (Wen) import Urbit.Vere.Ames (ames) import Urbit.Vere.Behn (behn) import Urbit.Vere.Clay (clay) @@ -37,7 +39,6 @@ import Urbit.Vere.Http.Client (client) import Urbit.Vere.Http.Server (serv) import Urbit.Vere.Log (EventLog) import Urbit.Vere.Serf (Serf, SerfState(..)) -import Data.Conduit import qualified System.Entropy as Ent import qualified Urbit.King.API as King @@ -127,21 +128,6 @@ bootSeqJobs now (BootSeq ident nocks ovums) = zipWith ($) bootSeqFns [1 ..] muckNock nok eId = RunNok $ LifeCyc eId 0 nok muckOvum ov eId = DoWork $ Work eId 0 (wen eId) ov -{- - loop :: [Job] -> SerfState -> Maybe (ProgressBar ()) -> [BootSeqFn] - -> RIO e ([Job], SerfState) - loop acc ss pb = \case - [] -> do - pb <- logStderr (updateProgressBar 0 bootMsg pb) - pure (reverse acc, ss) - x:xs -> do - wen <- io Time.now - job <- pure $ x (ssNextEv ss) (ssLastMug ss) wen - pb <- logStderr (updateProgressBar (1 + length xs) bootMsg pb) - (job, ss) <- bootJob serf job - loop (job:acc) ss pb xs --} - bootNewShip :: (HasPierConfig e, HasStderrLogFunc e, HasLogFunc e) => Pill @@ -223,6 +209,9 @@ getSnapshot top last = do acquireWorker :: RIO e () -> RAcquire e (Async ()) acquireWorker act = mkRAcquire (async act) cancel +acquireWorkerBound :: RIO e () -> RAcquire e (Async ()) +acquireWorkerBound act = mkRAcquire (asyncBound act) cancel + pier :: ∀e. (HasConfigDir e, HasLogFunc e, HasNetworkConfig e, HasPierConfig e) => (Serf, EventLog) -> TVar (Text -> IO ()) @@ -235,7 +224,7 @@ pier (serf, log) vStderr mStart = do saveM <- newEmptyTMVarIO shutdownM <- newEmptyTMVarIO - kapi ← King.kingAPI + kapi <- King.kingAPI termApiQ <- atomically $ do q <- newTQueue @@ -317,15 +306,15 @@ pier (serf, log) vStderr mStart = do death :: Text -> Async () -> STM (Either (Text, SomeException) Text) death tag tid = do waitCatchSTM tid <&> \case - Left exn -> Left (tag, exn) - Right () -> Right tag + Left exn -> Left (tag, exn) + Right () -> Right tag saveSignalThread :: TMVar () -> RAcquire e (Async ()) saveSignalThread tm = mkRAcquire start cancel - where - start = async $ forever $ do - threadDelay (120 * 1000000) -- 120 seconds - atomically $ putTMVar tm () + where + start = async $ forever $ do + threadDelay (120 * 1000000) -- 120 seconds + atomically $ putTMVar tm () -- Start All Drivers ----------------------------------------------------------- @@ -427,10 +416,10 @@ runCompute -> STM () -> (Maybe Text -> STM ()) -> STM () - -> ((Job, FX) -> STM ()) + -> ((Fact, FX) -> STM ()) -> RAcquire e (Async ()) runCompute serf getEvent getSaveSignal getShutdownSignal showSpinner hideSpinner putResult = do - mkRAcquire (async $ newRunCompute serf config) cancel + acquireWorker (newRunCompute serf config) where config = ComputeConfig { ccOnWork = getEvent @@ -441,17 +430,6 @@ runCompute serf getEvent getSaveSignal getShutdownSignal showSpinner hideSpinner , ccHideSpinner = hideSpinner } --- data RunOutput = RunOutput EventId Mug Wen (Either Noun Ev) [Ef] --- data Work = Work EventId Mug Wen Ev - -{- -data ComputeRequest - = CREvent Ev (Serf.RunError -> IO ()) - | CRSave () - | CRShutdown () - deriving (Eq, Show) --} - {- TODO Pack and Peek -} @@ -486,11 +464,18 @@ fromRightErr :: Either a b -> IO b fromRightErr (Left l) = error "unexpected Left value" fromRightErr (Right r) = pure r +data Fact = Fact + { factEve :: EventId + , factMug :: Mug + , factWen :: Wen + , factNon :: Noun + } + data ComputeConfig = ComputeConfig { ccOnWork :: STM (Ev, Serf.RunError -> IO ()) , ccOnKill :: STM () , ccOnSave :: STM () - , ccPutResult :: (Job, FX) -> STM () + , ccPutResult :: (Fact, FX) -> STM () , ccShowSpinner :: Maybe Text -> STM () , ccHideSpinner :: STM () } @@ -509,9 +494,7 @@ newRunCompute serf ComputeConfig {..} = do Nothing -> pure () Just (Serf.RunOutput e m w nounEv fx) -> do lift $ logTrace "newRunCompute: Got play result" - ev <- io $ fromRightErr nounEv -- TODO - let job :: Job = DoWork $ Work e m w ev - atomically (ccPutResult ((job, GoodParse <$> fx))) -- TODO GoodParse + atomically $ ccPutResult (Fact e m w nounEv, GoodParse <$> fx) -- TODO GoodParse sendResults onStatusChange :: Maybe Serf.RunInput -> STM () @@ -521,24 +504,6 @@ newRunCompute serf ComputeConfig {..} = do _ -> pure () -{- - FIND ME - - send event - push event - start spinner - hook for when event starts running - hook for when no event is running - send another event - first event is done - push to persistQ - update spinner to event #2 - second event is done - push to executeQ - remove spinner --} - - -- Persist Thread -------------------------------------------------------------- data PersistExn = BadEventId EventId EventId @@ -550,43 +515,38 @@ instance Exception PersistExn where , "\tExpected " <> show expected <> " but got " <> show got ] -runPersist :: ∀e. (HasPierConfig e, HasLogFunc e) - => EventLog - -> TQueue (Job, FX) - -> (FX -> STM ()) - -> RAcquire e (Async ()) -runPersist log inpQ out = - mkRAcquire runThread cancel - where - runThread :: RIO e (Async ()) - runThread = asyncBound $ do - dryRun <- view dryRunL - forever $ do - writs <- atomically getBatchFromQueue - unless dryRun $ do - events <- validateJobsAndGetBytes (toNullable writs) - Log.appendEvents log events - atomically $ for_ writs $ \(_,fx) -> out fx +runPersist + :: forall e + . (HasPierConfig e, HasLogFunc e) + => EventLog + -> TQueue (Fact, FX) + -> (FX -> STM ()) + -> RAcquire e (Async ()) +runPersist log inpQ out = mkRAcquire runThread cancel + where + runThread :: RIO e (Async ()) + runThread = asyncBound $ do + dryRun <- view dryRunL + forever $ do + writs <- atomically getBatchFromQueue + events <- validateFactsAndGetBytes (fst <$> toNullable writs) + unless dryRun (Log.appendEvents log events) + atomically $ for_ writs $ \(_, fx) -> do + out fx - validateJobsAndGetBytes :: [(Job, FX)] -> RIO e (Vector ByteString) - validateJobsAndGetBytes writs = do - expect <- Log.nextEv log - fmap fromList - $ for (zip [expect..] writs) - $ \(expectedId, (j, fx)) -> do - unless (expectedId == jobId j) $ - throwIO (BadEventId expectedId (jobId j)) - case j of - RunNok _ -> - error "This shouldn't happen here!" - DoWork (Work eId mug wen ev) -> - pure $ jamBS $ toNoun (mug, wen, ev) + validateFactsAndGetBytes :: [Fact] -> RIO e (Vector ByteString) + validateFactsAndGetBytes facts = do + expect <- Log.nextEv log + lis <- for (zip [expect ..] facts) $ \(expectedId, Fact eve mug wen non) -> + do + unless (expectedId == eve) $ do + throwIO (BadEventId expectedId eve) + pure $ jamBS $ toNoun (mug, wen, non) + pure (fromList lis) - getBatchFromQueue :: STM (NonNull [(Job, FX)]) - getBatchFromQueue = - readTQueue inpQ >>= go . singleton - where - go acc = - tryReadTQueue inpQ >>= \case - Nothing -> pure (reverse acc) - Just item -> go (item <| acc) + getBatchFromQueue :: STM (NonNull [(Fact, FX)]) + getBatchFromQueue = readTQueue inpQ >>= go . singleton + where + go acc = tryReadTQueue inpQ >>= \case + Nothing -> pure (reverse acc) + Just item -> go (item <| acc) diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs index ee06b96700..f4b2df3830 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs @@ -167,7 +167,7 @@ data RunInput | RunPeek Wen Gang Path (Maybe (Term, Noun) -> IO ()) | RunWork Ev (RunError -> IO ()) -data RunOutput = RunOutput EventId Mug Wen (Either Noun Ev) [Ef] +data RunOutput = RunOutput EventId Mug Wen Noun [Ef] -- Exceptions ------------------------------------------------------------------ @@ -432,11 +432,11 @@ running serf notice = do io (sendWrit serf (WWork wen evn)) io (recvWork serf) >>= \case WDone eid hash fx -> do - yield (RunOutput eid hash wen (Right evn) fx) + yield (RunOutput eid hash wen (toNoun evn) fx) loop hash eid WSwap eid hash (wen, noun) fx -> do io $ err (RunSwap eid hash wen noun fx) - yield (RunOutput eid hash wen (Left noun) fx) + yield (RunOutput eid hash wen noun fx) loop hash eid WBail goofs -> do io $ err (RunBail goofs)