diff --git a/pkg/hs/urbit-king/TODO.md b/pkg/hs/urbit-king/TODO.md index 1cadbef8bd..1dacb9ff90 100644 --- a/pkg/hs/urbit-king/TODO.md +++ b/pkg/hs/urbit-king/TODO.md @@ -15,7 +15,6 @@ Stubbed out: - [x] Snapshots should block until that event is commited to disk. - [x] Hook up error callbacks to IO Drivers. - [x] Do something useful with error callbacks from IO Drivers. -- [ ] Make sure replay progress bars go to stderr. Bugs: @@ -37,9 +36,11 @@ Polish: - [x] Cleanup batching flow. - [x] Think through how to shutdown the serf on exception. - [x] King should shutdown promptly on ^C. Always takes 2s in practice. +- [x] Bring back progress bars. +- [x] Make sure replay progress bars go to stderr. - [ ] Logging for new IPC flow. - [ ] Logging for boot sequence. -- [ ] Bring back progress bars. +- [ ] Take snapshots on clean shutdown. # Misc Bugs diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs index 2a4666c5da..65c2e859f1 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Pier.hs @@ -184,7 +184,7 @@ resumed -> Maybe Word64 -> [Serf.Flag] -> RAcquire PierEnv (Serf, EventLog) -resumed vSlog replayUntil flags = do +resumed vSlog replayUntil flags = do rio $ logTrace "Resuming ship" top <- view pierPathL tap <- fmap (fromMaybe top) $ rio $ runMaybeT $ do diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Serf.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Serf.hs index 4032266574..72ee1a04da 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Serf.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Serf.hs @@ -20,14 +20,16 @@ import Control.Monad.Trans.Resource (runResourceT) import Urbit.Arvo (FX) import qualified Data.Conduit.Combinators as CC +import qualified System.ProgressBar as PB import qualified Urbit.Vere.Log as Log +import Urbit.King.App (HasStderrLogFunc(..)) + import qualified Urbit.Vere.Serf.IPC as X (Config(..), EvErr(..), Flag(..), RunReq(..), Serf, WorkError(..), run, snapshot, start, stop) -- ort System.ProgressBar --- ort Urbit.King.App (HasStderrLogFunc(..)) -- ort qualified Urbit.Ob as Ob -- ort qualified Urbit.Time as Time @@ -56,7 +58,7 @@ withSerf config = mkRAcquire startup kill execReplay :: forall e - . HasLogFunc e + . (HasLogFunc e, HasStderrLogFunc e) => Serf -> Log.EventLog -> Maybe Word64 @@ -107,20 +109,41 @@ execReplay serf log last = do when (numEvs < 0) $ do error "impossible" + incProgress <- logStderr (trackProgress (fromIntegral numEvs)) + logTrace $ display $ "Replaying up to event #" <> tshow replayUpTo logTrace $ display $ "Will replay " <> tshow numEvs <> " in total." env <- ask - let onProgress n = do - runRIO env $ logTrace $ display ("Serf is at event# " <> tshow n) - - runResourceT + res <- runResourceT $ runConduit $ Log.streamEvents log (lastEventInSnap + 1) .| CC.take (fromIntegral numEvs) .| CC.mapM (fmap snd . parseLogRow) - .| replay 10 onProgress serf + .| replay 5 incProgress serf + + pure res + +logStderr :: HasStderrLogFunc e => RIO LogFunc a -> RIO e a +logStderr action = do + logFunc <- view stderrLogFuncL + runRIO logFunc action + +trackProgress + :: HasLogFunc e + => Word64 + -> RIO e (Int -> IO ()) +trackProgress = \case + 0 -> pure $ const $ pure () + num -> do + let style = PB.defStyle { PB.stylePostfix = PB.exact } + let refresh = 10 + let init = PB.Progress 0 (fromIntegral num) () + bar <- PB.newProgressBar style refresh init + env <- ask + let incr = PB.incProgress bar + pure (runRIO env . incr) -- Collect FX ------------------------------------------------------------------ diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs index 5a5257103f..50059e6ddc 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs @@ -481,7 +481,7 @@ replay :: forall m . (MonadResource m, MonadUnliftIO m, MonadIO m) => Int - -> (EventId -> IO ()) + -> (Int -> IO ()) -> Serf -> ConduitT Noun Void m (Maybe PlayBail) replay batchSize cb serf = do @@ -491,7 +491,6 @@ replay batchSize cb serf = do where loop :: SerfState -> ConduitT Noun Void m (Maybe PlayBail, SerfState) loop ss@(SerfState lastEve lastMug) = do - io (cb lastEve) awaitBatch batchSize >>= \case [] -> pure (Nothing, SerfState lastEve lastMug) evs -> do @@ -500,7 +499,9 @@ replay batchSize cb serf = do io $ sendWrit serf (WPlay nexEve evs) io (recvPlay serf) >>= \case PBail bail -> pure (Just bail, SerfState lastEve lastMug) - PDone newMug -> loop (SerfState newEve newMug) + PDone newMug -> do + io (cb $ length evs) + loop (SerfState newEve newMug) {-| TODO If this is slow, use a mutable vector instead of reversing a list.