Merge pull request #2236 from urbit/bs/fix-log-replay

king: Fix log replay bug.
This commit is contained in:
benjamin-tlon 2020-02-05 15:43:33 -08:00 committed by GitHub
commit 8836513a7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 21 deletions

View File

@ -131,13 +131,19 @@ resumed :: (HasStderrLogFunc e, HasPierConfig e, HasLogFunc e)
=> Maybe Word64 -> Serf.Flags
-> RAcquire e (Serf e, EventLog, SerfState)
resumed event flags = do
rio $ logTrace "Resuming ship"
top <- view pierPathL
tap <- fmap (fromMaybe top) $ rio $ runMaybeT $ do
ev <- MaybeT (pure event)
MaybeT (getSnapshot top ev)
rio $ logTrace $ displayShow tap
rio $ logTrace $ display @Text ("pier: " <> pack top)
rio $ logTrace $ display @Text ("running serf in: " <> pack tap)
log <- Log.existing (top <> "/.urb/log")
serf <- Serf.run (Serf.Config tap flags)
serfSt <- rio $ Serf.replay serf log event
rio $ Serf.snapshot serf serfSt

View File

@ -270,7 +270,9 @@ cordText = T.strip . unCord
--------------------------------------------------------------------------------
snapshot :: HasLogFunc e => Serf e -> SerfState -> RIO e ()
snapshot serf ss = sendOrder serf $ OSave $ ssLastEv ss
snapshot serf ss = do
logTrace $ display ("Taking snapshot at event " <> tshow (ssLastEv ss))
sendOrder serf $ OSave $ ssLastEv ss
shutdown :: HasLogFunc e => Serf e -> Word8 -> RIO e ()
shutdown serf code = sendOrder serf (OExit code)
@ -298,12 +300,20 @@ recvPlea w = do
-}
handshake :: HasLogFunc e => Serf e -> LogIdentity -> RIO e SerfState
handshake serf ident = do
logTrace "Serf Handshake"
ss@SerfState{..} <- recvPlea serf >>= \case
PPlay e m -> pure $ SerfState e m
x -> throwIO (InvalidInitialPlea x)
logTrace $ display ("Handshake result: " <> tshow ss)
when (ssNextEv == 1) $ do
sendOrder serf (OBoot (lifecycleLen ident))
let ev = OBoot (lifecycleLen ident)
logTrace $ display ("No snapshot. Sending boot event: " <> tshow ev)
sendOrder serf ev
logTrace "Finished handshake"
pure ss
@ -427,35 +437,51 @@ replayJobs :: (HasStderrLogFunc e, HasLogFunc e)
replayJobs serf lastEv = go Nothing
where
go pb ss = do
await >>= \case
Nothing -> pure ss
Just job -> do
pb <- lift $ logStderr (updatePb ss pb)
played <- lift $ replayJob serf job
go pb played
await >>= \case
Nothing -> pure ss
Just job -> do
pb <- lift $ logStderr (updatePb ss pb)
played <- lift $ replayJob serf job
go pb played
updatePb ss = do
let start = lastEv - (fromIntegral (ssNextEv ss))
let msg = pack ("Replaying events #" ++ (show (ssNextEv ss)) ++
" to #" ++ (show lastEv))
updateProgressBar start msg
let start = lastEv - fromIntegral (ssNextEv ss)
let msg = pack ( "Replaying events #" ++ (show (ssNextEv ss))
<> " to #" ++ (show lastEv)
)
updateProgressBar start msg
replay :: (HasStderrLogFunc e, HasLogFunc e)
=> Serf e -> Log.EventLog -> Maybe Word64 -> RIO e SerfState
replay serf log last = do
logTrace "Beginning event log replay"
last & \case
Nothing -> pure ()
Just lt -> logTrace $ display $
"User requested to replay up to event #" <> tshow lt
ss <- handshake serf (Log.identity log)
let numEvs = case last of
Nothing -> ssNextEv ss
Just la -> la - (ssNextEv ss) + 1
logLastEv :: Word64 <- fromIntegral <$> Log.lastEv log
lastEv <- last & maybe (Log.lastEv log) pure
let serfNextEv = ssNextEv ss
lastEventInSnap = serfNextEv - 1
runConduit $ Log.streamEvents log (ssNextEv ss)
.| CC.take (fromIntegral numEvs)
.| toJobs (Log.identity log) (ssNextEv ss)
.| replayJobs serf (fromIntegral lastEv) ss
logTrace $ display $ "Last event in event log is #" <> tshow logLastEv
let replayUpTo = fromMaybe logLastEv last
let numEvs :: Int = fromIntegral replayUpTo - fromIntegral lastEventInSnap
logTrace $ display $ "Replaying up to event #" <> tshow replayUpTo
logTrace $ display $ "Will replay " <> tshow numEvs <> " in total."
runConduit $ Log.streamEvents log serfNextEv
.| CC.take (fromIntegral numEvs)
.| toJobs (Log.identity log) serfNextEv
.| replayJobs serf (fromIntegral replayUpTo) ss
toJobs :: HasLogFunc e
=> LogIdentity -> EventId -> ConduitT ByteString Job (RIO e) ()