mirror of
https://github.com/urbit/shrub.git
synced 2024-12-24 20:47:27 +03:00
Merge pull request #2163 from urbit/philip/king-replay
king: add partial-replay and --dry-from
This commit is contained in:
commit
84ddf79264
@ -20,6 +20,7 @@ data Opts = Opts
|
||||
, oHashless :: Bool
|
||||
, oExit :: Bool
|
||||
, oDryRun :: Bool
|
||||
, oDryFrom :: Maybe Word64
|
||||
, oVerbose :: Bool
|
||||
, oAmesPort :: Maybe Word16
|
||||
, oTrace :: Bool
|
||||
@ -77,6 +78,10 @@ data Bug
|
||||
, bFirstEvt :: Word64
|
||||
, bFinalEvt :: Word64
|
||||
}
|
||||
| ReplayEvents
|
||||
{ bPierPath :: FilePath
|
||||
, bFinalEvt :: Word64
|
||||
}
|
||||
| CheckDawn
|
||||
{ bKeyfilePath :: FilePath
|
||||
}
|
||||
@ -204,11 +209,10 @@ new = do
|
||||
|
||||
opts :: Parser Opts
|
||||
opts = do
|
||||
oAmesPort <- option auto $ metavar "PORT"
|
||||
oAmesPort <- optional $ option auto $ metavar "PORT"
|
||||
<> short 'p'
|
||||
<> long "ames"
|
||||
<> help "Ames port number"
|
||||
<> value Nothing
|
||||
<> hidden
|
||||
|
||||
-- Always disable hashboard. Right now, urbit is almost unusable with this
|
||||
@ -238,10 +242,15 @@ opts = do
|
||||
<> help "Persist no events and turn off Ames networking"
|
||||
<> hidden
|
||||
|
||||
oTrace <- switch $ short 't'
|
||||
<> long "trace"
|
||||
<> help "Enable tracing"
|
||||
<> hidden
|
||||
oDryFrom <- optional $ option auto $ metavar "EVENT"
|
||||
<> long "dry-from"
|
||||
<> help "Dry run from event number"
|
||||
<> hidden
|
||||
|
||||
oTrace <- switch $ short 't'
|
||||
<> long "trace"
|
||||
<> help "Enable tracing"
|
||||
<> hidden
|
||||
|
||||
oLocalhost <- switch $ short 'L'
|
||||
<> long "local"
|
||||
@ -298,7 +307,7 @@ firstEv = option auto $ long "first"
|
||||
lastEv :: Parser Word64
|
||||
lastEv = option auto $ long "last"
|
||||
<> metavar "LAS"
|
||||
<> help "anding with event LAS"
|
||||
<> help "ending with event LAS"
|
||||
<> value maxBound
|
||||
|
||||
checkEvs :: Parser Bug
|
||||
@ -307,6 +316,9 @@ checkEvs = ValidateEvents <$> pierPath <*> firstEv <*> lastEv
|
||||
checkFx :: Parser Bug
|
||||
checkFx = ValidateFX <$> pierPath <*> firstEv <*> lastEv
|
||||
|
||||
replayEvs :: Parser Bug
|
||||
replayEvs = ReplayEvents <$> pierPath <*> lastEv
|
||||
|
||||
browseEvs :: Parser Bug
|
||||
browseEvs = EventBrowser <$> pierPath
|
||||
|
||||
@ -336,6 +348,10 @@ bugCmd = fmap CmdBug
|
||||
( info (checkFx <**> helper)
|
||||
$ progDesc "Parse all data in event log"
|
||||
)
|
||||
<> command "partial-replay"
|
||||
( info (replayEvs <**> helper)
|
||||
$ progDesc "Replay up to N events"
|
||||
)
|
||||
<> command "dawn"
|
||||
( info (checkDawn <**> helper)
|
||||
$ progDesc "Test run dawn"
|
||||
|
@ -68,6 +68,7 @@ import Urbit.Vere.Serf
|
||||
import Control.Concurrent (myThreadId)
|
||||
import Control.Exception (AsyncException(UserInterrupt))
|
||||
import Control.Lens ((&))
|
||||
import System.Process (system)
|
||||
import Text.Show.Pretty (pPrint)
|
||||
import Urbit.King.App (runApp, runAppLogFile, runPierApp)
|
||||
import Urbit.King.App (HasConfigDir(..))
|
||||
@ -114,7 +115,7 @@ toSerfFlags CLI.Opts{..} = catMaybes m
|
||||
, from oHashless Serf.Hashless
|
||||
, from oQuiet Serf.Quiet
|
||||
, from oVerbose Serf.Verbose
|
||||
, from oDryRun Serf.DryRun
|
||||
, from (oDryRun || isJust oDryFrom) Serf.DryRun
|
||||
]
|
||||
from True flag = Just flag
|
||||
from False _ = Nothing
|
||||
@ -123,12 +124,12 @@ toSerfFlags CLI.Opts{..} = catMaybes m
|
||||
toPierConfig :: FilePath -> CLI.Opts -> PierConfig
|
||||
toPierConfig pierPath CLI.Opts{..} = PierConfig
|
||||
{ _pcPierPath = pierPath
|
||||
, _pcDryRun = oDryRun
|
||||
, _pcDryRun = (oDryRun || isJust oDryFrom)
|
||||
}
|
||||
|
||||
toNetworkConfig :: CLI.Opts -> NetworkConfig
|
||||
toNetworkConfig CLI.Opts{..} = NetworkConfig
|
||||
{ ncNetworking = if oDryRun then NetworkNone
|
||||
{ ncNetworking = if (oDryRun || isJust oDryFrom) then NetworkNone
|
||||
else if oOffline then NetworkNone
|
||||
else if oLocalhost then NetworkLocalhost
|
||||
else NetworkNormal
|
||||
@ -163,7 +164,10 @@ runOrExitImmediately getPier oExit =
|
||||
shutdownImmediately (serf, log, ss) = do
|
||||
logTrace "Sending shutdown signal"
|
||||
logTrace $ displayShow ss
|
||||
io $ threadDelay 500000 -- Why is this here? Do I need to force a snapshot to happen?
|
||||
|
||||
-- Why is this here? Do I need to force a snapshot to happen?
|
||||
io $ threadDelay 500000
|
||||
|
||||
ss <- shutdown serf 0
|
||||
logTrace $ displayShow ss
|
||||
logTrace "Shutdown!"
|
||||
@ -174,8 +178,8 @@ runOrExitImmediately getPier oExit =
|
||||
tryPlayShip :: ( HasLogFunc e, HasNetworkConfig e, HasPierConfig e
|
||||
, HasConfigDir e
|
||||
)
|
||||
=> Bool -> Bool -> Serf.Flags -> RIO e ()
|
||||
tryPlayShip exitImmediately fullReplay flags = do
|
||||
=> Bool -> Bool -> Maybe Word64 -> Serf.Flags -> RIO e ()
|
||||
tryPlayShip exitImmediately fullReplay playFrom flags = do
|
||||
when fullReplay wipeSnapshot
|
||||
runOrExitImmediately resumeShip exitImmediately
|
||||
where
|
||||
@ -193,7 +197,7 @@ tryPlayShip exitImmediately fullReplay flags = do
|
||||
resumeShip = do
|
||||
view pierPathL >>= lockFile
|
||||
rio $ logTrace "RESUMING SHIP"
|
||||
sls <- Pier.resumed flags
|
||||
sls <- Pier.resumed playFrom flags
|
||||
rio $ logTrace "SHIP RESUMED"
|
||||
pure sls
|
||||
|
||||
@ -257,7 +261,7 @@ collectAllFx top = do
|
||||
logTrace "Done collecting effects!"
|
||||
where
|
||||
tmpDir :: FilePath
|
||||
tmpDir = top <> "/.tmpdir"
|
||||
tmpDir = top </> ".tmpdir"
|
||||
|
||||
collectedFX :: RAcquire e ()
|
||||
collectedFX = do
|
||||
@ -271,6 +275,41 @@ collectAllFx top = do
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
replayPartEvs :: ∀e. HasLogFunc e => FilePath -> Word64 -> RIO e ()
|
||||
replayPartEvs top last = do
|
||||
logTrace $ display $ pack @Text top
|
||||
fetchSnapshot
|
||||
rwith replayedEvs $ \() ->
|
||||
logTrace "Done replaying events!"
|
||||
where
|
||||
fetchSnapshot :: RIO e ()
|
||||
fetchSnapshot = do
|
||||
snap <- Pier.getSnapshot top last
|
||||
case snap of
|
||||
Nothing -> pure ()
|
||||
Just sn -> do
|
||||
liftIO $ system $ "cp -r \"" <> sn <> "\" \"" <> tmpDir <> "\""
|
||||
pure ()
|
||||
|
||||
tmpDir :: FilePath
|
||||
tmpDir = top </> ".partial-replay" </> show last
|
||||
|
||||
replayedEvs :: RAcquire e ()
|
||||
replayedEvs = do
|
||||
lockFile top
|
||||
log <- Log.existing (top <> "/.urb/log")
|
||||
serf <- Serf.run (Serf.Config tmpDir serfFlags)
|
||||
rio $ do
|
||||
ss <- Serf.replay serf log $ Just last
|
||||
Serf.snapshot serf ss
|
||||
io $ threadDelay 500000 -- Copied from runOrExitImmediately
|
||||
pure ()
|
||||
|
||||
serfFlags :: Serf.Flags
|
||||
serfFlags = [Serf.Hashless]
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
{-|
|
||||
Interesting
|
||||
-}
|
||||
@ -432,7 +471,11 @@ runShip (CLI.Run pierPath) opts = do
|
||||
let pierConfig = toPierConfig pierPath opts
|
||||
let networkConfig = toNetworkConfig opts
|
||||
runPierApp pierConfig networkConfig $
|
||||
tryPlayShip (CLI.oExit opts) (CLI.oFullReplay opts) (toSerfFlags opts)
|
||||
tryPlayShip
|
||||
(CLI.oExit opts)
|
||||
(CLI.oFullReplay opts)
|
||||
(CLI.oDryFrom opts)
|
||||
(toSerfFlags opts)
|
||||
|
||||
|
||||
startBrowser :: HasLogFunc e => FilePath -> RIO e ()
|
||||
@ -505,6 +548,7 @@ main = do
|
||||
CLI.CmdBug (CLI.ValidatePill pax pil s) -> runApp $ testPill pax pil s
|
||||
CLI.CmdBug (CLI.ValidateEvents pax f l) -> runApp $ checkEvs pax f l
|
||||
CLI.CmdBug (CLI.ValidateFX pax f l) -> runApp $ checkFx pax f l
|
||||
CLI.CmdBug (CLI.ReplayEvents pax l) -> runApp $ replayPartEvs pax l
|
||||
CLI.CmdBug (CLI.CheckDawn pax) -> runApp $ checkDawn pax
|
||||
CLI.CmdBug CLI.CheckComet -> runApp $ checkComet
|
||||
CLI.CmdCon pier -> runAppLogFile $ connTerm pier
|
||||
|
@ -268,7 +268,6 @@ streamEvents :: HasLogFunc e
|
||||
=> EventLog -> Word64
|
||||
-> ConduitT () ByteString (RIO e) ()
|
||||
streamEvents log first = do
|
||||
last <- lift $ lastEv log
|
||||
batch <- lift $ readBatch log first
|
||||
unless (null batch) $ do
|
||||
for_ batch yield
|
||||
|
@ -5,15 +5,17 @@
|
||||
communication between the serf, the log, and the IO drivers.
|
||||
-}
|
||||
module Urbit.Vere.Pier
|
||||
( booted, resumed, pier, runPersist, runCompute, generateBootSeq
|
||||
( booted, resumed, getSnapshot, pier, runPersist, runCompute, generateBootSeq
|
||||
) where
|
||||
|
||||
import Urbit.Prelude
|
||||
|
||||
import RIO.Directory
|
||||
import System.Random
|
||||
import Urbit.Arvo
|
||||
import Urbit.King.Config
|
||||
import Urbit.Vere.Pier.Types
|
||||
import Control.Monad.Trans.Maybe
|
||||
|
||||
import Data.Text (append)
|
||||
import System.Posix.Files (ownerModes, setFileMode)
|
||||
@ -26,8 +28,6 @@ import Urbit.Vere.Http.Server (serv)
|
||||
import Urbit.Vere.Log (EventLog)
|
||||
import Urbit.Vere.Serf (Serf, SerfState(..), doJob, sStderr)
|
||||
|
||||
import RIO.Directory
|
||||
|
||||
import qualified System.Console.Terminal.Size as TSize
|
||||
import qualified System.Entropy as Ent
|
||||
import qualified Urbit.King.API as King
|
||||
@ -128,18 +128,36 @@ booted pill lite flags ship boot = do
|
||||
-- Resume an existing ship. ----------------------------------------------------
|
||||
|
||||
resumed :: (HasPierConfig e, HasLogFunc e)
|
||||
=> Serf.Flags
|
||||
=> Maybe Word64 -> Serf.Flags
|
||||
-> RAcquire e (Serf e, EventLog, SerfState)
|
||||
resumed flags = do
|
||||
resumed event flags = do
|
||||
top <- view pierPathL
|
||||
tap <- fmap (fromMaybe top) $ rio $ runMaybeT $ do
|
||||
ev <- MaybeT (pure event)
|
||||
MaybeT (getSnapshot top ev)
|
||||
rio $ logTrace $ displayShow tap
|
||||
log <- Log.existing (top <> "/.urb/log")
|
||||
serf <- Serf.run (Serf.Config top flags)
|
||||
serfSt <- rio $ Serf.replay serf log
|
||||
serf <- Serf.run (Serf.Config tap flags)
|
||||
serfSt <- rio $ Serf.replay serf log event
|
||||
|
||||
rio $ Serf.snapshot serf serfSt
|
||||
|
||||
pure (serf, log, serfSt)
|
||||
|
||||
getSnapshot :: forall e. FilePath -> Word64 -> RIO e (Maybe FilePath)
|
||||
getSnapshot top last = do
|
||||
lastSnapshot <- lastMay <$> listReplays
|
||||
pure (replayToPath <$> lastSnapshot)
|
||||
where
|
||||
replayDir = top </> ".partial-replay"
|
||||
replayToPath eId = replayDir </> show eId
|
||||
|
||||
listReplays :: RIO e [Word64]
|
||||
listReplays = do
|
||||
createDirectoryIfMissing True replayDir
|
||||
snapshotNums <- mapMaybe readMay <$> listDirectory replayDir
|
||||
pure $ sort (filter (<= fromIntegral last) snapshotNums)
|
||||
|
||||
|
||||
-- Run Pier --------------------------------------------------------------------
|
||||
|
||||
|
@ -27,14 +27,14 @@ import Foreign.Ptr (castPtr)
|
||||
import Foreign.Storable (peek, poke)
|
||||
import System.Exit (ExitCode)
|
||||
|
||||
import qualified Urbit.Ob as Ob
|
||||
|
||||
import qualified Data.ByteString.Unsafe as BS
|
||||
import qualified Data.Text as T
|
||||
import qualified System.IO as IO
|
||||
import qualified System.IO.Error as IO
|
||||
import qualified Urbit.Time as Time
|
||||
import qualified Urbit.Vere.Log as Log
|
||||
import qualified Data.ByteString.Unsafe as BS
|
||||
import qualified Data.Conduit.Combinators as CC
|
||||
import qualified Data.Text as T
|
||||
import qualified System.IO as IO
|
||||
import qualified System.IO.Error as IO
|
||||
import qualified Urbit.Ob as Ob
|
||||
import qualified Urbit.Time as Time
|
||||
import qualified Urbit.Vere.Log as Log
|
||||
|
||||
|
||||
-- Serf Config -----------------------------------------------------------------
|
||||
@ -435,12 +435,19 @@ replayJobs serf lastEv = go Nothing
|
||||
updateProgressBar start msg
|
||||
|
||||
|
||||
replay :: HasLogFunc e => Serf e -> Log.EventLog -> RIO e SerfState
|
||||
replay serf log = do
|
||||
replay :: HasLogFunc e
|
||||
=> Serf e -> Log.EventLog -> Maybe Word64 -> RIO e SerfState
|
||||
replay serf log last = do
|
||||
ss <- handshake serf (Log.identity log)
|
||||
|
||||
lastEv <- Log.lastEv log
|
||||
let numEvs = case last of
|
||||
Nothing -> ssNextEv ss
|
||||
Just la -> la - (ssNextEv ss) + 1
|
||||
|
||||
lastEv <- last & maybe (Log.lastEv log) pure
|
||||
|
||||
runConduit $ Log.streamEvents log (ssNextEv ss)
|
||||
.| CC.take (fromIntegral numEvs)
|
||||
.| toJobs (Log.identity log) (ssNextEv ss)
|
||||
.| replayJobs serf (fromIntegral lastEv) ss
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user