From 95df4b0764ff40b9a1148cd9e3ab0e922ef44800 Mon Sep 17 00:00:00 2001 From: ~siprel Date: Sat, 6 Jun 2020 23:34:27 +0000 Subject: [PATCH] king: Pier cleanup pass. --- pkg/hs/urbit-king/lib/Urbit/King/Main.hs | 2 +- pkg/hs/urbit-king/lib/Urbit/Vere/Ames.hs | 8 +- pkg/hs/urbit-king/lib/Urbit/Vere/Behn.hs | 6 +- pkg/hs/urbit-king/lib/Urbit/Vere/Clay.hs | 8 +- pkg/hs/urbit-king/lib/Urbit/Vere/Eyre.hs | 8 +- .../urbit-king/lib/Urbit/Vere/Http/Client.hs | 10 +- pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs | 304 +++++++++--------- .../urbit-king/lib/Urbit/Vere/Pier/Types.hs | 10 +- pkg/hs/urbit-king/lib/Urbit/Vere/Term.hs | 6 +- 9 files changed, 180 insertions(+), 182 deletions(-) diff --git a/pkg/hs/urbit-king/lib/Urbit/King/Main.hs b/pkg/hs/urbit-king/lib/Urbit/King/Main.hs index 9af3a4f14a..25fd340078 100644 --- a/pkg/hs/urbit-king/lib/Urbit/King/Main.hs +++ b/pkg/hs/urbit-king/lib/Urbit/King/Main.hs @@ -383,7 +383,7 @@ testPill pax showPil showSeq = do pill <- fromNounErr pillNoun & either (throwIO . uncurry ParseErr) pure logTrace "Using pill to generate boot sequence." - bootSeq <- generateBootSeq (Ship 0) pill False (Fake $ Ship 0) + bootSeq <- genBootSeq (Ship 0) pill False (Fake (Ship 0)) logTrace "Validate jam/cue and toNoun/fromNoun on pill value" reJam <- validateNounVal pill diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Ames.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Ames.hs index 8c5ccbd9de..0c73673899 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Ames.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Ames.hs @@ -118,7 +118,7 @@ ames -> Bool -> (EvErr -> STM ()) -> (Text -> RIO e ()) - -> ([EvErr], RAcquire e (EffCb e NewtEf)) + -> ([EvErr], RAcquire e (NewtEf -> IO ())) ames env who isFake enqueueEv stderr = (initialEvents, runAmes) where king = fromIntegral (env ^. kingIdL) @@ -126,7 +126,7 @@ ames env who isFake enqueueEv stderr = (initialEvents, runAmes) initialEvents :: [EvErr] initialEvents = [EvErr (bornEv king) (bornFailed env)] - runAmes :: RAcquire e (EffCb e NewtEf) + runAmes :: RAcquire e (NewtEf -> IO ()) runAmes = do mode <- rio (netMode isFake) drv <- mkRAcquire start stop @@ -153,8 +153,8 @@ ames env who isFake enqueueEv stderr = (initialEvents, runAmes) rsKill aResolvr cancel aRecvTid - handleEffect :: AmesDrv -> NetworkMode -> NewtEf -> RIO e () - handleEffect drv@AmesDrv {..} mode = \case + handleEffect :: AmesDrv -> NetworkMode -> NewtEf -> IO () + handleEffect drv@AmesDrv {..} mode = runRIO env . \case NewtEfTurf (_id, ()) turfs -> do atomically $ writeTVar aTurfs (Just turfs) diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Behn.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Behn.hs index cf58bc1803..91d6fb70d6 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Behn.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Behn.hs @@ -34,7 +34,7 @@ wakeErr :: WorkError -> IO () wakeErr _ = pure () behn - :: HasKingId e => e -> (EvErr -> STM ()) -> ([EvErr], Acquire (EffCb e BehnEf)) + :: HasKingId e => e -> (EvErr -> STM ()) -> ([EvErr], Acquire (BehnEf -> IO ())) behn env enqueueEv = (initialEvents, runBehn) where @@ -42,10 +42,10 @@ behn env enqueueEv = initialEvents = [EvErr (bornEv king) (bornFailed env)] - runBehn :: Acquire (EffCb e BehnEf) + runBehn :: Acquire (BehnEf -> IO ()) runBehn = do tim <- mkAcquire Timer.init Timer.stop - pure (handleEf tim) + pure (runRIO env . handleEf tim) handleEf :: Timer -> BehnEf -> RIO e () handleEf b = io . \case diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Clay.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Clay.hs index 353ad47975..00d9a18dd1 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Clay.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Clay.hs @@ -122,7 +122,7 @@ clay . (HasPierConfig e, HasLogFunc e, HasKingId e) => e -> (EvErr -> STM ()) - -> ([EvErr], RAcquire e (EffCb e SyncEf)) + -> ([EvErr], RAcquire e (SyncEf -> IO ())) clay env plan = (initialEvents, runSync) where @@ -134,15 +134,15 @@ clay env plan = -- specified directory and shove it into an %into event. initialEvents = [EvErr boatEv (boatFailed env)] - runSync :: RAcquire e (EffCb e SyncEf) + runSync :: RAcquire e (SyncEf -> IO ()) runSync = handleEffect <$> mkRAcquire start stop start :: RIO e ClayDrv start = ClayDrv <$> newTVarIO mempty stop c = pure () - handleEffect :: ClayDrv -> SyncEf -> RIO e () - handleEffect cd = \case + handleEffect :: ClayDrv -> SyncEf -> IO () + handleEffect cd = runRIO env . \case SyncEfHill _ mountPoints -> do logDebug $ displayShow ("(clay) known mount points:", mountPoints) pierPath <- view pierPathL diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Eyre.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Eyre.hs index 34df63330d..7ef6180c8f 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Eyre.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Eyre.hs @@ -287,7 +287,7 @@ eyre -> Ship -> (EvErr -> STM ()) -> Bool - -> ([EvErr], RAcquire e (EffCb e HttpServerEf)) + -> ([EvErr], RAcquire e (HttpServerEf -> IO ())) eyre env multi who plan isFake = (initialEvents, runHttpServer) where king = fromIntegral (env ^. kingIdL) @@ -295,7 +295,7 @@ eyre env multi who plan isFake = (initialEvents, runHttpServer) initialEvents :: [EvErr] initialEvents = [EvErr (bornEv king) (bornFailed env)] - runHttpServer :: RAcquire e (EffCb e HttpServerEf) + runHttpServer :: RAcquire e (HttpServerEf -> IO ()) runHttpServer = handleEf <$> mkRAcquire (Drv <$> newMVar Nothing) (\(Drv v) -> stopService v kill >>= fromEither) @@ -318,8 +318,8 @@ eyre env multi who plan isFake = (initialEvents, runHttpServer) liveFailed _ = pure () - handleEf :: Drv -> HttpServerEf -> RIO e () - handleEf drv = \case + handleEf :: Drv -> HttpServerEf -> IO () + handleEf drv = runRIO env . \case HSESetConfig (i, ()) conf -> do logDebug (displayShow ("EYRE", "%set-config")) Serv {..} <- restart drv conf diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Http/Client.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Http/Client.hs index 0bf9abf38d..1cd1007f8f 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Http/Client.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Http/Client.hs @@ -66,7 +66,7 @@ client . (HasLogFunc e, HasKingId e) => e -> (EvErr -> STM ()) - -> ([EvErr], RAcquire e (EffCb e HttpClientEf)) + -> ([EvErr], RAcquire e (HttpClientEf -> IO ())) client env plan = (initialEvents, runHttpClient) where kingId = view (kingIdL . to fromIntegral) env @@ -74,7 +74,7 @@ client env plan = (initialEvents, runHttpClient) initialEvents :: [EvErr] initialEvents = [EvErr (bornEv kingId) (bornFailed env)] - runHttpClient :: RAcquire e (EffCb e HttpClientEf) + runHttpClient :: RAcquire e (HttpClientEf -> IO ()) runHttpClient = handleEffect <$> mkRAcquire start stop start :: RIO e (HttpClientDrv) @@ -88,10 +88,10 @@ client env plan = (initialEvents, runHttpClient) liveThreads <- atomically $ readTVar hcdLive mapM_ cancel liveThreads - handleEffect :: HttpClientDrv -> HttpClientEf -> RIO e () + handleEffect :: HttpClientDrv -> HttpClientEf -> IO () handleEffect drv = \case - HCERequest _ id req -> newReq drv id req - HCECancelRequest _ id -> cancelReq drv id + HCERequest _ id req -> runRIO env (newReq drv id req) + HCECancelRequest _ id -> runRIO env (cancelReq drv id) newReq :: HttpClientDrv -> ReqId -> HttpClientReq -> RIO e () newReq drv id req = do diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs index 5a380999ff..2a4666c5da 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs @@ -12,7 +12,7 @@ module Urbit.Vere.Pier , pier , runPersist , runCompute - , generateBootSeq + , genBootSeq ) where @@ -49,7 +49,7 @@ import qualified Urbit.Vere.Term.Demux as Term import qualified Urbit.Vere.Term.Render as Term --------------------------------------------------------------------------------- +-- Initialize pier directory. -------------------------------------------------- setupPierDirectory :: FilePath -> RIO e () setupPierDirectory shipPath = do @@ -64,8 +64,8 @@ setupPierDirectory shipPath = do genEntropy :: RIO e Word512 genEntropy = fromIntegral . bytesAtom <$> io (Ent.getEntropy 64) -generateBootSeq :: Ship -> Pill -> Bool -> LegacyBootEvent -> RIO e BootSeq -generateBootSeq ship Pill {..} lite boot = do +genBootSeq :: Ship -> Pill -> Bool -> LegacyBootEvent -> RIO e BootSeq +genBootSeq ship Pill {..} lite boot = do ent <- genEntropy let ovums = preKern ent <> pKernelOvums <> postKern <> pUserspaceOvums pure $ BootSeq ident pBootFormulas ovums @@ -100,7 +100,7 @@ writeJobs log !jobs = do jobPayload (DoWork (Work _ m d o )) = toNoun (m, d, o) --- Boot a new ship. ------------------------------------------------------------ +-- Acquire a running serf. ----------------------------------------------------- printTank :: (Text -> IO ()) -> Atom -> Tank -> IO () printTank f _ = io . f . unlines . fmap unTape . wash (WashCfg 0 80) @@ -125,6 +125,9 @@ runSerf vSlog pax fax = do , scDead = pure () -- TODO: What can be done? } + +-- Boot a new ship. ------------------------------------------------------------ + booted :: TVar (Text -> IO ()) -> Pill @@ -158,7 +161,7 @@ bootNewShip -> LegacyBootEvent -> RIO e () bootNewShip pill lite flags ship bootEv = do - seq@(BootSeq ident x y) <- generateBootSeq ship pill lite bootEv + seq@(BootSeq ident x y) <- genBootSeq ship pill lite bootEv logTrace "BootSeq Computed" pierPath <- view pierPathL @@ -203,22 +206,22 @@ resumed vSlog replayUntil flags = do pure (serf, log) -getSnapshot :: forall e. FilePath -> Word64 -> RIO e (Maybe FilePath) +getSnapshot :: forall e . FilePath -> Word64 -> RIO e (Maybe FilePath) getSnapshot top last = do - lastSnapshot <- lastMay <$> listReplays - pure (replayToPath <$> lastSnapshot) - where - replayDir = top ".partial-replay" - replayToPath eId = replayDir show eId + lastSnapshot <- lastMay <$> listReplays + pure (replayToPath <$> lastSnapshot) + where + replayDir = top ".partial-replay" + replayToPath eId = replayDir show eId - listReplays :: RIO e [Word64] - listReplays = do - createDirectoryIfMissing True replayDir - snapshotNums <- mapMaybe readMay <$> listDirectory replayDir - pure $ sort (filter (<= fromIntegral last) snapshotNums) + listReplays :: RIO e [Word64] + listReplays = do + createDirectoryIfMissing True replayDir + snapshotNums <- mapMaybe readMay <$> listDirectory replayDir + pure $ sort (filter (<= fromIntegral last) snapshotNums) --- Run Pier -------------------------------------------------------------------- +-- Utils for Spawning Worker Threads ------------------------------------------- acquireWorker :: HasLogFunc e => Text -> RIO e () -> RAcquire e (Async ()) acquireWorker nam act = mkRAcquire (async act) kill @@ -236,6 +239,9 @@ acquireWorkerBound nam act = mkRAcquire (asyncBound act) kill cancel tid logTrace ("Killed worker thread: " <> display nam) + +-- Run Pier -------------------------------------------------------------------- + pier :: (Serf, EventLog) -> TVar (Text -> IO ()) @@ -243,112 +249,106 @@ pier -> MultiEyreApi -> RAcquire PierEnv () pier (serf, log) vSlog mStart multi = do - computeQ <- newTQueueIO @_ @Serf.EvErr - persistQ <- newTQueueIO - executeQ <- newTQueueIO - saveM <- newEmptyTMVarIO + computeQ <- newTQueueIO @_ @Serf.EvErr + persistQ <- newTQueueIO + executeQ <- newTQueueIO + saveM <- newEmptyTMVarIO + kingApi <- King.kingAPI - kapi <- King.kingAPI + termApiQ <- atomically $ do + q <- newTQueue + writeTVar (King.kTermConn kingApi) (Just $ writeTQueue q) + pure q - termApiQ <- atomically $ do - q <- newTQueue - writeTVar (King.kTermConn kapi) (Just $ writeTQueue q) - pure q + (demux, muxed) <- atomically $ do + res <- Term.mkDemux + pure (res, Term.useDemux res) - -- (sz, local) <- Term.localClient - - -- (waitExternalTerm, termServPort) <- Term.termServer - - (demux, muxed) <- atomically $ do - res <- Term.mkDemux - -- Term.addDemux local res - pure (res, Term.useDemux res) - - -- rio $ logInfo $ display $ - -- "TERMSERV Terminal Server running on port: " <> tshow termServPort - - acquireWorker "TERMINAL" $ forever $ do - logTrace "TERMSERV Waiting for external terminal." - atomically $ do - ext <- Term.connClient <$> readTQueue termApiQ - Term.addDemux ext demux - logTrace "TERMSERV External terminal connected." - - -- Slogs go to both stderr and to the terminal. + acquireWorker "TERMSERV" $ forever $ do + logTrace "TERMSERV Waiting for external terminal." atomically $ do - oldSlog <- readTVar vSlog - writeTVar vSlog $ \txt -> do - atomically $ Term.trace muxed txt - oldSlog txt + ext <- Term.connClient <$> readTQueue termApiQ + Term.addDemux ext demux + logTrace "TERMSERV External terminal connected." - let logId = Log.identity log - let ship = who logId + -- Slogs go to both stderr and to the terminal. + atomically $ do + oldSlog <- readTVar vSlog + writeTVar vSlog $ \txt -> do + atomically $ Term.trace muxed txt + oldSlog txt - -- Our call above to set the logging function which echos errors from the - -- Serf doesn't have the appended \r\n because those \r\n s are added in - -- the c serf code. Logging output from our haskell process must manually - -- add them. - let showErr = atomically . Term.trace muxed . (flip append "\r\n") + let logId = Log.identity log + let ship = who logId + -- Our call above to set the logging function which echos errors from the + -- Serf doesn't have the appended \r\n because those \r\n s are added in + -- the c serf code. Logging output from our haskell process must manually + -- add them. + let showErr = atomically . Term.trace muxed . (flip append "\r\n") + + env <- ask + + let (bootEvents, startDrivers) = drivers + env + multi + ship + (isFake logId) + (writeTQueue computeQ) + (Term.TSize { tsWide = 80, tsTall = 24 }, muxed) + showErr + + io $ atomically $ for_ bootEvents (writeTQueue computeQ) + + scryM <- newEmptyTMVarIO + + onKill <- view onKillPierSigL + + let computeConfig = ComputeConfig { ccOnWork = readTQueue computeQ + , ccOnKill = onKill + , ccOnSave = takeTMVar saveM + , ccOnScry = takeTMVar scryM + , ccPutResult = writeTQueue persistQ + , ccShowSpinner = Term.spin muxed + , ccHideSpinner = Term.stopSpin muxed + , ccLastEvInLog = Log.lastEv log + } + + let plan = writeTQueue executeQ + + drivz <- startDrivers + tExec <- acquireWorker "Effects" (router (readTQueue executeQ) drivz) + tDisk <- acquireWorkerBound "Persist" (runPersist log persistQ plan) + tSerf <- acquireWorker "Serf" (runCompute serf computeConfig) + + tSaveSignal <- saveSignalThread saveM + + -- TODO bullshit scry tester + void $ acquireWorker "bullshit scry tester" $ forever $ do env <- ask + threadDelay 1_000_000 + wen <- io Time.now + let kal = \mTermNoun -> runRIO env $ do + logTrace $ displayShow ("scry result: ", mTermNoun) + let nkt = MkKnot $ tshow $ Time.MkDate wen + let pax = Path ["j", "~zod", "life", nkt, "~zod"] + atomically $ putTMVar scryM (wen, Nothing, pax, kal) - let (bootEvents, startDrivers) = - drivers env multi ship (isFake logId) - (writeTQueue computeQ) - (Term.TSize{tsWide=80, tsTall=24}, muxed) - showErr + putMVar mStart () - io $ atomically $ for_ bootEvents (writeTQueue computeQ) + -- Wait for something to die. - scryM <- newEmptyTMVarIO + let ded = asum + [ death "effects thread" tExec + , death "persist thread" tDisk + , death "compute thread" tSerf + ] - onKill <- view onKillPierSigL + atomically ded >>= \case + Left (txt, exn) -> logError $ displayShow ("Somthing died", txt, exn) + Right tag -> logError $ displayShow ("Something simply exited", tag) - let computeConfig = ComputeConfig - { ccOnWork = readTQueue computeQ - , ccOnKill = onKill - , ccOnSave = takeTMVar saveM - , ccOnScry = takeTMVar scryM - , ccPutResult = writeTQueue persistQ - , ccShowSpinner = Term.spin muxed - , ccHideSpinner = Term.stopSpin muxed - , ccLastEvInLog = Log.lastEv log - } - - let plan = writeTQueue executeQ - - drivz <- startDrivers - tExec <- acquireWorker "Effects" (router (readTQueue executeQ) drivz) - tDisk <- acquireWorkerBound "Persist" (runPersist log persistQ plan) - tSerf <- acquireWorker "Serf" (runCompute serf computeConfig) - - tSaveSignal <- saveSignalThread saveM - - -- bullshit scry tester - void $ acquireWorker "bullshit scry tester" $ forever $ do - env <- ask - threadDelay 1_000_000 - wen <- io Time.now - let kal = \mTermNoun -> runRIO env $ do - logTrace $ displayShow ("scry result: ", mTermNoun) - let nkt = MkKnot $ tshow $ Time.MkDate wen - let pax = Path ["j", "~zod", "life", nkt, "~zod"] - atomically $ putTMVar scryM (wen, Nothing, pax, kal) - - putMVar mStart () - - -- Wait for something to die. - - let ded = asum [ death "effects thread" tExec - , death "persist thread" tDisk - , death "compute thread" tSerf - ] - - atomically ded >>= \case - Left (txt, exn) -> logError $ displayShow ("Somthing died", txt, exn) - Right tag -> logError $ displayShow ("Something simply exited", tag) - - atomically $ (Term.spin muxed) (Just "shutdown") + atomically $ (Term.spin muxed) (Just "shutdown") death :: Text -> Async () -> STM (Either (Text, SomeException) Text) @@ -368,14 +368,14 @@ saveSignalThread tm = mkRAcquire start cancel -- Start All Drivers ----------------------------------------------------------- data Drivers e = Drivers - { dAmes :: EffCb e AmesEf - , dBehn :: EffCb e BehnEf - , dHttpClient :: EffCb e HttpClientEf - , dHttpServer :: EffCb e HttpServerEf - , dNewt :: EffCb e NewtEf - , dSync :: EffCb e SyncEf - , dTerm :: EffCb e TermEf - } + { dAmes :: AmesEf -> IO () + , dBehn :: BehnEf -> IO () + , dIris :: HttpClientEf -> IO () + , dEyre :: HttpServerEf -> IO () + , dNewt :: NewtEf -> IO () + , dSync :: SyncEf -> IO () + , dTerm :: TermEf -> IO () + } drivers :: HasPierEnv e @@ -388,25 +388,26 @@ drivers -> (Text -> RIO e ()) -> ([EvErr], RAcquire e (Drivers e)) drivers env multi who isFake plan termSys stderr = - (initialEvents, runDrivers) -- TODO - where - (behnBorn, runBehn) = behn env plan - (amesBorn, runAmes) = ames env who isFake plan stderr - (httpBorn, runHttp) = eyre env multi who plan isFake - (clayBorn, runClay) = clay env plan - (irisBorn, runIris) = client env plan - (termBorn, runTerm) = Term.term env termSys plan - initialEvents = mconcat [behnBorn, clayBorn, amesBorn, httpBorn, - termBorn, irisBorn] - runDrivers = do - dNewt <- runAmes - dBehn <- liftAcquire $ runBehn - dAmes <- pure $ const $ pure () - dHttpClient <- runIris - dHttpServer <- runHttp - dSync <- runClay - dTerm <- runTerm - pure (Drivers{..}) + (initialEvents, runDrivers) + where + (behnBorn, runBehn) = behn env plan + (amesBorn, runAmes) = ames env who isFake plan stderr + (httpBorn, runEyre) = eyre env multi who plan isFake + (clayBorn, runClay) = clay env plan + (irisBorn, runIris) = client env plan + (termBorn, runTerm) = Term.term env termSys plan + initialEvents = mconcat [behnBorn, clayBorn, amesBorn, httpBorn, + termBorn, irisBorn] + + runDrivers = do + dNewt <- runAmes + dBehn <- liftAcquire $ runBehn + dAmes <- pure $ const $ pure () + dIris <- runIris + dEyre <- runEyre + dSync <- runClay + dTerm <- runTerm + pure (Drivers{..}) -- Route Effects to Drivers ---------------------------------------------------- @@ -419,19 +420,19 @@ router waitFx Drivers {..} = forever $ do case ef of GoodParse (EfVega _ _ ) -> error "TODO" GoodParse (EfExit _ _ ) -> error "TODO" - GoodParse (EfVane (VEAmes ef)) -> dAmes ef - GoodParse (EfVane (VEBehn ef)) -> dBehn ef - GoodParse (EfVane (VEBoat ef)) -> dSync ef - GoodParse (EfVane (VEClay ef)) -> dSync ef - GoodParse (EfVane (VEHttpClient ef)) -> dHttpClient ef - GoodParse (EfVane (VEHttpServer ef)) -> dHttpServer ef - GoodParse (EfVane (VENewt ef)) -> dNewt ef - GoodParse (EfVane (VESync ef)) -> dSync ef - GoodParse (EfVane (VETerm ef)) -> dTerm ef + GoodParse (EfVane (VEAmes ef)) -> io (dAmes ef) + GoodParse (EfVane (VEBehn ef)) -> io (dBehn ef) + GoodParse (EfVane (VEBoat ef)) -> io (dSync ef) + GoodParse (EfVane (VEClay ef)) -> io (dSync ef) + GoodParse (EfVane (VEHttpClient ef)) -> io (dIris ef) + GoodParse (EfVane (VEHttpServer ef)) -> io (dEyre ef) + GoodParse (EfVane (VENewt ef)) -> io (dNewt ef) + GoodParse (EfVane (VESync ef)) -> io (dSync ef) + GoodParse (EfVane (VETerm ef)) -> io (dTerm ef) FailParse n -> logError $ display $ pack @Text (ppShow n) --- Compute Thread -------------------------------------------------------------- +-- Compute (Serf) Thread ------------------------------------------------------- logEvent :: HasLogFunc e => Ev -> RIO e () logEvent ev = logDebug $ display $ "[EVENT]\n" <> pretty @@ -473,8 +474,9 @@ runCompute serf ComputeConfig {..} = do void $ async $ forever (atomically (takeTMVar vEvProcessing) >>= logEvent) let onSpin :: Maybe Ev -> STM () - onSpin Nothing = ccHideSpinner - onSpin (Just ev) = do + onSpin = \case + Nothing -> ccHideSpinner + Just ev -> do ccShowSpinner (getSpinnerNameForEvent ev) putTMVar vEvProcessing ev @@ -483,7 +485,7 @@ runCompute serf ComputeConfig {..} = do io (Serf.run serf maxBatchSize ccLastEvInLog onCR ccPutResult onSpin) --- Persist Thread -------------------------------------------------------------- +-- Event-Log Persistence Thread ------------------------------------------------ data PersistExn = BadEventId EventId EventId deriving Show diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Pier/Types.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Pier/Types.hs index 5bc194ae8b..df479c94d9 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Pier/Types.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Pier/Types.hs @@ -107,14 +107,10 @@ data Order deriveToNoun ''Order -type EffCb e a = a -> RIO e () - -type Perform = Ef -> IO () - data IODriver = IODriver - { bornEvent :: IO Ev - , startDriver :: (Ev -> STM ()) -> IO (Async (), Perform) - } + { bornEvent :: IO Ev + , startDriver :: (Ev -> STM ()) -> IO (Async (), Ef -> IO ()) + } data Fact = Fact { factEve :: EventId diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Term.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Term.hs index 40fd1581bd..7fe9bb2b1b 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Term.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Term.hs @@ -505,7 +505,7 @@ term :: forall e. (HasPierEnv e) => e -> (T.TSize, Client) -> (EvErr -> STM ()) - -> ([EvErr], RAcquire e (EffCb e TermEf)) + -> ([EvErr], RAcquire e (TermEf -> IO ())) term env (tsize, Client{..}) plan = (initialEvents, runTerm) where @@ -516,10 +516,10 @@ term env (tsize, Client{..}) plan = , EvErr initialHail (initialHailFailed env) ] - runTerm :: RAcquire e (EffCb e TermEf) + runTerm :: RAcquire e (TermEf -> IO ()) runTerm = do tim <- mkRAcquire (async readLoop) cancel - pure handleEffect + pure (runRIO env . handleEffect) {- Because our terminals are always `Demux`ed, we don't have to