king: Progress bars on replay.

This commit is contained in:
~siprel 2020-06-07 00:26:59 +00:00
parent 95df4b0764
commit 6ab2d78d7b
4 changed files with 38 additions and 13 deletions

View File

@ -15,7 +15,6 @@ Stubbed out:
- [x] Snapshots should block until that event is commited to disk. - [x] Snapshots should block until that event is commited to disk.
- [x] Hook up error callbacks to IO Drivers. - [x] Hook up error callbacks to IO Drivers.
- [x] Do something useful with error callbacks from IO Drivers. - [x] Do something useful with error callbacks from IO Drivers.
- [ ] Make sure replay progress bars go to stderr.
Bugs: Bugs:
@ -37,9 +36,11 @@ Polish:
- [x] Cleanup batching flow. - [x] Cleanup batching flow.
- [x] Think through how to shutdown the serf on exception. - [x] Think through how to shutdown the serf on exception.
- [x] King should shutdown promptly on ^C. Always takes 2s in practice. - [x] King should shutdown promptly on ^C. Always takes 2s in practice.
- [x] Bring back progress bars.
- [x] Make sure replay progress bars go to stderr.
- [ ] Logging for new IPC flow. - [ ] Logging for new IPC flow.
- [ ] Logging for boot sequence. - [ ] Logging for boot sequence.
- [ ] Bring back progress bars. - [ ] Take snapshots on clean shutdown.
# Misc Bugs # Misc Bugs

View File

@ -184,7 +184,7 @@ resumed
-> Maybe Word64 -> Maybe Word64
-> [Serf.Flag] -> [Serf.Flag]
-> RAcquire PierEnv (Serf, EventLog) -> RAcquire PierEnv (Serf, EventLog)
resumed vSlog replayUntil flags = do resumed vSlog replayUntil flags = do
rio $ logTrace "Resuming ship" rio $ logTrace "Resuming ship"
top <- view pierPathL top <- view pierPathL
tap <- fmap (fromMaybe top) $ rio $ runMaybeT $ do tap <- fmap (fromMaybe top) $ rio $ runMaybeT $ do

View File

@ -20,14 +20,16 @@ import Control.Monad.Trans.Resource (runResourceT)
import Urbit.Arvo (FX) import Urbit.Arvo (FX)
import qualified Data.Conduit.Combinators as CC import qualified Data.Conduit.Combinators as CC
import qualified System.ProgressBar as PB
import qualified Urbit.Vere.Log as Log import qualified Urbit.Vere.Log as Log
import Urbit.King.App (HasStderrLogFunc(..))
import qualified Urbit.Vere.Serf.IPC as X (Config(..), EvErr(..), Flag(..), import qualified Urbit.Vere.Serf.IPC as X (Config(..), EvErr(..), Flag(..),
RunReq(..), Serf, WorkError(..), run, RunReq(..), Serf, WorkError(..), run,
snapshot, start, stop) snapshot, start, stop)
-- ort System.ProgressBar -- ort System.ProgressBar
-- ort Urbit.King.App (HasStderrLogFunc(..))
-- ort qualified Urbit.Ob as Ob -- ort qualified Urbit.Ob as Ob
-- ort qualified Urbit.Time as Time -- ort qualified Urbit.Time as Time
@ -56,7 +58,7 @@ withSerf config = mkRAcquire startup kill
execReplay execReplay
:: forall e :: forall e
. HasLogFunc e . (HasLogFunc e, HasStderrLogFunc e)
=> Serf => Serf
-> Log.EventLog -> Log.EventLog
-> Maybe Word64 -> Maybe Word64
@ -107,20 +109,41 @@ execReplay serf log last = do
when (numEvs < 0) $ do when (numEvs < 0) $ do
error "impossible" error "impossible"
incProgress <- logStderr (trackProgress (fromIntegral numEvs))
logTrace $ display $ "Replaying up to event #" <> tshow replayUpTo logTrace $ display $ "Replaying up to event #" <> tshow replayUpTo
logTrace $ display $ "Will replay " <> tshow numEvs <> " in total." logTrace $ display $ "Will replay " <> tshow numEvs <> " in total."
env <- ask env <- ask
let onProgress n = do res <- runResourceT
runRIO env $ logTrace $ display ("Serf is at event# " <> tshow n)
runResourceT
$ runConduit $ runConduit
$ Log.streamEvents log (lastEventInSnap + 1) $ Log.streamEvents log (lastEventInSnap + 1)
.| CC.take (fromIntegral numEvs) .| CC.take (fromIntegral numEvs)
.| CC.mapM (fmap snd . parseLogRow) .| CC.mapM (fmap snd . parseLogRow)
.| replay 10 onProgress serf .| replay 5 incProgress serf
pure res
logStderr :: HasStderrLogFunc e => RIO LogFunc a -> RIO e a
logStderr action = do
logFunc <- view stderrLogFuncL
runRIO logFunc action
trackProgress
:: HasLogFunc e
=> Word64
-> RIO e (Int -> IO ())
trackProgress = \case
0 -> pure $ const $ pure ()
num -> do
let style = PB.defStyle { PB.stylePostfix = PB.exact }
let refresh = 10
let init = PB.Progress 0 (fromIntegral num) ()
bar <- PB.newProgressBar style refresh init
env <- ask
let incr = PB.incProgress bar
pure (runRIO env . incr)
-- Collect FX ------------------------------------------------------------------ -- Collect FX ------------------------------------------------------------------

View File

@ -481,7 +481,7 @@ replay
:: forall m :: forall m
. (MonadResource m, MonadUnliftIO m, MonadIO m) . (MonadResource m, MonadUnliftIO m, MonadIO m)
=> Int => Int
-> (EventId -> IO ()) -> (Int -> IO ())
-> Serf -> Serf
-> ConduitT Noun Void m (Maybe PlayBail) -> ConduitT Noun Void m (Maybe PlayBail)
replay batchSize cb serf = do replay batchSize cb serf = do
@ -491,7 +491,6 @@ replay batchSize cb serf = do
where where
loop :: SerfState -> ConduitT Noun Void m (Maybe PlayBail, SerfState) loop :: SerfState -> ConduitT Noun Void m (Maybe PlayBail, SerfState)
loop ss@(SerfState lastEve lastMug) = do loop ss@(SerfState lastEve lastMug) = do
io (cb lastEve)
awaitBatch batchSize >>= \case awaitBatch batchSize >>= \case
[] -> pure (Nothing, SerfState lastEve lastMug) [] -> pure (Nothing, SerfState lastEve lastMug)
evs -> do evs -> do
@ -500,7 +499,9 @@ replay batchSize cb serf = do
io $ sendWrit serf (WPlay nexEve evs) io $ sendWrit serf (WPlay nexEve evs)
io (recvPlay serf) >>= \case io (recvPlay serf) >>= \case
PBail bail -> pure (Just bail, SerfState lastEve lastMug) PBail bail -> pure (Just bail, SerfState lastEve lastMug)
PDone newMug -> loop (SerfState newEve newMug) PDone newMug -> do
io (cb $ length evs)
loop (SerfState newEve newMug)
{-| {-|
TODO If this is slow, use a mutable vector instead of reversing a list. TODO If this is slow, use a mutable vector instead of reversing a list.