From 3ff5c4fad540a105d53f9ee733706a8358b83f1c Mon Sep 17 00:00:00 2001 From: Benjamin Summers Date: Tue, 20 Aug 2019 17:42:53 -0700 Subject: [PATCH] Implement `king bug collect-all-fx` For now, this is mostly useful as an integration test: Replay the whole event log of an existing ship using King Haskell. This also opens the door for a future tool that inspects collected effects for debugging purposes. --- pkg/king/app/CLI.hs | 39 +++++++++++++++++++++++++++-------- pkg/king/app/Main.hs | 43 +++++++++++++++++++++++---------------- pkg/king/lib/Vere/Log.hs | 23 ++++++++++++++++++--- pkg/king/lib/Vere/Serf.hs | 23 ++++++++++----------- 4 files changed, 88 insertions(+), 40 deletions(-) diff --git a/pkg/king/app/CLI.hs b/pkg/king/app/CLI.hs index da94e6a6ef..4e238bc84d 100644 --- a/pkg/king/app/CLI.hs +++ b/pkg/king/app/CLI.hs @@ -1,7 +1,7 @@ {-# OPTIONS_GHC -Werror -Wall #-} {-# LANGUAGE CPP #-} -module CLI (parseArgs, Cmd(..), New(..), Run(..), Opts(..)) where +module CLI (parseArgs, Cmd(..), New(..), Run(..), Bug(..), Opts(..)) where import ClassyPrelude import Options.Applicative @@ -40,10 +40,15 @@ data Run = Run } deriving (Show) +data Bug + = ValidatePill FilePath + | CollectAllFX FilePath + deriving (Show) + data Cmd = CmdNew New Opts | CmdRun Run Opts - | CmdVal FilePath -- Validate Pill + | CmdBug Bug deriving (Show) -------------------------------------------------------------------------------- @@ -177,16 +182,36 @@ opts = do pure (Opts{..}) +newShip :: Parser Cmd +newShip = CmdNew <$> new <*> opts + runShip :: Parser Cmd runShip = do rPierPath <- strArgument (metavar "PIER" <> help "Path to pier") o <- opts pure (CmdRun (Run{..}) o) -valPill :: Parser Cmd +valPill :: Parser Bug valPill = do pillPath <- strArgument (metavar "PILL" <> help "Path to pill") - pure (CmdVal pillPath) + pure (ValidatePill pillPath) + +bugCmd :: Parser Cmd +bugCmd = fmap CmdBug + $ subparser + $ command "validate-pill" + ( info (valPill <**> helper) + $ progDesc "Validate a pill file." + ) + <> command "collect-all-fx" + ( info (allFx <**> helper) + $ progDesc "Replay entire event log, collecting all effects" + ) + +allFx :: Parser Bug +allFx = do + pier <- strArgument (metavar "PIER" <> help "Path to pier") + pure (CollectAllFX pier) cmd :: Parser Cmd cmd = subparser @@ -196,8 +221,6 @@ cmd = subparser <> command "run" ( info (runShip <**> helper) $ progDesc "Run an existing ship." ) - <> command "val" ( info (valPill <**> helper) - $ progDesc "Validate a pill file." + <> command "bug" ( info (bugCmd <**> helper) + $ progDesc "Run a debugging sub-command." ) - where - newShip = CmdNew <$> new <*> opts diff --git a/pkg/king/app/Main.hs b/pkg/king/app/Main.hs index 662bebfc05..0a426de02f 100644 --- a/pkg/king/app/Main.hs +++ b/pkg/king/app/Main.hs @@ -163,7 +163,7 @@ wipeSnapshot shipPath = do tryBootFromPill :: FilePath -> FilePath -> Ship -> IO () tryBootFromPill pillPath shipPath ship = do wipeSnapshot shipPath - with (Pier.booted pillPath shipPath serfFlags ship) $ \(serf, log, ss) -> do + with (Pier.booted pillPath shipPath [] ship) $ \(serf, log, ss) -> do print "lul" print ss threadDelay 500000 @@ -176,13 +176,13 @@ tryPlayShip :: FilePath -> IO () tryPlayShip shipPath = do runAcquire $ do putStrLn "RESUMING SHIP" - sls <- Pier.resumed shipPath serfFlags + sls <- Pier.resumed shipPath [] putStrLn "SHIP RESUMED" Pier.pier shipPath Nothing sls tryResume :: FilePath -> IO () tryResume shipPath = do - with (Pier.resumed shipPath serfFlags) $ \(serf, log, ss) -> do + with (Pier.resumed shipPath []) $ \(serf, log, ss) -> do print ss threadDelay 500000 shutdown serf 0 >>= print @@ -240,20 +240,28 @@ tryParseEvents dir first = do -------------------------------------------------------------------------------- -serfFlags :: Serf.Flags -serfFlags = [Serf.Hashless, Serf.DryRun] -- [Serf.Verbose, Serf.Trace] - -collectedFX :: FilePath -> Acquire () -collectedFX top = do - log <- Log.existing (top <> "/.urb/log") - serf <- Serf.run (Serf.Config top serfFlags) - liftIO (Serf.collectFX serf log) - +{- + This runs the serf at `$top/.tmpdir`, but we disable snapshots, + so this should never actually be created. We just do this to avoid + letting the serf use an existing snapshot. +-} collectAllFx :: FilePath -> IO () collectAllFx top = do - wipeSnapshot top - with (collectedFX top) $ \() -> + putStrLn (pack top) + with collectedFX $ \() -> putStrLn "[collectAllFx] Done collecting effects!" + where + tmpDir :: FilePath + tmpDir = top <> "/.tmpdir" + + collectedFX :: Acquire () + collectedFX = do + log <- Log.existing (top <> "/.urb/log") + serf <- Serf.run (Serf.Config tmpDir serfFlags) + liftIO (Serf.collectFX serf log) + + serfFlags :: Serf.Flags + serfFlags = [Serf.Hashless, Serf.DryRun] -------------------------------------------------------------------------------- @@ -288,9 +296,10 @@ runShip (CLI.Run pierPath) _ = tryPlayShip pierPath main :: IO () main = CLI.parseArgs >>= \case - CLI.CmdRun r o -> runShip r o - CLI.CmdNew n o -> newShip n o - CLI.CmdVal pil -> validatePill pil + CLI.CmdRun r o -> runShip r o + CLI.CmdNew n o -> newShip n o + CLI.CmdBug (CLI.CollectAllFX pax) -> collectAllFx pax + CLI.CmdBug (CLI.ValidatePill pax) -> print ("validate-pill", pax) validatePill :: FilePath -> IO () validatePill = const (pure ()) diff --git a/pkg/king/lib/Vere/Log.hs b/pkg/king/lib/Vere/Log.hs index 6ce8a58881..b3cecb223d 100644 --- a/pkg/king/lib/Vere/Log.hs +++ b/pkg/king/lib/Vere/Log.hs @@ -65,6 +65,7 @@ instance Exception EventLogExn where rawOpen :: FilePath -> IO Env rawOpen dir = do + putStrLn $ pack ("PAX: " <> dir) env <- mdb_env_create mdb_env_set_maxdbs env 3 mdb_env_set_mapsize env (40 * 1024 * 1024 * 1024) @@ -94,7 +95,7 @@ open dir = do EventLog env m e f id <$> newIORef numEvs where openTables env = - with (openTxn env) $ \txn -> + with (writeTxn env) $ \txn -> (,,) <$> mdb_dbi_open txn (Just "META") [] <*> mdb_dbi_open txn (Just "EVENTS") [MDB_INTEGERKEY] <*> mdb_dbi_open txn (Just "EFFECTS") [MDB_CREATE, MDB_INTEGERKEY] @@ -119,18 +120,34 @@ new dir id = mkAcquire (create dir id) close -- Read/Write Log Identity ----------------------------------------------------- -openTxn :: Env -> Acquire Txn -openTxn env = mkAcquire begin commit +{- + A read-only transaction that commits at the end. + + Use this when opening database handles. +-} +_openTxn :: Env -> Acquire Txn +_openTxn env = mkAcquire begin commit where begin = mdb_txn_begin env Nothing True commit = mdb_txn_commit +{- + A read-only transaction that aborts at the end. + + Use this when reading data from already-opened databases. +-} readTxn :: Env -> Acquire Txn readTxn env = mkAcquire begin abort where begin = mdb_txn_begin env Nothing True abort = mdb_txn_abort +{- + A read-write transaction that commits upon sucessful completion and + aborts on exception. + + Use this when reading data from already-opened databases. +-} writeTxn :: Env -> Acquire Txn writeTxn env = mkAcquireType begin finalize where diff --git a/pkg/king/lib/Vere/Serf.hs b/pkg/king/lib/Vere/Serf.hs index fd1b0d22ce..95055114d7 100644 --- a/pkg/king/lib/Vere/Serf.hs +++ b/pkg/king/lib/Vere/Serf.hs @@ -447,21 +447,20 @@ collectFX :: Serf -> Log.EventLog -> IO () collectFX serf log = do ss <- handshake serf (Log.identity log) - let pax = "/home/benjamin/testnet-zod-fx" - - createDirectoryIfMissing True pax - runConduit $ Log.streamEvents log (ssNextEv ss) .| toJobs (Log.identity log) (ssNextEv ss) .| doCollectFX serf ss - .| persistFX pax + .| persistFX log -persistFX :: FilePath -> ConduitT (EventId, FX) Void IO () -persistFX pax = await >>= \case - Nothing -> pure () - Just (eId, fx) -> do - writeFile (pax <> "/" <> show eId) (jamBS $ toNoun fx) - persistFX pax +persistFX :: Log.EventLog -> ConduitT (EventId, FX) Void IO () +persistFX log = loop + where + loop = await >>= \case + Nothing -> pure () + Just (eId, fx) -> do + liftIO $ Log.writeEffectsRow log eId (jamBS $ toNoun fx) + putStr "." + loop doCollectFX :: Serf -> SerfState -> ConduitT Job (EventId, FX) IO () doCollectFX serf = go @@ -470,7 +469,7 @@ doCollectFX serf = go go ss = await >>= \case Nothing -> pure () Just jb -> do - jb <- pure $ replaceMug jb (ssLastMug ss) + -- jb <- pure $ replaceMug jb (ssLastMug ss) (_, ss, fx) <- liftIO (doJob serf jb) liftIO $ print (jobId jb) yield (jobId jb, fx)