mirror of
https://github.com/urbit/shrub.git
synced 2024-12-24 20:47:27 +03:00
Implement king bug collect-all-fx
For now, this is mostly useful as an integration test: Replay the whole event log of an existing ship using King Haskell. This also opens the door for a future tool that inspects collected effects for debugging purposes.
This commit is contained in:
parent
6a273906b2
commit
3ff5c4fad5
@ -1,7 +1,7 @@
|
|||||||
{-# OPTIONS_GHC -Werror -Wall #-}
|
{-# OPTIONS_GHC -Werror -Wall #-}
|
||||||
{-# LANGUAGE CPP #-}
|
{-# LANGUAGE CPP #-}
|
||||||
|
|
||||||
module CLI (parseArgs, Cmd(..), New(..), Run(..), Opts(..)) where
|
module CLI (parseArgs, Cmd(..), New(..), Run(..), Bug(..), Opts(..)) where
|
||||||
|
|
||||||
import ClassyPrelude
|
import ClassyPrelude
|
||||||
import Options.Applicative
|
import Options.Applicative
|
||||||
@ -40,10 +40,15 @@ data Run = Run
|
|||||||
}
|
}
|
||||||
deriving (Show)
|
deriving (Show)
|
||||||
|
|
||||||
|
data Bug
|
||||||
|
= ValidatePill FilePath
|
||||||
|
| CollectAllFX FilePath
|
||||||
|
deriving (Show)
|
||||||
|
|
||||||
data Cmd
|
data Cmd
|
||||||
= CmdNew New Opts
|
= CmdNew New Opts
|
||||||
| CmdRun Run Opts
|
| CmdRun Run Opts
|
||||||
| CmdVal FilePath -- Validate Pill
|
| CmdBug Bug
|
||||||
deriving (Show)
|
deriving (Show)
|
||||||
|
|
||||||
--------------------------------------------------------------------------------
|
--------------------------------------------------------------------------------
|
||||||
@ -177,16 +182,36 @@ opts = do
|
|||||||
|
|
||||||
pure (Opts{..})
|
pure (Opts{..})
|
||||||
|
|
||||||
|
newShip :: Parser Cmd
|
||||||
|
newShip = CmdNew <$> new <*> opts
|
||||||
|
|
||||||
runShip :: Parser Cmd
|
runShip :: Parser Cmd
|
||||||
runShip = do
|
runShip = do
|
||||||
rPierPath <- strArgument (metavar "PIER" <> help "Path to pier")
|
rPierPath <- strArgument (metavar "PIER" <> help "Path to pier")
|
||||||
o <- opts
|
o <- opts
|
||||||
pure (CmdRun (Run{..}) o)
|
pure (CmdRun (Run{..}) o)
|
||||||
|
|
||||||
valPill :: Parser Cmd
|
valPill :: Parser Bug
|
||||||
valPill = do
|
valPill = do
|
||||||
pillPath <- strArgument (metavar "PILL" <> help "Path to pill")
|
pillPath <- strArgument (metavar "PILL" <> help "Path to pill")
|
||||||
pure (CmdVal pillPath)
|
pure (ValidatePill pillPath)
|
||||||
|
|
||||||
|
bugCmd :: Parser Cmd
|
||||||
|
bugCmd = fmap CmdBug
|
||||||
|
$ subparser
|
||||||
|
$ command "validate-pill"
|
||||||
|
( info (valPill <**> helper)
|
||||||
|
$ progDesc "Validate a pill file."
|
||||||
|
)
|
||||||
|
<> command "collect-all-fx"
|
||||||
|
( info (allFx <**> helper)
|
||||||
|
$ progDesc "Replay entire event log, collecting all effects"
|
||||||
|
)
|
||||||
|
|
||||||
|
allFx :: Parser Bug
|
||||||
|
allFx = do
|
||||||
|
pier <- strArgument (metavar "PIER" <> help "Path to pier")
|
||||||
|
pure (CollectAllFX pier)
|
||||||
|
|
||||||
cmd :: Parser Cmd
|
cmd :: Parser Cmd
|
||||||
cmd = subparser
|
cmd = subparser
|
||||||
@ -196,8 +221,6 @@ cmd = subparser
|
|||||||
<> command "run" ( info (runShip <**> helper)
|
<> command "run" ( info (runShip <**> helper)
|
||||||
$ progDesc "Run an existing ship."
|
$ progDesc "Run an existing ship."
|
||||||
)
|
)
|
||||||
<> command "val" ( info (valPill <**> helper)
|
<> command "bug" ( info (bugCmd <**> helper)
|
||||||
$ progDesc "Validate a pill file."
|
$ progDesc "Run a debugging sub-command."
|
||||||
)
|
)
|
||||||
where
|
|
||||||
newShip = CmdNew <$> new <*> opts
|
|
||||||
|
@ -163,7 +163,7 @@ wipeSnapshot shipPath = do
|
|||||||
tryBootFromPill :: FilePath -> FilePath -> Ship -> IO ()
|
tryBootFromPill :: FilePath -> FilePath -> Ship -> IO ()
|
||||||
tryBootFromPill pillPath shipPath ship = do
|
tryBootFromPill pillPath shipPath ship = do
|
||||||
wipeSnapshot shipPath
|
wipeSnapshot shipPath
|
||||||
with (Pier.booted pillPath shipPath serfFlags ship) $ \(serf, log, ss) -> do
|
with (Pier.booted pillPath shipPath [] ship) $ \(serf, log, ss) -> do
|
||||||
print "lul"
|
print "lul"
|
||||||
print ss
|
print ss
|
||||||
threadDelay 500000
|
threadDelay 500000
|
||||||
@ -176,13 +176,13 @@ tryPlayShip :: FilePath -> IO ()
|
|||||||
tryPlayShip shipPath = do
|
tryPlayShip shipPath = do
|
||||||
runAcquire $ do
|
runAcquire $ do
|
||||||
putStrLn "RESUMING SHIP"
|
putStrLn "RESUMING SHIP"
|
||||||
sls <- Pier.resumed shipPath serfFlags
|
sls <- Pier.resumed shipPath []
|
||||||
putStrLn "SHIP RESUMED"
|
putStrLn "SHIP RESUMED"
|
||||||
Pier.pier shipPath Nothing sls
|
Pier.pier shipPath Nothing sls
|
||||||
|
|
||||||
tryResume :: FilePath -> IO ()
|
tryResume :: FilePath -> IO ()
|
||||||
tryResume shipPath = do
|
tryResume shipPath = do
|
||||||
with (Pier.resumed shipPath serfFlags) $ \(serf, log, ss) -> do
|
with (Pier.resumed shipPath []) $ \(serf, log, ss) -> do
|
||||||
print ss
|
print ss
|
||||||
threadDelay 500000
|
threadDelay 500000
|
||||||
shutdown serf 0 >>= print
|
shutdown serf 0 >>= print
|
||||||
@ -240,20 +240,28 @@ tryParseEvents dir first = do
|
|||||||
|
|
||||||
--------------------------------------------------------------------------------
|
--------------------------------------------------------------------------------
|
||||||
|
|
||||||
serfFlags :: Serf.Flags
|
{-
|
||||||
serfFlags = [Serf.Hashless, Serf.DryRun] -- [Serf.Verbose, Serf.Trace]
|
This runs the serf at `$top/.tmpdir`, but we disable snapshots,
|
||||||
|
so this should never actually be created. We just do this to avoid
|
||||||
collectedFX :: FilePath -> Acquire ()
|
letting the serf use an existing snapshot.
|
||||||
collectedFX top = do
|
-}
|
||||||
log <- Log.existing (top <> "/.urb/log")
|
|
||||||
serf <- Serf.run (Serf.Config top serfFlags)
|
|
||||||
liftIO (Serf.collectFX serf log)
|
|
||||||
|
|
||||||
collectAllFx :: FilePath -> IO ()
|
collectAllFx :: FilePath -> IO ()
|
||||||
collectAllFx top = do
|
collectAllFx top = do
|
||||||
wipeSnapshot top
|
putStrLn (pack top)
|
||||||
with (collectedFX top) $ \() ->
|
with collectedFX $ \() ->
|
||||||
putStrLn "[collectAllFx] Done collecting effects!"
|
putStrLn "[collectAllFx] Done collecting effects!"
|
||||||
|
where
|
||||||
|
tmpDir :: FilePath
|
||||||
|
tmpDir = top <> "/.tmpdir"
|
||||||
|
|
||||||
|
collectedFX :: Acquire ()
|
||||||
|
collectedFX = do
|
||||||
|
log <- Log.existing (top <> "/.urb/log")
|
||||||
|
serf <- Serf.run (Serf.Config tmpDir serfFlags)
|
||||||
|
liftIO (Serf.collectFX serf log)
|
||||||
|
|
||||||
|
serfFlags :: Serf.Flags
|
||||||
|
serfFlags = [Serf.Hashless, Serf.DryRun]
|
||||||
|
|
||||||
--------------------------------------------------------------------------------
|
--------------------------------------------------------------------------------
|
||||||
|
|
||||||
@ -288,9 +296,10 @@ runShip (CLI.Run pierPath) _ = tryPlayShip pierPath
|
|||||||
|
|
||||||
main :: IO ()
|
main :: IO ()
|
||||||
main = CLI.parseArgs >>= \case
|
main = CLI.parseArgs >>= \case
|
||||||
CLI.CmdRun r o -> runShip r o
|
CLI.CmdRun r o -> runShip r o
|
||||||
CLI.CmdNew n o -> newShip n o
|
CLI.CmdNew n o -> newShip n o
|
||||||
CLI.CmdVal pil -> validatePill pil
|
CLI.CmdBug (CLI.CollectAllFX pax) -> collectAllFx pax
|
||||||
|
CLI.CmdBug (CLI.ValidatePill pax) -> print ("validate-pill", pax)
|
||||||
|
|
||||||
validatePill :: FilePath -> IO ()
|
validatePill :: FilePath -> IO ()
|
||||||
validatePill = const (pure ())
|
validatePill = const (pure ())
|
||||||
|
@ -65,6 +65,7 @@ instance Exception EventLogExn where
|
|||||||
|
|
||||||
rawOpen :: FilePath -> IO Env
|
rawOpen :: FilePath -> IO Env
|
||||||
rawOpen dir = do
|
rawOpen dir = do
|
||||||
|
putStrLn $ pack ("PAX: " <> dir)
|
||||||
env <- mdb_env_create
|
env <- mdb_env_create
|
||||||
mdb_env_set_maxdbs env 3
|
mdb_env_set_maxdbs env 3
|
||||||
mdb_env_set_mapsize env (40 * 1024 * 1024 * 1024)
|
mdb_env_set_mapsize env (40 * 1024 * 1024 * 1024)
|
||||||
@ -94,7 +95,7 @@ open dir = do
|
|||||||
EventLog env m e f id <$> newIORef numEvs
|
EventLog env m e f id <$> newIORef numEvs
|
||||||
where
|
where
|
||||||
openTables env =
|
openTables env =
|
||||||
with (openTxn env) $ \txn ->
|
with (writeTxn env) $ \txn ->
|
||||||
(,,) <$> mdb_dbi_open txn (Just "META") []
|
(,,) <$> mdb_dbi_open txn (Just "META") []
|
||||||
<*> mdb_dbi_open txn (Just "EVENTS") [MDB_INTEGERKEY]
|
<*> mdb_dbi_open txn (Just "EVENTS") [MDB_INTEGERKEY]
|
||||||
<*> mdb_dbi_open txn (Just "EFFECTS") [MDB_CREATE, MDB_INTEGERKEY]
|
<*> mdb_dbi_open txn (Just "EFFECTS") [MDB_CREATE, MDB_INTEGERKEY]
|
||||||
@ -119,18 +120,34 @@ new dir id = mkAcquire (create dir id) close
|
|||||||
|
|
||||||
-- Read/Write Log Identity -----------------------------------------------------
|
-- Read/Write Log Identity -----------------------------------------------------
|
||||||
|
|
||||||
openTxn :: Env -> Acquire Txn
|
{-
|
||||||
openTxn env = mkAcquire begin commit
|
A read-only transaction that commits at the end.
|
||||||
|
|
||||||
|
Use this when opening database handles.
|
||||||
|
-}
|
||||||
|
_openTxn :: Env -> Acquire Txn
|
||||||
|
_openTxn env = mkAcquire begin commit
|
||||||
where
|
where
|
||||||
begin = mdb_txn_begin env Nothing True
|
begin = mdb_txn_begin env Nothing True
|
||||||
commit = mdb_txn_commit
|
commit = mdb_txn_commit
|
||||||
|
|
||||||
|
{-
|
||||||
|
A read-only transaction that aborts at the end.
|
||||||
|
|
||||||
|
Use this when reading data from already-opened databases.
|
||||||
|
-}
|
||||||
readTxn :: Env -> Acquire Txn
|
readTxn :: Env -> Acquire Txn
|
||||||
readTxn env = mkAcquire begin abort
|
readTxn env = mkAcquire begin abort
|
||||||
where
|
where
|
||||||
begin = mdb_txn_begin env Nothing True
|
begin = mdb_txn_begin env Nothing True
|
||||||
abort = mdb_txn_abort
|
abort = mdb_txn_abort
|
||||||
|
|
||||||
|
{-
|
||||||
|
A read-write transaction that commits upon sucessful completion and
|
||||||
|
aborts on exception.
|
||||||
|
|
||||||
|
Use this when reading data from already-opened databases.
|
||||||
|
-}
|
||||||
writeTxn :: Env -> Acquire Txn
|
writeTxn :: Env -> Acquire Txn
|
||||||
writeTxn env = mkAcquireType begin finalize
|
writeTxn env = mkAcquireType begin finalize
|
||||||
where
|
where
|
||||||
|
@ -447,21 +447,20 @@ collectFX :: Serf -> Log.EventLog -> IO ()
|
|||||||
collectFX serf log = do
|
collectFX serf log = do
|
||||||
ss <- handshake serf (Log.identity log)
|
ss <- handshake serf (Log.identity log)
|
||||||
|
|
||||||
let pax = "/home/benjamin/testnet-zod-fx"
|
|
||||||
|
|
||||||
createDirectoryIfMissing True pax
|
|
||||||
|
|
||||||
runConduit $ Log.streamEvents log (ssNextEv ss)
|
runConduit $ Log.streamEvents log (ssNextEv ss)
|
||||||
.| toJobs (Log.identity log) (ssNextEv ss)
|
.| toJobs (Log.identity log) (ssNextEv ss)
|
||||||
.| doCollectFX serf ss
|
.| doCollectFX serf ss
|
||||||
.| persistFX pax
|
.| persistFX log
|
||||||
|
|
||||||
persistFX :: FilePath -> ConduitT (EventId, FX) Void IO ()
|
persistFX :: Log.EventLog -> ConduitT (EventId, FX) Void IO ()
|
||||||
persistFX pax = await >>= \case
|
persistFX log = loop
|
||||||
Nothing -> pure ()
|
where
|
||||||
Just (eId, fx) -> do
|
loop = await >>= \case
|
||||||
writeFile (pax <> "/" <> show eId) (jamBS $ toNoun fx)
|
Nothing -> pure ()
|
||||||
persistFX pax
|
Just (eId, fx) -> do
|
||||||
|
liftIO $ Log.writeEffectsRow log eId (jamBS $ toNoun fx)
|
||||||
|
putStr "."
|
||||||
|
loop
|
||||||
|
|
||||||
doCollectFX :: Serf -> SerfState -> ConduitT Job (EventId, FX) IO ()
|
doCollectFX :: Serf -> SerfState -> ConduitT Job (EventId, FX) IO ()
|
||||||
doCollectFX serf = go
|
doCollectFX serf = go
|
||||||
@ -470,7 +469,7 @@ doCollectFX serf = go
|
|||||||
go ss = await >>= \case
|
go ss = await >>= \case
|
||||||
Nothing -> pure ()
|
Nothing -> pure ()
|
||||||
Just jb -> do
|
Just jb -> do
|
||||||
jb <- pure $ replaceMug jb (ssLastMug ss)
|
-- jb <- pure $ replaceMug jb (ssLastMug ss)
|
||||||
(_, ss, fx) <- liftIO (doJob serf jb)
|
(_, ss, fx) <- liftIO (doJob serf jb)
|
||||||
liftIO $ print (jobId jb)
|
liftIO $ print (jobId jb)
|
||||||
yield (jobId jb, fx)
|
yield (jobId jb, fx)
|
||||||
|
Loading…
Reference in New Issue
Block a user