From 360d1663448a29f6af7137da32de3ff968dc9fb6 Mon Sep 17 00:00:00 2001 From: Benjamin Summers Date: Wed, 28 Aug 2019 04:00:26 -0700 Subject: [PATCH 1/5] Started RIO's logging system instead of printfs. --- pkg/king/app/Main.hs | 273 +++++++++++++++++++++------------- pkg/king/lib/Data/RAcquire.hs | 136 +++++++++++++++++ pkg/king/lib/Vere/Log.hs | 10 +- pkg/king/package.yaml | 4 +- 4 files changed, 318 insertions(+), 105 deletions(-) create mode 100644 pkg/king/lib/Data/RAcquire.hs diff --git a/pkg/king/app/Main.hs b/pkg/king/app/Main.hs index e4125f9b4..64cdecc42 100644 --- a/pkg/king/app/Main.hs +++ b/pkg/king/app/Main.hs @@ -105,10 +105,9 @@ module Main where -import ClassyPrelude +import UrbitPrelude -import Options.Applicative -import Options.Applicative.Help.Pretty +import Data.RAcquire import Arvo import Control.Exception hiding (evaluate, throwIO) @@ -120,7 +119,7 @@ import Vere.Pier import Vere.Pier.Types import Vere.Serf -import Control.Concurrent (runInBoundThread, threadDelay) +import Control.Concurrent (runInBoundThread) import Control.Lens ((&)) import System.Directory (doesFileExist, removeFile) import System.Environment (getProgName) @@ -133,6 +132,80 @@ import qualified Vere.Log as Log import qualified Vere.Pier as Pier import qualified Vere.Serf as Serf +import RIO (RIO, runRIO) +import RIO (Utf8Builder, display, displayShow) +import RIO (threadDelay) + +import RIO ( HasLogFunc + , LogFunc + , logError + , logInfo + , logWarn + , logDebug + , logOther + , logFuncL + , logOptionsHandle + , withLogFunc + , setLogUseTime + , setLogUseLoc + ) + +-------------------------------------------------------------------------------- + +logTrace :: HasLogFunc e => Utf8Builder -> RIO e () +logTrace = logOther "trace" + +-------------------------------------------------------------------------------- + +class HasAppName env where + appNameL :: Lens' env Utf8Builder + +data App = App + { _appLogFunc :: !LogFunc + , _appName :: !Utf8Builder + } + +makeLenses ''App + +instance HasLogFunc App where + logFuncL = appLogFunc + +instance HasAppName App where + appNameL = appName + +runApp :: RIO App a -> IO a +runApp inner = do + logOptions <- logOptionsHandle stderr True + <&> setLogUseTime True + <&> setLogUseLoc False + + withLogFunc logOptions $ \logFunc -> do + let app = App { _appLogFunc = logFunc + , _appName = "Alice" + } + runRIO app inner + +-------------------------------------------------------------------------------- + +io :: MonadIO m => IO a -> m a +io = liftIO + +rio :: MonadRIO m => RIO e a -> m e a +rio = liftRIO + +-------------------------------------------------------------------------------- + +example :: IO () +example = runApp sayHello + +sayHello :: RIO App () +sayHello = do + name <- view $ to _appName + logDebug $ "Hello, " <> name + logInfo $ "Hello, " <> name + logWarn $ "Hello, " <> name + logError $ "Hello, " <> name + -------------------------------------------------------------------------------- zod :: Ship @@ -140,87 +213,97 @@ zod = 0 -------------------------------------------------------------------------------- -removeFileIfExists :: FilePath -> IO () +removeFileIfExists :: HasLogFunc env => FilePath -> RIO env () removeFileIfExists pax = do - exists <- doesFileExist pax + exists <- io $ doesFileExist pax when exists $ do - removeFile pax - -catchAny :: IO a -> (SomeException -> IO a) -> IO a -catchAny = Control.Exception.catch + io $ removeFile pax -------------------------------------------------------------------------------- -wipeSnapshot :: FilePath -> IO () +wipeSnapshot :: HasLogFunc env => FilePath -> RIO env () wipeSnapshot shipPath = do - putStrLn "wipeSnapshot" - removeFileIfExists (shipPath <> "/.urb/chk/north.bin") - removeFileIfExists (shipPath <> "/.urb/chk/south.bin") - print (shipPath <> "/.urb/chk/north.bin") - print (shipPath <> "/.urb/chk/south.bin") - putStrLn "SNAPSHOT WIPED" + logTrace "wipeSnapshot" + logDebug $ display $ pack @Text ("Wiping " <> north) + logDebug $ display $ pack @Text ("Wiping " <> south) + removeFileIfExists north + removeFileIfExists south + where + north = shipPath <> "/.urb/chk/north.bin" + south = shipPath <> "/.urb/chk/south.bin" -tryBootFromPill :: FilePath -> FilePath -> Ship -> IO () +-------------------------------------------------------------------------------- + +tryBootFromPill :: HasLogFunc e => FilePath -> FilePath -> Ship -> RIO e () tryBootFromPill pillPath shipPath ship = do wipeSnapshot shipPath with (Pier.booted pillPath shipPath [] ship) $ \(serf, log, ss) -> do - print "lul" - print ss - threadDelay 500000 - shutdown serf 0 >>= print - putStrLn "[tryBootFromPill] Booted!" + logTrace "Booting" + logTrace $ displayShow ss + io $ threadDelay 500000 + ss <- io $ shutdown serf 0 + logTrace $ displayShow ss + logTrace "Booted!" +runAcquire :: (MonadUnliftIO m, MonadIO m) + => Acquire a -> m a runAcquire act = with act pure -tryPlayShip :: FilePath -> IO () +runRAcquire :: (MonadUnliftIO (m e), MonadIO (m e), MonadReader e (m e)) + => RAcquire e a -> m e a +runRAcquire act = rwith act pure + +tryPlayShip :: HasLogFunc e => FilePath -> RIO e () tryPlayShip shipPath = do - runAcquire $ do - putStrLn "RESUMING SHIP" - sls <- Pier.resumed shipPath [] - putStrLn "SHIP RESUMED" - Pier.pier shipPath Nothing sls + runRAcquire $ do + liftRIO $ logTrace "RESUMING SHIP" + sls <- liftAcquire $ Pier.resumed shipPath [] + liftRIO $ logTrace "SHIP RESUMED" + liftAcquire $ Pier.pier shipPath Nothing sls -tryResume :: FilePath -> IO () +tryResume :: HasLogFunc e => FilePath -> RIO e () tryResume shipPath = do - with (Pier.resumed shipPath []) $ \(serf, log, ss) -> do - print ss + rwith (liftAcquire $ Pier.resumed shipPath []) $ \(serf, log, ss) -> do + logTrace (displayShow ss) threadDelay 500000 - shutdown serf 0 >>= print - putStrLn "[tryResume] Resumed!" + ss <- io (shutdown serf 0) + logTrace (displayShow ss) + logTrace "Resumed!" -tryFullReplay :: FilePath -> IO () +tryFullReplay :: HasLogFunc e => FilePath -> RIO e () tryFullReplay shipPath = do wipeSnapshot shipPath tryResume shipPath -------------------------------------------------------------------------------- -checkEvs :: FilePath -> Word64 -> Word64 -> IO () +checkEvs :: forall e. HasLogFunc e => FilePath -> Word64 -> Word64 -> RIO e () checkEvs pierPath first last = do - with (Log.existing logPath) $ \log -> do + rwith (liftAcquire $ Log.existing logPath) $ \log -> do let ident = Log.identity log - print ident + logTrace (displayShow ident) runConduit $ Log.streamEvents log first .| showEvents first (fromIntegral $ lifecycleLen ident) where logPath :: FilePath logPath = pierPath <> "/.urb/log" - showEvents :: EventId -> EventId -> ConduitT ByteString Void IO () + showEvents :: EventId -> EventId -> ConduitT ByteString Void (RIO e) () showEvents eId _ | eId > last = pure () showEvents eId cycle = await >>= \case - Nothing -> print "Everything checks out." + Nothing -> lift $ logTrace "Everything checks out." Just bs -> do - liftIO $ do - n <- cueBSExn bs + lift $ do + n <- io $ cueBSExn bs when (eId > cycle) $ do (mug, wen, evNoun) <- unpackJob n - fromNounErr evNoun & either print pure + fromNounErr evNoun & + either (logError . displayShow) pure showEvents (succ eId) cycle - unpackJob :: Noun -> IO (Mug, Wen, Noun) - unpackJob n = fromNounExn n + unpackJob :: Noun -> RIO e (Mug, Wen, Noun) + unpackJob = io . fromNounExn -------------------------------------------------------------------------------- @@ -229,19 +312,19 @@ checkEvs pierPath first last = do so this should never actually be created. We just do this to avoid letting the serf use an existing snapshot. -} -collectAllFx :: FilePath -> IO () +collectAllFx :: ∀e. HasLogFunc e => FilePath -> RIO e () collectAllFx top = do - putStrLn (pack top) - with collectedFX $ \() -> - putStrLn "[collectAllFx] Done collecting effects!" + logTrace $ display $ pack @Text top + rwith collectedFX $ \() -> + logTrace "Done collecting effects!" where tmpDir :: FilePath tmpDir = top <> "/.tmpdir" - collectedFX :: Acquire () + collectedFX :: RAcquire e () collectedFX = do - log <- Log.existing (top <> "/.urb/log") - serf <- Serf.run (Serf.Config tmpDir serfFlags) + log <- liftAcquire $ Log.existing (top <> "/.urb/log") + serf <- liftAcquire $ Serf.run (Serf.Config tmpDir serfFlags) liftIO (Serf.collectFX serf log) serfFlags :: Serf.Flags @@ -249,35 +332,37 @@ collectAllFx top = do -------------------------------------------------------------------------------- -tryDoStuff :: FilePath -> IO () -tryDoStuff shipPath = runInBoundThread $ do - let pillPath = "/home/benjamin/r/urbit/bin/solid.pill" - ship = zod +tryDoStuff :: HasLogFunc e => FilePath -> RIO e () +tryDoStuff shipPath = do + env <- ask + liftIO $ runInBoundThread $ runRIO env $ do + let pillPath = "/home/benjamin/r/urbit/bin/solid.pill" + ship = zod - -- tryResume shipPath - tryPlayShip shipPath - -- tryFullReplay shipPath + -- tryResume shipPath + tryPlayShip shipPath + -- tryFullReplay shipPath - pure () + pure () -------------------------------------------------------------------------------- {- Interesting -} -testPill :: FilePath -> Bool -> Bool -> IO () +testPill :: HasLogFunc e => FilePath -> Bool -> Bool -> RIO e () testPill pax showPil showSeq = do putStrLn "Reading pill file." pillBytes <- readFile pax putStrLn "Cueing pill file." - pillNoun <- cueBS pillBytes & either throwIO pure + pillNoun <- io $ cueBS pillBytes & either throwIO pure putStrLn "Parsing pill file." - pill <- fromNounErr pillNoun & either (throwIO . uncurry ParseErr) pure + pill <- io $ fromNounErr pillNoun & either (throwIO . uncurry ParseErr) pure putStrLn "Using pill to generate boot sequence." - bootSeq <- generateBootSeq zod pill + bootSeq <- io $ generateBootSeq zod pill putStrLn "Validate jam/cue and toNoun/fromNoun on pill value" reJam <- validateNounVal pill @@ -289,13 +374,14 @@ testPill pax showPil showSeq = do when showPil $ do putStrLn "\n\n== Pill ==\n" - pPrint pill + io $ pPrint pill when showSeq $ do putStrLn "\n\n== Boot Sequence ==\n" - pPrint bootSeq + io $ pPrint bootSeq -validateNounVal :: (Eq a, ToNoun a, FromNoun a) => a -> IO ByteString +validateNounVal :: (HasLogFunc e, Eq a, ToNoun a, FromNoun a) + => a -> RIO e ByteString validateNounVal inpVal = do putStrLn " jam" inpByt <- evaluate $ jamBS $ toNoun inpVal @@ -324,41 +410,37 @@ validateNounVal inpVal = do -------------------------------------------------------------------------------- -newShip :: CLI.New -> CLI.Opts -> IO () +newShip :: HasLogFunc e => CLI.New -> CLI.Opts -> RIO e () newShip CLI.New{..} _ = do tryBootFromPill nPillPath pierPath (Ship 0) where pierPath = fromMaybe ("./" <> unpack nShipAddr) nPierPath -runShip :: CLI.Run -> CLI.Opts -> IO () +runShip :: HasLogFunc e => CLI.Run -> CLI.Opts -> RIO e () runShip (CLI.Run pierPath) _ = tryPlayShip pierPath main :: IO () main = CLI.parseArgs >>= \case - CLI.CmdRun r o -> runShip r o - CLI.CmdNew n o -> newShip n o - CLI.CmdBug (CLI.CollectAllFX pax) -> collectAllFx pax - CLI.CmdBug (CLI.ValidatePill pax pil seq) -> testPill pax pil seq - CLI.CmdBug (CLI.ValidateEvents pax f l) -> checkEvs pax f l - CLI.CmdBug (CLI.ValidateFX pax f l) -> checkFx pax f l - - -- tryParseFX "/home/benjamin/zod-fx" 1 100000000 - -- tryParseFX "/home/benjamin/testnet-zod-fx" 1 100000000 - -validatePill :: FilePath -> IO () -validatePill = const (pure ()) + CLI.CmdRun r o -> runApp $ runShip r o + CLI.CmdNew n o -> runApp $ newShip n o + CLI.CmdBug (CLI.CollectAllFX pax) -> runApp $ collectAllFx pax + CLI.CmdBug (CLI.ValidatePill pax pil seq) -> runApp $ testPill pax pil seq + CLI.CmdBug (CLI.ValidateEvents pax f l) -> runApp $ checkEvs pax f l + CLI.CmdBug (CLI.ValidateFX pax f l) -> runApp $ checkFx pax f l -------------------------------------------------------------------------------- -checkFx :: FilePath -> Word64 -> Word64 -> IO () +checkFx :: HasLogFunc e + => FilePath -> Word64 -> Word64 -> RIO e () checkFx pierPath first last = - with (Log.existing logPath) $ \log -> + rwith (liftAcquire $ Log.existing logPath) $ \log -> runConduit $ streamFX log first last .| tryParseFXStream where logPath = pierPath <> "/.urb/log" -streamFX :: Log.EventLog -> Word64 -> Word64 -> ConduitT () ByteString IO () +streamFX :: MonadIO m + => Log.EventLog -> Word64 -> Word64 -> ConduitT () ByteString m () streamFX log first last = do Log.streamEffectsRows log first .| loop where @@ -366,24 +448,15 @@ streamFX log first last = do Just (eId, bs) | eId > last -> pure () Just (eId, bs) -> yield bs >> loop -tryParseFXStream :: ConduitT ByteString Void IO () -tryParseFXStream = loop 0 (mempty :: Set (Text, Noun)) +tryParseFXStream :: HasLogFunc e => ConduitT ByteString Void (RIO e) () +tryParseFXStream = loop where - loop 1 pax = for_ (setToList pax) print - loop errors pax = - await >>= \case - Nothing -> for_ (setToList pax) $ \(t,n) -> - putStrLn (t <> ": " <> tshow n) + loop = await >>= \case + Nothing -> pure () Just bs -> do - n <- liftIO (cueBSExn bs) - fromNounErr n & \case - Left err -> print err >> loop (errors + 1) pax - Right [] -> loop errors pax - Right (fx :: FX) -> do - -- pax <- pure $ Set.union pax - -- $ setFromList - -- $ fx <&> \(Effect p v) -> (getTag v, toNoun p) - loop errors pax + n <- liftIO (cueBSExn bs) + fromNounErr n & either (logError . displayShow) pure + loop {- diff --git a/pkg/king/lib/Data/RAcquire.hs b/pkg/king/lib/Data/RAcquire.hs new file mode 100644 index 000000000..a0ea66c2d --- /dev/null +++ b/pkg/king/lib/Data/RAcquire.hs @@ -0,0 +1,136 @@ +module Data.RAcquire where +{- + ( RAcquire (..) + , Allocated (..) + , with + , mkRAcquire + , ReleaseType (..) + , mkRAcquireType + ) where +-} + +import Prelude + +import qualified Control.Exception as E +import qualified Control.Monad.Catch as C () +import qualified Data.Acquire.Internal as Act + +import Control.Applicative (Applicative(..)) +import Control.Monad (ap, liftM) +import Control.Monad.IO.Unlift (MonadIO(..), MonadUnliftIO, withRunInIO) +import Data.Typeable (Typeable) +import Control.Monad.Reader + +import RIO (RIO, runRIO) + +-------------------------------------------------------------------------------- + +data ReleaseType + = ReleaseEarly + | ReleaseNormal + | ReleaseException + deriving (Show, Read, Eq, Ord, Enum, Bounded, Typeable) + +data Allocated e a + = Allocated !a !(ReleaseType -> RIO e ()) + +newtype RAcquire e a + = RAcquire ((forall b. RIO e b -> RIO e b) -> RIO e (Allocated e a)) + deriving Typeable + +-------------------------------------------------------------------------------- + +class MonadRIO m where + liftRIO :: RIO e a -> m e a + +instance MonadRIO RIO where + liftRIO = id + +class MonadAcquire m where + liftAcquire :: Act.Acquire a -> m a + +-------------------------------------------------------------------------------- + +instance Functor (RAcquire e) where + fmap = liftM + +instance Applicative (RAcquire e) where + pure a = RAcquire (\_ -> return (Allocated a (const $ return ()))) + (<*>) = ap + +instance Monad (RAcquire e) where + return = pure + RAcquire f >>= g' = RAcquire $ \restore -> do + env <- ask + Allocated x free1 <- f restore + let RAcquire g = g' x + Allocated y free2 <- liftIO $ E.onException + (runRIO env $ g restore) + (runRIO env $ free1 ReleaseException) + + return $! Allocated y $ \rt -> + liftIO $ E.finally (runRIO env $ free2 rt) + (runRIO env $ free1 rt) + +instance MonadReader e (RAcquire e) where + ask = liftRIO ask + local mod (RAcquire f) = RAcquire $ \restore -> local mod (f restore) + +-------------------------------------------------------------------------------- + +instance MonadRIO RAcquire where + liftRIO f = RAcquire $ \restore -> do + x <- restore f + return $! Allocated x (const $ return ()) + +instance MonadIO (RAcquire e) where + liftIO = liftRIO . liftIO + +unTransRIO :: e -> (RIO e a -> RIO e a) -> IO a -> IO a +unTransRIO env trans act = runRIO env $ trans $ liftIO act + +instance MonadAcquire (RAcquire e) where + liftAcquire (Act.Acquire f) = do + env <- liftRIO ask + RAcquire $ \restore -> do + fmap fixAllo $ liftIO $ f $ unTransRIO env restore + where + fixAllo (Act.Allocated x y) = Allocated x $ fmap liftIO (y . fixTy) + + fixTy = \case + ReleaseEarly -> Act.ReleaseEarly + ReleaseNormal -> Act.ReleaseNormal + ReleaseException -> Act.ReleaseException + +-------------------------------------------------------------------------------- + +mkRAcquire :: RIO e a + -> (a -> RIO e ()) + -> RAcquire e a +mkRAcquire create free = RAcquire $ \restore -> do + x <- restore create + return $! Allocated x (const $ free x) + +mkRAcquireType + :: RIO e a -- ^ acquire the resource + -> (a -> ReleaseType -> RIO e ()) -- ^ free the resource + -> RAcquire e a +mkRAcquireType create free = RAcquire $ \restore -> do + x <- restore create + return $! Allocated x (free x) + +transRIO :: e -> (IO a -> IO a) -> RIO e a -> RIO e a +transRIO env trans act = liftIO $ trans $ runRIO env act + +rwith :: (MonadUnliftIO (m e), MonadReader e (m e)) + => RAcquire e a + -> (a -> m e b) + -> m e b +rwith (RAcquire f) g = do + env <- ask + withRunInIO $ \run -> E.mask $ \restore -> do + Allocated x free <- runRIO env $ f $ transRIO env restore + res <- E.onException (restore $ run $ g x) + (runRIO env $ free ReleaseException) + runRIO env $ free ReleaseNormal + return res diff --git a/pkg/king/lib/Vere/Log.hs b/pkg/king/lib/Vere/Log.hs index 01261ece5..35c2a684e 100644 --- a/pkg/king/lib/Vere/Log.hs +++ b/pkg/king/lib/Vere/Log.hs @@ -244,8 +244,9 @@ writeEffectsRow log k v = do -------------------------------------------------------------------------------- -- Read Events ----------------------------------------------------------------- -streamEvents :: EventLog -> Word64 - -> ConduitT () ByteString IO () +streamEvents :: MonadIO m + => EventLog -> Word64 + -> ConduitT () ByteString m () streamEvents log first = do last <- liftIO $ lastEv log batch <- liftIO (readBatch log first) @@ -253,8 +254,9 @@ streamEvents log first = do for_ batch yield streamEvents log (first + word (length batch)) -streamEffectsRows :: EventLog -> EventId - -> ConduitT () (Word64, ByteString) IO () +streamEffectsRows :: MonadIO m + => EventLog -> EventId + -> ConduitT () (Word64, ByteString) m () streamEffectsRows log = go where go next = do diff --git a/pkg/king/package.yaml b/pkg/king/package.yaml index d229ae1de..d87f9ef47 100644 --- a/pkg/king/package.yaml +++ b/pkg/king/package.yaml @@ -34,6 +34,7 @@ dependencies: - data-fix - directory - entropy + - exceptions - extra - fixed-vector - flat @@ -51,6 +52,7 @@ dependencies: - mtl - multimap - network + - optparse-applicative - para - pretty-show - primitive @@ -76,13 +78,13 @@ dependencies: - time - transformers - unix + - unliftio-core - unordered-containers - vector - wai - wai-conduit - warp - warp-tls - - optparse-applicative default-extensions: - ApplicativeDo From 524671d310164930bbf71f80556be04a3ca80df4 Mon Sep 17 00:00:00 2001 From: Benjamin Summers Date: Wed, 28 Aug 2019 04:06:48 -0700 Subject: [PATCH 2/5] Minor cleanup. --- pkg/king/app/Main.hs | 46 ++++++++++++-------------------------------- 1 file changed, 12 insertions(+), 34 deletions(-) diff --git a/pkg/king/app/Main.hs b/pkg/king/app/Main.hs index 64cdecc42..b282c9703 100644 --- a/pkg/king/app/Main.hs +++ b/pkg/king/app/Main.hs @@ -103,7 +103,7 @@ - `Trace`: TODO What does this do? -} -module Main where +module Main (main) where import UrbitPrelude @@ -161,9 +161,9 @@ class HasAppName env where appNameL :: Lens' env Utf8Builder data App = App - { _appLogFunc :: !LogFunc - , _appName :: !Utf8Builder - } + { _appLogFunc :: !LogFunc + , _appName :: !Utf8Builder + } makeLenses ''App @@ -200,7 +200,7 @@ example = runApp sayHello sayHello :: RIO App () sayHello = do - name <- view $ to _appName + name <- view appName logDebug $ "Hello, " <> name logInfo $ "Hello, " <> name logWarn $ "Hello, " <> name @@ -332,21 +332,6 @@ collectAllFx top = do -------------------------------------------------------------------------------- -tryDoStuff :: HasLogFunc e => FilePath -> RIO e () -tryDoStuff shipPath = do - env <- ask - liftIO $ runInBoundThread $ runRIO env $ do - let pillPath = "/home/benjamin/r/urbit/bin/solid.pill" - ship = zod - - -- tryResume shipPath - tryPlayShip shipPath - -- tryFullReplay shipPath - - pure () - --------------------------------------------------------------------------------- - {- Interesting -} @@ -420,13 +405,13 @@ runShip :: HasLogFunc e => CLI.Run -> CLI.Opts -> RIO e () runShip (CLI.Run pierPath) _ = tryPlayShip pierPath main :: IO () -main = CLI.parseArgs >>= \case - CLI.CmdRun r o -> runApp $ runShip r o - CLI.CmdNew n o -> runApp $ newShip n o - CLI.CmdBug (CLI.CollectAllFX pax) -> runApp $ collectAllFx pax - CLI.CmdBug (CLI.ValidatePill pax pil seq) -> runApp $ testPill pax pil seq - CLI.CmdBug (CLI.ValidateEvents pax f l) -> runApp $ checkEvs pax f l - CLI.CmdBug (CLI.ValidateFX pax f l) -> runApp $ checkFx pax f l +main = CLI.parseArgs >>= runApp . \case + CLI.CmdRun r o -> runShip r o + CLI.CmdNew n o -> newShip n o + CLI.CmdBug (CLI.CollectAllFX pax) -> collectAllFx pax + CLI.CmdBug (CLI.ValidatePill pax pil seq) -> testPill pax pil seq + CLI.CmdBug (CLI.ValidateEvents pax f l) -> checkEvs pax f l + CLI.CmdBug (CLI.ValidateFX pax f l) -> checkFx pax f l -------------------------------------------------------------------------------- @@ -460,13 +445,6 @@ tryParseFXStream = loop {- -getTag :: Effect -> Text -getTag fx = - let n = toNoun fx - in case n of - A _ -> maybe "ERR" unCord (fromNoun n) - C h _ -> maybe "ERR" unCord (fromNoun h) - tryCopyLog :: IO () tryCopyLog = do let logPath = "/Users/erg/src/urbit/zod/.urb/falselog/" From c1bb26e628cf62bca61c89d0e686a1d686458011 Mon Sep 17 00:00:00 2001 From: Benjamin Summers Date: Wed, 28 Aug 2019 04:45:49 -0700 Subject: [PATCH 3/5] Logging in Vere.Pier --- pkg/king/app/Main.hs | 43 ++------- pkg/king/lib/Noun/Convert.hs | 2 +- pkg/king/lib/Noun/Cue.hs | 4 +- pkg/king/lib/UrbitPrelude.hs | 33 +++++++ pkg/king/lib/Vere/Ames.hs | 2 - pkg/king/lib/Vere/Log.hs | 4 +- pkg/king/lib/Vere/Pier.hs | 167 ++++++++++++++++++++--------------- pkg/king/lib/Vere/Serf.hs | 1 - pkg/king/package.yaml | 1 + 9 files changed, 140 insertions(+), 117 deletions(-) diff --git a/pkg/king/app/Main.hs b/pkg/king/app/Main.hs index b282c9703..380bfc87a 100644 --- a/pkg/king/app/Main.hs +++ b/pkg/king/app/Main.hs @@ -132,29 +132,6 @@ import qualified Vere.Log as Log import qualified Vere.Pier as Pier import qualified Vere.Serf as Serf -import RIO (RIO, runRIO) -import RIO (Utf8Builder, display, displayShow) -import RIO (threadDelay) - -import RIO ( HasLogFunc - , LogFunc - , logError - , logInfo - , logWarn - , logDebug - , logOther - , logFuncL - , logOptionsHandle - , withLogFunc - , setLogUseTime - , setLogUseLoc - ) - --------------------------------------------------------------------------------- - -logTrace :: HasLogFunc e => Utf8Builder -> RIO e () -logTrace = logOther "trace" - -------------------------------------------------------------------------------- class HasAppName env where @@ -187,14 +164,6 @@ runApp inner = do -------------------------------------------------------------------------------- -io :: MonadIO m => IO a -> m a -io = liftIO - -rio :: MonadRIO m => RIO e a -> m e a -rio = liftRIO - --------------------------------------------------------------------------------- - example :: IO () example = runApp sayHello @@ -237,7 +206,7 @@ wipeSnapshot shipPath = do tryBootFromPill :: HasLogFunc e => FilePath -> FilePath -> Ship -> RIO e () tryBootFromPill pillPath shipPath ship = do wipeSnapshot shipPath - with (Pier.booted pillPath shipPath [] ship) $ \(serf, log, ss) -> do + rwith (Pier.booted pillPath shipPath [] ship) $ \(serf, log, ss) -> do logTrace "Booting" logTrace $ displayShow ss io $ threadDelay 500000 @@ -256,10 +225,10 @@ runRAcquire act = rwith act pure tryPlayShip :: HasLogFunc e => FilePath -> RIO e () tryPlayShip shipPath = do runRAcquire $ do - liftRIO $ logTrace "RESUMING SHIP" + rio $ logTrace "RESUMING SHIP" sls <- liftAcquire $ Pier.resumed shipPath [] - liftRIO $ logTrace "SHIP RESUMED" - liftAcquire $ Pier.pier shipPath Nothing sls + rio $ logTrace "SHIP RESUMED" + Pier.pier shipPath Nothing sls tryResume :: HasLogFunc e => FilePath -> RIO e () tryResume shipPath = do @@ -344,10 +313,10 @@ testPill pax showPil showSeq = do pillNoun <- io $ cueBS pillBytes & either throwIO pure putStrLn "Parsing pill file." - pill <- io $ fromNounErr pillNoun & either (throwIO . uncurry ParseErr) pure + pill <- fromNounErr pillNoun & either (throwIO . uncurry ParseErr) pure putStrLn "Using pill to generate boot sequence." - bootSeq <- io $ generateBootSeq zod pill + bootSeq <- generateBootSeq zod pill putStrLn "Validate jam/cue and toNoun/fromNoun on pill value" reJam <- validateNounVal pill diff --git a/pkg/king/lib/Noun/Convert.hs b/pkg/king/lib/Noun/Convert.hs index a3a6d0be0..29d4a7976 100644 --- a/pkg/king/lib/Noun/Convert.hs +++ b/pkg/king/lib/Noun/Convert.hs @@ -178,7 +178,7 @@ instance Show BadNoun where instance Exception BadNoun where -fromNounExn :: FromNoun a => Noun -> IO a +fromNounExn :: MonadIO m => FromNoun a => Noun -> m a fromNounExn n = runParser (parseNoun n) [] onFail onSuccess where onFail p m = throwIO (BadNoun p m) diff --git a/pkg/king/lib/Noun/Cue.hs b/pkg/king/lib/Noun/Cue.hs index 2d6f746b1..4d60cd863 100644 --- a/pkg/king/lib/Noun/Cue.hs +++ b/pkg/king/lib/Noun/Cue.hs @@ -26,7 +26,7 @@ import qualified Data.Vector.Primitive as VP cueBS :: ByteString -> Either DecodeErr Noun cueBS = doGet dNoun -cueBSExn :: ByteString -> IO Noun +cueBSExn :: MonadIO m => ByteString -> m Noun cueBSExn bs = cueBS bs & \case Left e -> throwIO e @@ -35,7 +35,7 @@ cueBSExn bs = cue :: Atom -> Either DecodeErr Noun cue = cueBS . view atomBytes -cueExn :: Atom -> IO Noun +cueExn :: MonadIO m => Atom -> m Noun cueExn atm = cueBSExn (atm ^. atomBytes) diff --git a/pkg/king/lib/UrbitPrelude.hs b/pkg/king/lib/UrbitPrelude.hs index 401df5a64..3bb2a35be 100644 --- a/pkg/king/lib/UrbitPrelude.hs +++ b/pkg/king/lib/UrbitPrelude.hs @@ -3,10 +3,14 @@ module UrbitPrelude , module Control.Arrow , module Control.Lens , module Data.Acquire + , module Data.RAcquire , module Data.Void , module Noun , module Text.Show.Pretty , module Text.Printf + , module RIO + , io, rio + , logTrace ) where import ClassyPrelude @@ -16,7 +20,36 @@ import Control.Lens hiding (Index, cons, index, snoc, uncons, unsnoc, (<.>), (<|)) import Control.Arrow ((<<<), (>>>)) +import Data.RAcquire (RAcquire, mkRAcquire, rwith) +import Data.RAcquire (MonadRIO(..), MonadAcquire(..)) import Data.Acquire (Acquire, mkAcquire, with) import Data.Void (Void, absurd) import Text.Printf (printf) import Text.Show.Pretty (pPrint, ppShow) + +import RIO (RIO, runRIO) +import RIO (Utf8Builder, display, displayShow) +import RIO (threadDelay) + +import RIO ( HasLogFunc + , LogFunc + , logError + , logInfo + , logWarn + , logDebug + , logOther + , logFuncL + , logOptionsHandle + , withLogFunc + , setLogUseTime + , setLogUseLoc + ) + +io :: MonadIO m => IO a -> m a +io = liftIO + +rio :: MonadRIO m => RIO e a -> m e a +rio = liftRIO + +logTrace :: HasLogFunc e => Utf8Builder -> RIO e () +logTrace = logOther "trace" diff --git a/pkg/king/lib/Vere/Ames.hs b/pkg/king/lib/Vere/Ames.hs index 07d23eb36..fd3021333 100644 --- a/pkg/king/lib/Vere/Ames.hs +++ b/pkg/king/lib/Vere/Ames.hs @@ -7,8 +7,6 @@ import Network.Socket hiding (recvFrom, sendTo) import Network.Socket.ByteString import Vere.Pier.Types -import Control.Concurrent (threadDelay) - import qualified Urbit.Time as Time diff --git a/pkg/king/lib/Vere/Log.hs b/pkg/king/lib/Vere/Log.hs index 35c2a684e..e16be6aee 100644 --- a/pkg/king/lib/Vere/Log.hs +++ b/pkg/king/lib/Vere/Log.hs @@ -8,13 +8,13 @@ module Vere.Log ( EventLog, identity, nextEv , streamEffectsRows, writeEffectsRow ) where -import ClassyPrelude hiding (init) +import UrbitPrelude hiding (init) + import Data.Acquire import Data.Conduit import Database.LMDB.Raw import Foreign.Marshal.Alloc import Foreign.Ptr -import Noun import Vere.Pier.Types import Foreign.Storable (peek, poke, sizeOf) diff --git a/pkg/king/lib/Vere/Pier.hs b/pkg/king/lib/Vere/Pier.hs index 6c8a59b73..4d3b6590f 100644 --- a/pkg/king/lib/Vere/Pier.hs +++ b/pkg/king/lib/Vere/Pier.hs @@ -28,20 +28,20 @@ import qualified Vere.Serf as Serf _ioDrivers = [] :: [IODriver] -setupPierDirectory :: FilePath -> IO () +setupPierDirectory :: FilePath -> RIO e () setupPierDirectory shipPath = do for_ ["put", "get", "log", "chk"] $ \seg -> do - let pax = shipPath <> "/.urb/" <> seg - createDirectoryIfMissing True pax - setFileMode pax ownerModes + let pax = shipPath <> "/.urb/" <> seg + io $ createDirectoryIfMissing True pax + io $ setFileMode pax ownerModes -- Load pill into boot sequence. ----------------------------------------------- -genEntropy :: IO Word512 -genEntropy = fromIntegral . view (from atomBytes) <$> Ent.getEntropy 64 +genEntropy :: RIO e Word512 +genEntropy = fromIntegral . view (from atomBytes) <$> io (Ent.getEntropy 64) -generateBootSeq :: Ship -> Pill -> IO BootSeq +generateBootSeq :: Ship -> Pill -> RIO e BootSeq generateBootSeq ship Pill{..} = do ent <- genEntropy let ovums = preKern ent <> pKernelOvums <> pUserspaceOvums @@ -56,15 +56,16 @@ generateBootSeq ship Pill{..} = do -- Write a batch of jobs into the event log ------------------------------------ -writeJobs :: EventLog -> Vector Job -> IO () +writeJobs :: EventLog -> Vector Job -> RIO e () writeJobs log !jobs = do - expect <- Log.nextEv log + expect <- io $ Log.nextEv log events <- fmap fromList $ traverse fromJob (zip [expect..] $ toList jobs) - Log.appendEvents log events + io $ Log.appendEvents log events where - fromJob :: (EventId, Job) -> IO ByteString + fromJob :: (EventId, Job) -> RIO e ByteString fromJob (expectedId, job) = do - guard (expectedId == jobId job) + unless (expectedId == jobId job) $ + error $ show ("bad job id!", expectedId, jobId job) pure $ jamBS $ jobPayload job jobPayload :: Job -> Noun @@ -74,38 +75,39 @@ writeJobs log !jobs = do -- Boot a new ship. ------------------------------------------------------------ -booted :: FilePath -> FilePath -> Serf.Flags -> Ship - -> Acquire (Serf, EventLog, SerfState) +booted :: HasLogFunc e + => FilePath -> FilePath -> Serf.Flags -> Ship + -> RAcquire e (Serf, EventLog, SerfState) booted pillPath pierPath flags ship = do - putStrLn "LOADING PILL" + rio $ logTrace "LOADING PILL" - pill <- liftIO (loadFile pillPath >>= either throwIO pure) + pill <- io (loadFile pillPath >>= either throwIO pure) - putStrLn "PILL LOADED" + rio $ logTrace "PILL LOADED" - seq@(BootSeq ident x y) <- liftIO $ generateBootSeq ship pill + seq@(BootSeq ident x y) <- rio $ generateBootSeq ship pill - putStrLn "BootSeq Computed" + rio $ logTrace "BootSeq Computed" - liftIO (setupPierDirectory pierPath) + liftRIO (setupPierDirectory pierPath) - putStrLn "Directory Setup" + rio $ logTrace "Directory Setup" - log <- Log.new (pierPath <> "/.urb/log") ident + log <- liftAcquire $ Log.new (pierPath <> "/.urb/log") ident - putStrLn "Event Log Initialized" + rio $ logTrace "Event Log Initialized" - serf <- Serf.run (Serf.Config pierPath flags) + serf <- liftAcquire $ Serf.run (Serf.Config pierPath flags) - putStrLn "Serf Started" + rio $ logTrace "Serf Started" - liftIO $ do - (events, serfSt) <- Serf.bootFromSeq serf seq - putStrLn "Boot Sequence completed" - Serf.snapshot serf serfSt - putStrLn "Snapshot taken" + rio $ do + (events, serfSt) <- io $ Serf.bootFromSeq serf seq + logTrace "Boot Sequence completed" + io $ Serf.snapshot serf serfSt + logTrace "Snapshot taken" writeJobs log (fromList events) - putStrLn "Events written" + logTrace "Events written" pure (serf, log, serfSt) @@ -116,32 +118,33 @@ resumed :: FilePath -> Serf.Flags resumed top flags = do log <- Log.existing (top <> "/.urb/log") serf <- Serf.run (Serf.Config top flags) - serfSt <- liftIO (Serf.replay serf log) + serfSt <- io (Serf.replay serf log) - liftIO (Serf.snapshot serf serfSt) + io (Serf.snapshot serf serfSt) pure (serf, log, serfSt) -- Run Pier -------------------------------------------------------------------- -pier :: FilePath +pier :: ∀e. HasLogFunc e + => FilePath -> Maybe Port -> (Serf, EventLog, SerfState) - -> Acquire () + -> RAcquire e () pier pierPath mPort (serf, log, ss) = do - computeQ <- newTQueueIO :: Acquire (TQueue Ev) - persistQ <- newTQueueIO :: Acquire (TQueue (Job, FX)) - executeQ <- newTQueueIO :: Acquire (TQueue FX) + computeQ <- newTQueueIO :: RAcquire e (TQueue Ev) + persistQ <- newTQueueIO :: RAcquire e (TQueue (Job, FX)) + executeQ <- newTQueueIO :: RAcquire e (TQueue FX) - inst <- liftIO (KingId . UV . fromIntegral <$> randomIO @Word16) + inst <- io (KingId . UV . fromIntegral <$> randomIO @Word16) let ship = who (Log.identity log) let (bootEvents, startDrivers) = drivers pierPath inst ship mPort (writeTQueue computeQ) - liftIO $ atomically $ for_ bootEvents (writeTQueue computeQ) + io $ atomically $ for_ bootEvents (writeTQueue computeQ) tExe <- startDrivers >>= router (readTQueue executeQ) tDisk <- runPersist log persistQ (writeTQueue executeQ) @@ -155,8 +158,8 @@ pier pierPath mPort (serf, log, ss) = do ] atomically ded >>= \case - Left (txt, exn) -> print ("Somthing died", txt, exn) - Right tag -> print ("something simply exited", tag) + Left (txt, exn) -> logError $ displayShow ("Somthing died", txt, exn) + Right tag -> logError $ displayShow ("something simply exited", tag) death :: Text -> Async () -> STM (Either (Text, SomeException) Text) death tag tid = do @@ -181,9 +184,9 @@ drivers :: FilePath -> Ship -> Maybe Port -> (Ev -> STM ()) - -> ([Ev], Acquire Drivers) + -> ([Ev], RAcquire e Drivers) drivers pierPath inst who mPort plan = - (initialEvents, runDrivers) + (initialEvents, liftAcquire runDrivers) where (behnBorn, runBehn) = behn inst plan (amesBorn, runAmes) = ames inst who mPort plan @@ -202,44 +205,64 @@ drivers pierPath inst who mPort plan = -- Route Effects to Drivers ---------------------------------------------------- -router :: STM FX -> Drivers -> Acquire (Async ()) -router waitFx Drivers{..} = mkAcquire start cancel +router :: HasLogFunc e => STM FX -> Drivers -> RAcquire e (Async ()) +router waitFx Drivers{..} = + mkRAcquire start cancel where start = async $ forever $ do fx <- atomically waitFx for_ fx $ \ef -> do - putStrLn ("[EFFECT]\n" <> pack (ppShow ef) <> "\n\n") + logEffect ef case ef of GoodParse (EfVega _ _) -> error "TODO" GoodParse (EfExit _ _) -> error "TODO" - GoodParse (EfVane (VEAmes ef)) -> dAmes ef - GoodParse (EfVane (VEBehn ef)) -> dBehn ef - GoodParse (EfVane (VEBoat ef)) -> dSync ef - GoodParse (EfVane (VEClay ef)) -> dSync ef - GoodParse (EfVane (VEHttpClient ef)) -> dHttpClient ef - GoodParse (EfVane (VEHttpServer ef)) -> dHttpServer ef - GoodParse (EfVane (VENewt ef)) -> dNewt ef - GoodParse (EfVane (VESync ef)) -> dSync ef - GoodParse (EfVane (VETerm ef)) -> dTerm ef - FailParse n -> pPrint n + GoodParse (EfVane (VEAmes ef)) -> io $ dAmes ef + GoodParse (EfVane (VEBehn ef)) -> io $ dBehn ef + GoodParse (EfVane (VEBoat ef)) -> io $ dSync ef + GoodParse (EfVane (VEClay ef)) -> io $ dSync ef + GoodParse (EfVane (VEHttpClient ef)) -> io $ dHttpClient ef + GoodParse (EfVane (VEHttpServer ef)) -> io $ dHttpServer ef + GoodParse (EfVane (VENewt ef)) -> io $ dNewt ef + GoodParse (EfVane (VESync ef)) -> io $ dSync ef + GoodParse (EfVane (VETerm ef)) -> io $ dTerm ef + FailParse n -> logError + $ display + $ pack @Text (ppShow n) -- Compute Thread -------------------------------------------------------------- -runCompute :: Serf -> SerfState -> STM Ev -> ((Job, FX) -> STM ()) - -> Acquire (Async ()) -runCompute serf ss getEvent putResult = - mkAcquire (async (go ss)) cancel +logEvent :: HasLogFunc e => Ev -> RIO e () +logEvent ev = + logDebug $ display $ "[EVENT]\n" <> pretty where - go :: SerfState -> IO () + pretty :: Text + pretty = pack $ unlines $ fmap ("\t" <>) $ lines $ ppShow ev + +logEffect :: HasLogFunc e => Lenient Ef -> RIO e () +logEffect ef = + logDebug $ display $ "[EFFECT]\n" <> pretty ef + where + pretty :: Lenient Ef -> Text + pretty = \case + GoodParse e -> pack $ unlines $ fmap ("\t" <>) $ lines $ ppShow e + FailParse n -> pack $ unlines $ fmap ("\t" <>) $ lines $ ppShow n + +runCompute :: ∀e. HasLogFunc e + => Serf -> SerfState -> STM Ev -> ((Job, FX) -> STM ()) + -> RAcquire e (Async ()) +runCompute serf ss getEvent putResult = + mkRAcquire (async (go ss)) cancel + where + go :: SerfState -> RIO e () go ss = do ev <- atomically getEvent - putStrLn ("[EVENT]\n" <> pack (ppShow ev) <> "\n\n") - wen <- Time.now + logEvent ev + wen <- io Time.now eId <- pure (ssNextEv ss) mug <- pure (ssLastMug ss) - (job', ss', fx) <- doJob serf (DoWork (Work eId mug wen ev)) + (job', ss', fx) <- io $ doJob serf $ DoWork $ Work eId mug wen ev atomically (putResult (job', fx)) go ss' @@ -258,23 +281,23 @@ instance Exception PersistExn where runPersist :: EventLog -> TQueue (Job, FX) -> (FX -> STM ()) - -> Acquire (Async ()) + -> RAcquire e (Async ()) runPersist log inpQ out = - mkAcquire runThread cancelWait + mkRAcquire runThread cancelWait where - cancelWait :: Async () -> IO () + cancelWait :: Async () -> RIO e () cancelWait tid = cancel tid >> wait tid - runThread :: IO (Async ()) + runThread :: RIO e (Async ()) runThread = asyncBound $ forever $ do writs <- atomically getBatchFromQueue events <- validateJobsAndGetBytes (toNullable writs) - Log.appendEvents log events + io $ Log.appendEvents log events atomically $ for_ writs $ \(_,fx) -> out fx - validateJobsAndGetBytes :: [(Job, FX)] -> IO (Vector ByteString) + validateJobsAndGetBytes :: [(Job, FX)] -> RIO e (Vector ByteString) validateJobsAndGetBytes writs = do - expect <- Log.nextEv log + expect <- io $ Log.nextEv log fmap fromList $ for (zip [expect..] writs) $ \(expectedId, (j, fx)) -> do diff --git a/pkg/king/lib/Vere/Serf.hs b/pkg/king/lib/Vere/Serf.hs index 95055114d..a1b39cb0e 100644 --- a/pkg/king/lib/Vere/Serf.hs +++ b/pkg/king/lib/Vere/Serf.hs @@ -22,7 +22,6 @@ import System.Process import Vere.Pier.Types import Data.Bits (setBit) -import Control.Concurrent (threadDelay) import Data.ByteString (hGet) import Data.ByteString.Unsafe (unsafeUseAsCString) import Foreign.Marshal.Alloc (alloca) diff --git a/pkg/king/package.yaml b/pkg/king/package.yaml index d87f9ef47..7d16d7367 100644 --- a/pkg/king/package.yaml +++ b/pkg/king/package.yaml @@ -78,6 +78,7 @@ dependencies: - time - transformers - unix + - unliftio - unliftio-core - unordered-containers - vector From 062c71b57d0f6f78d535053de6d8b66dc6483e55 Mon Sep 17 00:00:00 2001 From: Benjamin Summers Date: Wed, 28 Aug 2019 05:22:56 -0700 Subject: [PATCH 4/5] Logging in Vere.Serf --- pkg/king/app/Main.hs | 12 +-- pkg/king/lib/Vere/Pier.hs | 19 ++-- pkg/king/lib/Vere/Serf.hs | 185 +++++++++++++++++++------------------- 3 files changed, 110 insertions(+), 106 deletions(-) diff --git a/pkg/king/app/Main.hs b/pkg/king/app/Main.hs index 380bfc87a..cca3e0e11 100644 --- a/pkg/king/app/Main.hs +++ b/pkg/king/app/Main.hs @@ -210,7 +210,7 @@ tryBootFromPill pillPath shipPath ship = do logTrace "Booting" logTrace $ displayShow ss io $ threadDelay 500000 - ss <- io $ shutdown serf 0 + ss <- shutdown serf 0 logTrace $ displayShow ss logTrace "Booted!" @@ -226,16 +226,16 @@ tryPlayShip :: HasLogFunc e => FilePath -> RIO e () tryPlayShip shipPath = do runRAcquire $ do rio $ logTrace "RESUMING SHIP" - sls <- liftAcquire $ Pier.resumed shipPath [] + sls <- Pier.resumed shipPath [] rio $ logTrace "SHIP RESUMED" Pier.pier shipPath Nothing sls tryResume :: HasLogFunc e => FilePath -> RIO e () tryResume shipPath = do - rwith (liftAcquire $ Pier.resumed shipPath []) $ \(serf, log, ss) -> do + rwith (Pier.resumed shipPath []) $ \(serf, log, ss) -> do logTrace (displayShow ss) threadDelay 500000 - ss <- io (shutdown serf 0) + ss <- shutdown serf 0 logTrace (displayShow ss) logTrace "Resumed!" @@ -293,8 +293,8 @@ collectAllFx top = do collectedFX :: RAcquire e () collectedFX = do log <- liftAcquire $ Log.existing (top <> "/.urb/log") - serf <- liftAcquire $ Serf.run (Serf.Config tmpDir serfFlags) - liftIO (Serf.collectFX serf log) + serf <- Serf.run (Serf.Config tmpDir serfFlags) + rio $ Serf.collectFX serf log serfFlags :: Serf.Flags serfFlags = [Serf.Hashless, Serf.DryRun] diff --git a/pkg/king/lib/Vere/Pier.hs b/pkg/king/lib/Vere/Pier.hs index 4d3b6590f..dd506eaa7 100644 --- a/pkg/king/lib/Vere/Pier.hs +++ b/pkg/king/lib/Vere/Pier.hs @@ -97,14 +97,14 @@ booted pillPath pierPath flags ship = do rio $ logTrace "Event Log Initialized" - serf <- liftAcquire $ Serf.run (Serf.Config pierPath flags) + serf <- Serf.run (Serf.Config pierPath flags) rio $ logTrace "Serf Started" rio $ do - (events, serfSt) <- io $ Serf.bootFromSeq serf seq + (events, serfSt) <- Serf.bootFromSeq serf seq logTrace "Boot Sequence completed" - io $ Serf.snapshot serf serfSt + Serf.snapshot serf serfSt logTrace "Snapshot taken" writeJobs log (fromList events) logTrace "Events written" @@ -113,14 +113,15 @@ booted pillPath pierPath flags ship = do -- Resume an existing ship. ---------------------------------------------------- -resumed :: FilePath -> Serf.Flags - -> Acquire (Serf, EventLog, SerfState) +resumed :: HasLogFunc e + => FilePath -> Serf.Flags + -> RAcquire e (Serf, EventLog, SerfState) resumed top flags = do - log <- Log.existing (top <> "/.urb/log") + log <- liftAcquire $ Log.existing (top <> "/.urb/log") serf <- Serf.run (Serf.Config top flags) - serfSt <- io (Serf.replay serf log) + serfSt <- rio $ Serf.replay serf log - io (Serf.snapshot serf serfSt) + rio $ Serf.snapshot serf serfSt pure (serf, log, serfSt) @@ -262,7 +263,7 @@ runCompute serf ss getEvent putResult = eId <- pure (ssNextEv ss) mug <- pure (ssLastMug ss) - (job', ss', fx) <- io $ doJob serf $ DoWork $ Work eId mug wen ev + (job', ss', fx) <- doJob serf $ DoWork $ Work eId mug wen ev atomically (putResult (job', fx)) go ss' diff --git a/pkg/king/lib/Vere/Serf.hs b/pkg/king/lib/Vere/Serf.hs index a1b39cb0e..21aa6068f 100644 --- a/pkg/king/lib/Vere/Serf.hs +++ b/pkg/king/lib/Vere/Serf.hs @@ -60,9 +60,8 @@ compileFlags = foldl' (\acc flag -> setBit acc (fromEnum flag)) 0 data Config = Config FilePath [Flag] deriving (Show) -debug _msg = pure () -- putStrLn ("[DEBUG]\t" <> msg) - -serf msg = putStrLn ("[SERF]\t" <> msg) +serf :: HasLogFunc e => Text -> RIO e () +serf msg = logInfo $ display ("SERF: " <> msg) -- Types ----------------------------------------------------------------------- @@ -97,8 +96,6 @@ data Plea | PSlog EventId Word32 Tank deriving (Eq, Show) -type GetJobs = EventId -> Word64 -> IO (Vector Job) - type ReplacementEv = Job type WorkResult = (SerfState, FX) type SerfResp = Either ReplacementEv WorkResult @@ -128,28 +125,32 @@ deriveNoun ''Plea -- Utils ----------------------------------------------------------------------- -printTank :: Word32 -> Tank -> IO () +printTank :: HasLogFunc e => Word32 -> Tank -> RIO e () printTank _pri tank = (serf . unlines . fmap unTape . wash (WashCfg 0 80)) tank -guardExn :: Exception e => Bool -> e -> IO () -guardExn ok = unless ok . throwIO +guardExn :: (Exception e, MonadIO m) => Bool -> e -> m () +guardExn ok = io . unless ok . throwIO -fromRightExn :: Exception e => Either a b -> (a -> e) -> IO b +fromRightExn :: (Exception e, MonadIO m) => Either a b -> (a -> e) -> m b fromRightExn (Left m) exn = throwIO (exn m) fromRightExn (Right x) _ = pure x -- Process Management ---------------------------------------------------------- -run :: Config -> Acquire Serf -run config = mkAcquire (startUp config) tearDown +run :: HasLogFunc e => Config -> RAcquire e Serf +run config = mkRAcquire (startUp config) tearDown -startUp :: Config -> IO Serf +startUp :: HasLogFunc e => Config -> RIO e Serf startUp conf@(Config pierPath flags) = do - debug "STARTING SERF" - debug (tshow conf) - (Just i, Just o, Just e, p) <- createProcess pSpec + logTrace "STARTING SERF" + logTrace (displayShow conf) + + (i, o, e, p) <- io $ do + (Just i, Just o, Just e, p) <- createProcess pSpec + pure (i, o, e, p) + ss <- newEmptyMVar et <- async (readStdErr e) pure (Serf i o et p ss) @@ -163,28 +164,30 @@ startUp conf@(Config pierPath flags) = do , std_err = CreatePipe } -readStdErr :: Handle -> IO () +readStdErr :: ∀e. HasLogFunc e => Handle -> RIO e () readStdErr h = untilEOFExn $ do - ln <- IO.hGetLine h + ln <- io $ IO.hGetLine h serf ("[stderr] " <> T.strip (pack ln)) where eofMsg = "[Serf.readStdErr] serf stderr closed" - untilEOFExn :: IO () -> IO () + untilEOFExn :: RIO e () -> RIO e () untilEOFExn act = loop where + loop :: RIO e () loop = do - IO.tryIOError act >>= \case - Left exn | IO.isEOFError exn -> do debug eofMsg - pure () - Left exn -> IO.ioError exn - Right () -> loop + env <- ask + res <- io $ IO.tryIOError $ runRIO env act + case res of + Left exn | IO.isEOFError exn -> logDebug eofMsg + Left exn -> io (IO.ioError exn) + Right () -> loop -tearDown :: Serf -> IO () +tearDown :: Serf -> RIO e () tearDown serf = do - terminateProcess (process serf) - void (waitForExit serf) + io $ terminateProcess (process serf) + void $ waitForExit serf -- race_ waitThenKill (shutdownAndWait serf 0) where @@ -196,50 +199,49 @@ tearDown serf = do -- debug killedMsg -- terminateProcess (process serf) -waitForExit :: Serf -> IO ExitCode -waitForExit serf = waitForProcess (process serf) +waitForExit :: Serf -> RIO e ExitCode +waitForExit = io . waitForProcess . process -kill :: Serf -> IO ExitCode -kill serf = terminateProcess (process serf) >> waitForExit serf +kill :: Serf -> RIO e ExitCode +kill serf = io (terminateProcess $ process serf) >> waitForExit serf -{- -shutdownAndWait :: Serf -> Word8 -> IO ExitCode -shutdownAndWait serf code = do - shutdown serf code - waitForExit serf --} +_shutdownAndWait :: HasLogFunc e => Serf -> Word8 -> RIO e ExitCode +_shutdownAndWait serf code = do + shutdown serf code + waitForExit serf -- Basic Send and Receive Operations ------------------------------------------- -withWord64AsByteString :: Word64 -> (ByteString -> IO a) -> IO a +withWord64AsByteString :: Word64 -> (ByteString -> RIO e a) -> RIO e a withWord64AsByteString w k = do - alloca $ \wp -> do - poke wp w - bs <- BS.unsafePackCStringLen (castPtr wp, 8) - k bs + env <- ask + io $ alloca $ \wp -> do + poke wp w + bs <- BS.unsafePackCStringLen (castPtr wp, 8) + runRIO env (k bs) -sendLen :: Serf -> Int -> IO () +sendLen :: Serf -> Int -> RIO e () sendLen s i = do w <- evaluate (fromIntegral i :: Word64) withWord64AsByteString (fromIntegral i) (hPut (sendHandle s)) -sendOrder :: Serf -> Order -> IO () +sendOrder :: HasLogFunc e => Serf -> Order -> RIO e () sendOrder w o = do - debug ("[Serf.sendOrder.toNoun] " <> tshow o) + logDebug $ display ("[Serf.sendOrder.toNoun] " <> tshow o) n <- evaluate (toNoun o) case o of - OWork (DoWork (Work _ _ _ e)) -> do print (toNoun (e :: Ev)) + OWork (DoWork (Work _ _ _ e)) -> do logTrace $ displayShow $ toNoun (e::Ev) _ -> do pure () - debug ("[Serf.sendOrder.jam]") + logDebug "[Serf.sendOrder.jam]" bs <- evaluate (jamBS n) - debug ("[Serf.sendOrder.send]: " <> tshow (length bs)) + logDebug $ display ("[Serf.sendOrder.send]: " <> tshow (length bs)) sendBytes w bs - debug ("[Serf.sendOrder.sent]") + logDebug "[Serf.sendOrder.sent]" -sendBytes :: Serf -> ByteString -> IO () +sendBytes :: Serf -> ByteString -> RIO e () sendBytes s bs = handle ioErr $ do sendLen s (length bs) hFlush (sendHandle s) @@ -252,24 +254,24 @@ sendBytes s bs = handle ioErr $ do hack where - ioErr :: IOError -> IO () + ioErr :: IOError -> RIO e () ioErr _ = throwIO SerfConnectionClosed -- TODO WHY DOES THIS MATTER????? hack = threadDelay 10000 -recvLen :: Serf -> IO Word64 -recvLen w = do +recvLen :: MonadIO m => Serf -> m Word64 +recvLen w = io $ do bs <- hGet (recvHandle w) 8 case length bs of 8 -> unsafeUseAsCString bs (peek . castPtr) _ -> throwIO SerfConnectionClosed -recvBytes :: Serf -> Word64 -> IO ByteString -recvBytes w = do - hGet (recvHandle w) . fromIntegral +recvBytes :: Serf -> Word64 -> RIO e ByteString +recvBytes serf = + io . hGet (recvHandle serf) . fromIntegral -recvAtom :: Serf -> IO Atom +recvAtom :: Serf -> RIO e Atom recvAtom w = do len <- recvLen w bs <- recvBytes w len @@ -284,20 +286,20 @@ cordText = T.strip . unCord -------------------------------------------------------------------------------- -snapshot :: Serf -> SerfState -> IO () +snapshot :: HasLogFunc e => Serf -> SerfState -> RIO e () snapshot serf ss = sendOrder serf $ OSave $ ssLastEv ss -shutdown :: Serf -> Word8 -> IO () +shutdown :: HasLogFunc e => Serf -> Word8 -> RIO e () shutdown serf code = sendOrder serf (OExit code) {- TODO Find a cleaner way to handle `PStdr` Pleas. -} -recvPlea :: Serf -> IO Plea +recvPlea :: HasLogFunc e => Serf -> RIO e Plea recvPlea w = do - debug ("[Vere.Serf.recvPlea] waiting") + logDebug "[Vere.Serf.recvPlea] waiting" a <- recvAtom w - debug ("[Vere.Serf.recvPlea] got atom") + logDebug "[Vere.Serf.recvPlea] got atom" n <- fromRightExn (cue a) (const $ BadPleaAtom a) p <- fromRightExn (fromNounErr n) (\(p,m) -> BadPleaNoun (traceShowId n) p m) @@ -305,13 +307,13 @@ recvPlea w = do recvPlea w PSlog _ pri t -> do printTank pri t recvPlea w - _ -> do debug ("[Serf.recvPlea] Got " <> tshow p) + _ -> do logTrace $ display ("recvPlea got: " <> tshow p) pure p {- Waits for initial plea, and then sends boot IPC if necessary. -} -handshake :: Serf -> LogIdentity -> IO SerfState +handshake :: HasLogFunc e => Serf -> LogIdentity -> RIO e SerfState handshake serf ident = do ss@SerfState{..} <- recvPlea serf >>= \case PPlay Nothing -> pure $ SerfState 1 (Mug 0) @@ -323,27 +325,27 @@ handshake serf ident = do pure ss -sendWork :: Serf -> Job -> IO SerfResp +sendWork :: ∀e. HasLogFunc e => Serf -> Job -> RIO e SerfResp sendWork w job = do sendOrder w (OWork job) res <- loop - debug ("[Vere.Serf.sendWork] Got response") + logTrace ("[sendWork] Got response") pure res where eId = jobId job - produce :: WorkResult -> IO SerfResp + produce :: WorkResult -> RIO e SerfResp produce (ss@SerfState{..}, o) = do guardExn (ssNextEv == (1+eId)) (BadComputeId eId (ss, o)) pure $ Right (ss, o) - replace :: ReplacementEv -> IO SerfResp + replace :: ReplacementEv -> RIO e SerfResp replace job = do guardExn (jobId job == eId) (BadReplacementId eId job) pure (Left job) - loop :: IO SerfResp + loop :: RIO e SerfResp loop = recvPlea w >>= \case PPlay p -> throwIO (UnexpectedPlay eId p) PDone i m o -> produce (SerfState (i+1) m, o) @@ -354,19 +356,19 @@ sendWork w job = -------------------------------------------------------------------------------- -doJob :: Serf -> Job -> IO (Job, SerfState, FX) +doJob :: HasLogFunc e => Serf -> Job -> RIO e (Job, SerfState, FX) doJob serf job = do sendWork serf job >>= \case Left replaced -> doJob serf replaced Right (ss, fx) -> pure (job, ss, fx) -bootJob :: Serf -> Job -> IO (Job, SerfState) +bootJob :: HasLogFunc e => Serf -> Job -> RIO e (Job, SerfState) bootJob serf job = do doJob serf job >>= \case (job, ss, []) -> pure (job, ss) (job, ss, fx) -> throwIO (EffectsDuringBoot (jobId job) fx) -replayJob :: Serf -> Job -> IO SerfState +replayJob :: HasLogFunc e => Serf -> Job -> RIO e SerfState replayJob serf job = do sendWork serf job >>= \case Left replace -> throwIO (ReplacedEventDuringReplay (jobId job) replace) @@ -381,17 +383,17 @@ data BootExn = ShipAlreadyBooted deriving stock (Eq, Ord, Show) deriving anyclass (Exception) -bootFromSeq :: Serf -> BootSeq -> IO ([Job], SerfState) +bootFromSeq :: ∀e. HasLogFunc e => Serf -> BootSeq -> RIO e ([Job], SerfState) bootFromSeq serf (BootSeq ident nocks ovums) = do handshake serf ident >>= \case ss@(SerfState 1 (Mug 0)) -> loop [] ss bootSeqFns _ -> throwIO ShipAlreadyBooted where - loop :: [Job] -> SerfState -> [BootSeqFn] -> IO ([Job], SerfState) + loop :: [Job] -> SerfState -> [BootSeqFn] -> RIO e ([Job], SerfState) loop acc ss = \case [] -> pure (reverse acc, ss) - x:xs -> do wen <- Time.now + x:xs -> do wen <- io Time.now job <- pure $ x (ssNextEv ss) (ssLastMug ss) wen (job, ss) <- bootJob serf job loop (job:acc) ss xs @@ -405,12 +407,13 @@ bootFromSeq serf (BootSeq ident nocks ovums) = do The ship is booted, but it is behind. shove events to the worker until it is caught up. -} -replayJobs :: Serf -> SerfState -> ConduitT Job Void IO SerfState +replayJobs :: HasLogFunc e + => Serf -> SerfState -> ConduitT Job Void (RIO e) SerfState replayJobs serf = go where - go ss = await >>= maybe (pure ss) (liftIO . replayJob serf >=> go) + go ss = await >>= maybe (pure ss) (lift . replayJob serf >=> go) -replay :: Serf -> Log.EventLog -> IO SerfState +replay :: HasLogFunc e => Serf -> Log.EventLog -> RIO e SerfState replay serf log = do ss <- handshake serf (Log.identity log) @@ -418,18 +421,18 @@ replay serf log = do .| toJobs (Log.identity log) (ssNextEv ss) .| replayJobs serf ss -toJobs :: LogIdentity -> EventId -> ConduitT ByteString Job IO () +toJobs :: HasLogFunc e + => LogIdentity -> EventId -> ConduitT ByteString Job (RIO e) () toJobs ident eId = await >>= \case - Nothing -> putStrLn "[toJobs] no more jobs" >> pure () - Just at -> do yield =<< liftIO (fromAtom at) - putStrLn ("[toJobs] " <> tshow eId) + Nothing -> lift $ logTrace "[toJobs] no more jobs" + Just at -> do yield =<< lift (fromAtom at) + lift $ logTrace $ display ("[toJobs] " <> tshow eId) toJobs ident (eId+1) where - isNock = trace ("[toJobs] " <> show (eId, lifecycleLen ident)) - $ eId <= fromIntegral (lifecycleLen ident) + isNock = eId <= fromIntegral (lifecycleLen ident) - fromAtom :: ByteString -> IO Job + fromAtom :: ByteString -> RIO e Job fromAtom bs | isNock = do noun <- cueBSExn bs (mug, nok) <- fromNounExn noun @@ -442,7 +445,7 @@ toJobs ident eId = -- Collect Effects for Parsing ------------------------------------------------- -collectFX :: Serf -> Log.EventLog -> IO () +collectFX :: HasLogFunc e => Serf -> Log.EventLog -> RIO e () collectFX serf log = do ss <- handshake serf (Log.identity log) @@ -451,26 +454,26 @@ collectFX serf log = do .| doCollectFX serf ss .| persistFX log -persistFX :: Log.EventLog -> ConduitT (EventId, FX) Void IO () +persistFX :: Log.EventLog -> ConduitT (EventId, FX) Void (RIO e) () persistFX log = loop where loop = await >>= \case Nothing -> pure () Just (eId, fx) -> do liftIO $ Log.writeEffectsRow log eId (jamBS $ toNoun fx) - putStr "." loop -doCollectFX :: Serf -> SerfState -> ConduitT Job (EventId, FX) IO () +doCollectFX :: ∀e. HasLogFunc e + => Serf -> SerfState -> ConduitT Job (EventId, FX) (RIO e) () doCollectFX serf = go where - go :: SerfState -> ConduitT Job (EventId, FX) IO () + go :: SerfState -> ConduitT Job (EventId, FX) (RIO e) () go ss = await >>= \case Nothing -> pure () Just jb -> do -- jb <- pure $ replaceMug jb (ssLastMug ss) - (_, ss, fx) <- liftIO (doJob serf jb) - liftIO $ print (jobId jb) + (_, ss, fx) <- lift $ doJob serf jb + lift $ logTrace $ displayShow (jobId jb) yield (jobId jb, fx) go ss From de7a087ef5260b9ba3c068cfade6698371d3aefe Mon Sep 17 00:00:00 2001 From: Benjamin Summers Date: Wed, 28 Aug 2019 17:26:59 -0700 Subject: [PATCH 5/5] Logging everywhere + log to file --- pkg/king/app/Main.hs | 34 +++-- pkg/king/lib/Urbit/Timer.hs | 4 +- pkg/king/lib/Vere/Ames.hs | 6 +- pkg/king/lib/Vere/Behn.hs | 8 +- pkg/king/lib/Vere/Http/Server.hs | 173 ++++++++++++----------- pkg/king/lib/Vere/Log.hs | 228 +++++++++++++++++-------------- pkg/king/lib/Vere/Pier.hs | 63 ++++----- pkg/king/lib/Vere/Pier/Types.hs | 2 +- pkg/king/lib/Vere/Serf.hs | 2 +- 9 files changed, 284 insertions(+), 236 deletions(-) diff --git a/pkg/king/app/Main.hs b/pkg/king/app/Main.hs index cca3e0e11..865ec9e8e 100644 --- a/pkg/king/app/Main.hs +++ b/pkg/king/app/Main.hs @@ -122,6 +122,7 @@ import Vere.Serf import Control.Concurrent (runInBoundThread) import Control.Lens ((&)) import System.Directory (doesFileExist, removeFile) +import System.Directory (getHomeDirectory, createDirectoryIfMissing) import System.Environment (getProgName) import Text.Show.Pretty (pPrint) import Urbit.Time (Wen) @@ -152,15 +153,21 @@ instance HasAppName App where runApp :: RIO App a -> IO a runApp inner = do - logOptions <- logOptionsHandle stderr True - <&> setLogUseTime True - <&> setLogUseLoc False + home <- getHomeDirectory + let logDir = home <> "/log" + createDirectoryIfMissing True logDir + withTempFile logDir "king-" $ \tmpFile hFile -> do + hSetBuffering hFile LineBuffering - withLogFunc logOptions $ \logFunc -> do - let app = App { _appLogFunc = logFunc - , _appName = "Alice" - } - runRIO app inner + logOptions <- logOptionsHandle hFile True + <&> setLogUseTime True + <&> setLogUseLoc False + + withLogFunc logOptions $ \logFunc -> do + let app = App { _appLogFunc = logFunc + , _appName = "Alice" + } + runRIO app inner -------------------------------------------------------------------------------- @@ -248,7 +255,7 @@ tryFullReplay shipPath = do checkEvs :: forall e. HasLogFunc e => FilePath -> Word64 -> Word64 -> RIO e () checkEvs pierPath first last = do - rwith (liftAcquire $ Log.existing logPath) $ \log -> do + rwith (Log.existing logPath) $ \log -> do let ident = Log.identity log logTrace (displayShow ident) runConduit $ Log.streamEvents log first @@ -292,7 +299,7 @@ collectAllFx top = do collectedFX :: RAcquire e () collectedFX = do - log <- liftAcquire $ Log.existing (top <> "/.urb/log") + log <- Log.existing (top <> "/.urb/log") serf <- Serf.run (Serf.Config tmpDir serfFlags) rio $ Serf.collectFX serf log @@ -387,14 +394,15 @@ main = CLI.parseArgs >>= runApp . \case checkFx :: HasLogFunc e => FilePath -> Word64 -> Word64 -> RIO e () checkFx pierPath first last = - rwith (liftAcquire $ Log.existing logPath) $ \log -> + rwith (Log.existing logPath) $ \log -> runConduit $ streamFX log first last .| tryParseFXStream where logPath = pierPath <> "/.urb/log" -streamFX :: MonadIO m - => Log.EventLog -> Word64 -> Word64 -> ConduitT () ByteString m () +streamFX :: HasLogFunc e + => Log.EventLog -> Word64 -> Word64 + -> ConduitT () ByteString (RIO e) () streamFX log first last = do Log.streamEffectsRows log first .| loop where diff --git a/pkg/king/lib/Urbit/Timer.hs b/pkg/king/lib/Urbit/Timer.hs index a2518fc9f..8664e8e88 100644 --- a/pkg/king/lib/Urbit/Timer.hs +++ b/pkg/king/lib/Urbit/Timer.hs @@ -33,8 +33,8 @@ start timer@(Timer vSt man) time cb = do stop timer now <- Sys.getSystemTime let sleep = sysTimeGapMicroSecs now time - print (now, time, "->", sleep) - if (sleep <= 0) then (print "ug" >> fire) else do + -- print (now, time, "->", sleep) + if (sleep <= 0) then fire else do key <- Ev.registerTimeout man sleep fire atomicWriteIORef vSt $! Just key diff --git a/pkg/king/lib/Vere/Ames.hs b/pkg/king/lib/Vere/Ames.hs index fd3021333..8e9a632fd 100644 --- a/pkg/king/lib/Vere/Ames.hs +++ b/pkg/king/lib/Vere/Ames.hs @@ -90,17 +90,17 @@ _turfText = intercalate "." . reverse . fmap unCord . unTurf TODO verify that the KingIds match on effects. -} ames :: KingId -> Ship -> Maybe Port -> QueueEv - -> ([Ev], Acquire (EffCb NewtEf)) + -> ([Ev], Acquire (EffCb e NewtEf)) ames inst who mPort enqueueEv = (initialEvents, runAmes) where initialEvents :: [Ev] initialEvents = [barnEv inst] - runAmes :: Acquire (EffCb NewtEf) + runAmes :: Acquire (EffCb e NewtEf) runAmes = do drv <- mkAcquire start stop - pure (handleEffect drv) + pure (io . handleEffect drv) start :: IO AmesDrv start = do diff --git a/pkg/king/lib/Vere/Behn.hs b/pkg/king/lib/Vere/Behn.hs index 41bf1ea80..af6236f01 100644 --- a/pkg/king/lib/Vere/Behn.hs +++ b/pkg/king/lib/Vere/Behn.hs @@ -21,19 +21,19 @@ wakeEv = EvBlip $ BlipEvBehn $ BehnEvWake () () sysTime = view Time.systemTime -behn :: KingId -> QueueEv -> ([Ev], Acquire (EffCb BehnEf)) +behn :: KingId -> QueueEv -> ([Ev], Acquire (EffCb e BehnEf)) behn king enqueueEv = (initialEvents, runBehn) where initialEvents = [bornEv king] - runBehn :: Acquire (EffCb BehnEf) + runBehn :: Acquire (EffCb e BehnEf) runBehn = do tim <- mkAcquire Timer.init Timer.stop pure (handleEf tim) - handleEf :: Timer -> BehnEf -> IO () - handleEf b = \case + handleEf :: Timer -> BehnEf -> RIO e () + handleEf b = io . \case BehnEfVoid v -> absurd v BehnEfDoze (i, ()) mWen -> do when (i == king) (doze b mWen) diff --git a/pkg/king/lib/Vere/Http/Server.hs b/pkg/king/lib/Vere/Http/Server.hs index 830874358..4c2b9a223 100644 --- a/pkg/king/lib/Vere/Http/Server.hs +++ b/pkg/king/lib/Vere/Http/Server.hs @@ -119,36 +119,37 @@ reorgHttpEvent = \case - Keeps the MVar lock until the restart process finishes. -} -restartService :: forall s - . MVar (Maybe s) - -> IO s - -> (s -> IO ()) - -> IO (Either SomeException s) +restartService :: ∀e s. HasLogFunc e + => MVar (Maybe s) + -> RIO e s + -> (s -> RIO e ()) + -> RIO e (Either SomeException s) restartService vServ sstart kkill = do - putStrLn "restartService" + logDebug "restartService" modifyMVar vServ $ \case Nothing -> doStart Just sv -> doRestart sv where - doRestart :: s -> IO (Maybe s, Either SomeException s) + doRestart :: s -> RIO e (Maybe s, Either SomeException s) doRestart serv = do - putStrLn "doStart" + logDebug "doStart" try (kkill serv) >>= \case Left exn -> pure (Nothing, Left exn) Right () -> doStart - doStart :: IO (Maybe s, Either SomeException s) + doStart :: RIO e (Maybe s, Either SomeException s) doStart = do - putStrLn "doStart" + logDebug "doStart" try sstart <&> \case Right s -> (Just s, Right s) Left exn -> (Nothing, Left exn) -stopService :: MVar (Maybe s) - -> (s -> IO ()) - -> IO (Either SomeException ()) +stopService :: HasLogFunc e + => MVar (Maybe s) + -> (s -> RIO e ()) + -> RIO e (Either SomeException ()) stopService vServ kkill = do - putStrLn "stopService" + logDebug "stopService" modifyMVar vServ $ \case Nothing -> pure (Nothing, Right ()) Just sv -> do res <- try (kkill sv) @@ -186,10 +187,10 @@ newLiveReq var = do -- Ports File ------------------------------------------------------------------ -removePortsFile :: FilePath -> IO () +removePortsFile :: FilePath -> RIO e () removePortsFile pax = - doesFileExist pax >>= \case - True -> removeFile pax + io (doesFileExist pax) >>= \case + True -> io $ removeFile pax False -> pure () portsFileText :: Ports -> Text @@ -200,7 +201,7 @@ portsFileText Ports{..} = , Just (tshow (unPort pLoop) <> " insecure loopback") ] -writePortsFile :: FilePath -> Ports -> IO () +writePortsFile :: FilePath -> Ports -> RIO e () writePortsFile f = writeFile f . encodeUtf8 . portsFileText @@ -224,12 +225,12 @@ cookMeth = H.parseMethod . W.requestMethod >>> \case reqIdCord :: ReqId -> Cord reqIdCord = Cord . tshow -reqBody :: W.Request -> IO (Maybe File) +reqBody :: W.Request -> RIO e (Maybe File) reqBody req = do - bodyLbs <- W.strictRequestBody req - if length bodyLbs == 0 - then pure $ Nothing - else pure $ Just $ File $ Octs (toStrict bodyLbs) + bodyLbs <- io $ W.strictRequestBody req + pure $ if length bodyLbs == 0 + then Nothing + else Just $ File $ Octs (toStrict bodyLbs) reqAddr :: W.Request -> Address reqAddr = W.remoteHost >>> \case @@ -295,7 +296,7 @@ data Req - If %bloc before %head, collect it and wait for %head. - If %done before %head, ignore all chunks and produce Nothing. -} -getReq :: TQueue RespAction -> IO Req +getReq :: TQueue RespAction -> RIO e Req getReq tmv = go [] where go çunks = atomically (readTQueue tmv) >>= \case @@ -309,12 +310,15 @@ getReq tmv = go [] - Yield the data from %bloc action. - Close the stream when we hit a %done action. -} -streamBlocks :: [File] -> TQueue RespAction -> ConduitT () (Flush Builder) IO () -streamBlocks init tmv = +streamBlocks :: HasLogFunc e + => e -> [File] -> TQueue RespAction + -> ConduitT () (Flush Builder) IO () +streamBlocks env init tmv = for_ init yieldÇunk >> go where yieldFlush = \x -> yield (Chunk x) >> yield Flush - logDupHead = putStrLn "Multiple %head actions on one request" + logDupHead = runRIO env + $ logError "Multiple %head actions on one request" yieldÇunk = \case "" -> pure () @@ -326,17 +330,19 @@ streamBlocks init tmv = RABloc c -> yieldÇunk c RADone -> pure () -sendResponse :: (W.Response -> IO W.ResponseReceived) - -> TQueue RespAction - -> IO W.ResponseReceived +sendResponse :: HasLogFunc e + => (W.Response -> IO W.ResponseReceived) + -> TQueue RespAction + -> RIO e W.ResponseReceived sendResponse cb tmv = do + env <- ask getReq tmv >>= \case - RNone -> cb $ W.responseLBS (H.mkStatus 444 "No Response") [] - $ "" - RFull h f -> cb $ W.responseLBS (hdrStatus h) (hdrHeaders h) - $ fromStrict $ concat $ unOcts . unFile <$> f - RHead h i -> cb $ W.responseSource (hdrStatus h) (hdrHeaders h) - $ streamBlocks i tmv + RNone -> io $ cb $ W.responseLBS (H.mkStatus 444 "No Response") [] + $ "" + RFull h f -> io $ cb $ W.responseLBS (hdrStatus h) (hdrHeaders h) + $ fromStrict $ concat $ unOcts . unFile <$> f + RHead h i -> io $ cb $ W.responseSource (hdrStatus h) (hdrHeaders h) + $ streamBlocks env i tmv where hdrHeaders :: ResponseHeader -> [H.Header] hdrHeaders = unconvertHeaders . headers @@ -344,15 +350,18 @@ sendResponse cb tmv = do hdrStatus :: ResponseHeader -> H.Status hdrStatus = toEnum . fromIntegral . statusCode -liveReq :: TVar LiveReqs -> Acquire (ReqId, TQueue RespAction) -liveReq vLiv = mkAcquire ins del +liveReq :: TVar LiveReqs -> RAcquire e (ReqId, TQueue RespAction) +liveReq vLiv = mkRAcquire ins del where ins = atomically (newLiveReq vLiv) del = atomically . rmLiveReq vLiv . fst -app :: ServId -> TVar LiveReqs -> (Ev -> STM ()) -> WhichServer -> W.Application -app sId liv plan which req respond = do - with (liveReq liv) $ \(reqId, respVar) -> do +app :: HasLogFunc e + => e -> ServId -> TVar LiveReqs -> (Ev -> STM ()) -> WhichServer + -> W.Application +app env sId liv plan which req respond = + runRIO env $ + rwith (liveReq liv) $ \(reqId, respVar) -> do body <- reqBody req meth <- maybe (error "bad method") pure (cookMeth req) @@ -364,9 +373,10 @@ app sId liv plan which req respond = do try (sendResponse respond respVar) >>= \case Right rr -> pure rr - Left exn -> do atomically $ plan (cancelEv sId reqId) - putStrLn ("Exception during request" <> tshow exn) - throwIO (exn :: SomeException) + Left exn -> do + io $ atomically $ plan (cancelEv sId reqId) + logError $ display ("Exception during request" <> tshow exn) + throwIO (exn :: SomeException) -- Top-Level Driver Interface -------------------------------------------------- @@ -374,20 +384,21 @@ app sId liv plan which req respond = do {- TODO Need to find an open port. -} -startServ :: FilePath -> HttpServerConf -> (Ev -> STM ()) - -> IO Serv +startServ :: HasLogFunc e + => FilePath -> HttpServerConf -> (Ev -> STM ()) + -> RIO e Serv startServ pierPath conf plan = do - putStrLn "startServ" + logDebug "startServ" let tls = hscSecure conf <&> \(PEM key, PEM cert) -> (W.tlsSettingsMemory (cordBytes cert) (cordBytes key)) - sId <- ServId . UV . fromIntegral <$> (randomIO :: IO Word32) + sId <- io $ ServId . UV . fromIntegral <$> (randomIO :: IO Word32) liv <- newTVarIO emptyLiveReqs - (httpPortInt, httpSock) <- W.openFreePort -- 8080 -- 80 if real ship - (httpsPortInt, httpsSock) <- W.openFreePort -- 8443 -- 443 if real ship - (loopPortInt, loopSock) <- W.openFreePort -- 12321 -- ??? if real ship + (httpPortInt, httpSock) <- io $ W.openFreePort -- 8080 -- 80 if real ship + (httpsPortInt, httpsSock) <- io $ W.openFreePort -- 8443 -- 443 if real ship + (loopPortInt, loopSock) <- io $ W.openFreePort -- 12321 -- ??? if real ship let httpPort = Port (fromIntegral httpPortInt) httpsPort = Port (fromIntegral httpsPortInt) @@ -399,29 +410,34 @@ startServ pierPath conf plan = do httpOpts = W.defaultSettings & W.setPort (fromIntegral httpPort) httpsOpts = W.defaultSettings & W.setPort (fromIntegral httpsPort) - putStrLn "Starting loopback server" - loopTid <- async $ W.runSettingsSocket loopOpts loopSock - $ app sId liv plan Loopback + env <- ask - putStrLn "Starting HTTP server" - httpTid <- async $ W.runSettingsSocket httpOpts httpSock - $ app sId liv plan Insecure + logDebug "Starting loopback server" + loopTid <- async $ io + $ W.runSettingsSocket loopOpts loopSock + $ app env sId liv plan Loopback - putStrLn "Starting HTTPS server" + logDebug "Starting HTTP server" + httpTid <- async $ io + $ W.runSettingsSocket httpOpts httpSock + $ app env sId liv plan Insecure + + logDebug "Starting HTTPS server" httpsTid <- for tls $ \tlsOpts -> - async $ W.runTLSSocket tlsOpts httpsOpts httpsSock - $ app sId liv plan Secure + async $ io + $ W.runTLSSocket tlsOpts httpsOpts httpsSock + $ app env sId liv plan Secure let por = Ports (tls <&> const httpsPort) httpPort loopPort fil = pierPath <> "/.http.ports" - print (sId, por, fil) + logDebug $ displayShow (sId, por, fil) - putStrLn "END startServ" + logDebug "Finished started HTTP Servers" pure $ Serv sId conf loopTid httpTid httpsTid por fil liv -killServ :: Serv -> IO () +killServ :: HasLogFunc e => Serv -> RIO e () killServ Serv{..} = do cancel sLoopTid cancel sHttpTid @@ -431,47 +447,50 @@ killServ Serv{..} = do (void . waitCatch) sHttpTid traverse_ (void . waitCatch) sHttpsTid -kill :: Drv -> IO () +kill :: HasLogFunc e => Drv -> RIO e () kill (Drv v) = stopService v killServ >>= fromEither -respond :: Drv -> ReqId -> HttpEvent -> IO () +respond :: HasLogFunc e + => Drv -> ReqId -> HttpEvent -> RIO e () respond (Drv v) reqId ev = do readMVar v >>= \case Nothing -> pure () - Just sv -> do (print (reorgHttpEvent ev)) + Just sv -> do logDebug $ displayShow $ reorgHttpEvent ev for_ (reorgHttpEvent ev) $ atomically . respondToLiveReq (sLiveReqs sv) reqId -serv :: FilePath -> KingId -> QueueEv -> ([Ev], Acquire (EffCb HttpServerEf)) +serv :: ∀e. HasLogFunc e + => FilePath -> KingId -> QueueEv + -> ([Ev], RAcquire e (EffCb e HttpServerEf)) serv pier king plan = (initialEvents, runHttpServer) where initialEvents :: [Ev] initialEvents = [bornEv king] - runHttpServer :: Acquire (EffCb HttpServerEf) - runHttpServer = handleEf <$> mkAcquire (Drv <$> newMVar Nothing) kill + runHttpServer :: RAcquire e (EffCb e HttpServerEf) + runHttpServer = handleEf <$> mkRAcquire (Drv <$> newMVar Nothing) kill - restart :: Drv -> HttpServerConf -> IO Serv + restart :: Drv -> HttpServerConf -> RIO e Serv restart (Drv var) conf = do - putStrLn "Restarting http server" + logDebug "Restarting http server" res <- fromEither =<< restartService var (startServ pier conf plan) killServ - putStrLn "Done restating http server" + logDebug "Done restating http server" pure res - handleEf :: Drv -> HttpServerEf -> IO () + handleEf :: Drv -> HttpServerEf -> RIO e () handleEf drv = \case HSESetConfig (i, ()) conf -> do -- print (i, king) -- when (i == fromIntegral king) $ do - putStrLn "restarting" + logDebug "restarting" Serv{..} <- restart drv conf - putStrLn "Enqueue %live" + logDebug "Enqueue %live" atomically $ plan (liveEv sServId sPorts) - putStrLn "Write ports file" + logDebug "Write ports file" writePortsFile sPortsFile sPorts HSEResponse (i, req, _seq, ()) ev -> do -- print (i, king) -- when (i == fromIntegral king) $ do - putStrLn "respond" + logDebug "respond" respond drv (fromIntegral req) ev diff --git a/pkg/king/lib/Vere/Log.hs b/pkg/king/lib/Vere/Log.hs index e16be6aee..74e79a6c6 100644 --- a/pkg/king/lib/Vere/Log.hs +++ b/pkg/king/lib/Vere/Log.hs @@ -10,7 +10,7 @@ module Vere.Log ( EventLog, identity, nextEv import UrbitPrelude hiding (init) -import Data.Acquire +import Data.RAcquire import Data.Conduit import Database.LMDB.Raw import Foreign.Marshal.Alloc @@ -26,6 +26,7 @@ import qualified Data.Vector as V -- Types ----------------------------------------------------------------------- type Env = MDB_env +type Val = MDB_val type Txn = MDB_txn type Dbi = MDB_dbi type Cur = MDB_cursor @@ -39,10 +40,10 @@ data EventLog = EventLog , numEvents :: IORef EventId } -nextEv :: EventLog -> IO EventId +nextEv :: EventLog -> RIO e EventId nextEv = fmap succ . readIORef . numEvents -lastEv :: EventLog -> IO EventId +lastEv :: EventLog -> RIO e EventId lastEv = readIORef . numEvents data EventLogExn @@ -63,17 +64,18 @@ instance Exception EventLogExn where -- Open/Close an Event Log ----------------------------------------------------- -rawOpen :: FilePath -> IO Env -rawOpen dir = do - putStrLn $ pack ("PAX: " <> dir) +rawOpen :: MonadIO m => FilePath -> m Env +rawOpen dir = io $ do env <- mdb_env_create mdb_env_set_maxdbs env 3 mdb_env_set_mapsize env (40 * 1024 * 1024 * 1024) mdb_env_open env dir [] pure env -create :: FilePath -> LogIdentity -> IO EventLog +create :: HasLogFunc e => FilePath -> LogIdentity -> RIO e EventLog create dir id = do + logDebug $ display (pack @Text $ "Creating LMDB database: " <> dir) + logDebug $ display (pack @Text $ "Log Identity: " <> show id) env <- rawOpen dir (m, e, f) <- createTables env clearEvents env e @@ -81,41 +83,44 @@ create dir id = do EventLog env m e f id <$> newIORef 0 where createTables env = - with (writeTxn env) $ \txn -> + rwith (writeTxn env) $ \txn -> io $ (,,) <$> mdb_dbi_open txn (Just "META") [MDB_CREATE] <*> mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY] <*> mdb_dbi_open txn (Just "EFFECTS") [MDB_CREATE, MDB_INTEGERKEY] -open :: FilePath -> IO EventLog +open :: HasLogFunc e => FilePath -> RIO e EventLog open dir = do + logDebug $ display (pack @Text $ "Opening LMDB database: " <> dir) env <- rawOpen dir (m, e, f) <- openTables env id <- getIdent env m + logDebug $ display (pack @Text $ "Log Identity: " <> show id) numEvs <- getNumEvents env e EventLog env m e f id <$> newIORef numEvs where openTables env = - with (writeTxn env) $ \txn -> + rwith (writeTxn env) $ \txn -> io $ (,,) <$> mdb_dbi_open txn (Just "META") [] <*> mdb_dbi_open txn (Just "EVENTS") [MDB_INTEGERKEY] <*> mdb_dbi_open txn (Just "EFFECTS") [MDB_CREATE, MDB_INTEGERKEY] -close :: EventLog -> IO () -close (EventLog env meta events effects _ _) = do - mdb_dbi_close env meta - mdb_dbi_close env events - mdb_dbi_close env effects - mdb_env_sync_flush env - mdb_env_close env +close :: HasLogFunc e => FilePath -> EventLog -> RIO e () +close dir (EventLog env meta events effects _ _) = do + logDebug $ display (pack @Text $ "Closing LMDB database: " <> dir) + io $ do mdb_dbi_close env meta + mdb_dbi_close env events + mdb_dbi_close env effects + mdb_env_sync_flush env + mdb_env_close env -- Create a new event log or open an existing one. ----------------------------- -existing :: FilePath -> Acquire EventLog -existing dir = mkAcquire (open dir) close +existing :: HasLogFunc e => FilePath -> RAcquire e EventLog +existing dir = mkRAcquire (open dir) (close dir) -new :: FilePath -> LogIdentity -> Acquire EventLog -new dir id = mkAcquire (create dir id) close +new :: HasLogFunc e => FilePath -> LogIdentity -> RAcquire e EventLog +new dir id = mkRAcquire (create dir id) (close dir) -- Read/Write Log Identity ----------------------------------------------------- @@ -125,22 +130,22 @@ new dir id = mkAcquire (create dir id) close Use this when opening database handles. -} -_openTxn :: Env -> Acquire Txn -_openTxn env = mkAcquire begin commit +_openTxn :: Env -> RAcquire e Txn +_openTxn env = mkRAcquire begin commit where - begin = mdb_txn_begin env Nothing True - commit = mdb_txn_commit + begin = io $ mdb_txn_begin env Nothing True + commit = io . mdb_txn_commit {- A read-only transaction that aborts at the end. Use this when reading data from already-opened databases. -} -readTxn :: Env -> Acquire Txn -readTxn env = mkAcquire begin abort +readTxn :: Env -> RAcquire e Txn +readTxn env = mkRAcquire begin abort where - begin = mdb_txn_begin env Nothing True - abort = mdb_txn_abort + begin = io $ mdb_txn_begin env Nothing True + abort = io . mdb_txn_abort {- A read-write transaction that commits upon sucessful completion and @@ -148,42 +153,44 @@ readTxn env = mkAcquire begin abort Use this when reading data from already-opened databases. -} -writeTxn :: Env -> Acquire Txn -writeTxn env = mkAcquireType begin finalize +writeTxn :: Env -> RAcquire e Txn +writeTxn env = mkRAcquireType begin finalize where - begin = mdb_txn_begin env Nothing False - finalize txn = \case + begin = io $ mdb_txn_begin env Nothing False + finalize txn = io . \case ReleaseNormal -> mdb_txn_commit txn ReleaseEarly -> mdb_txn_commit txn ReleaseException -> mdb_txn_abort txn -cursor :: Txn -> Dbi -> Acquire Cur -cursor txn dbi = mkAcquire open close +cursor :: Txn -> Dbi -> RAcquire e Cur +cursor txn dbi = mkRAcquire open close where - open = mdb_cursor_open txn dbi - close = mdb_cursor_close + open = io $ mdb_cursor_open txn dbi + close = io . mdb_cursor_close -getIdent :: Env -> Dbi -> IO LogIdentity -getIdent env dbi = +getIdent :: HasLogFunc e => Env -> Dbi -> RIO e LogIdentity +getIdent env dbi = do + logDebug "Reading log identity" getTbl env >>= traverse decodeIdent >>= \case Nothing -> throwIO NoLogIdentity Just li -> pure li where - decodeIdent :: (Noun, Noun, Noun) -> IO LogIdentity + decodeIdent :: (Noun, Noun, Noun) -> RIO e LogIdentity decodeIdent = fromNounExn . toNoun - getTbl :: Env -> IO (Maybe (Noun, Noun, Noun)) + getTbl :: Env -> RIO e (Maybe (Noun, Noun, Noun)) getTbl env = do - with (readTxn env) $ \txn -> do + rwith (readTxn env) $ \txn -> do who <- getMb txn dbi "who" fake <- getMb txn dbi "is-fake" life <- getMb txn dbi "life" pure $ (,,) <$> who <*> fake <*> life -writeIdent :: Env -> Dbi -> LogIdentity -> IO () +writeIdent :: HasLogFunc e => Env -> Dbi -> LogIdentity -> RIO e () writeIdent env metaTbl ident@LogIdentity{..} = do + logDebug "Writing log identity" let flags = compileWriteFlags [] - with (writeTxn env) $ \txn -> do + rwith (writeTxn env) $ \txn -> do x <- putNoun flags txn metaTbl "who" (toNoun who) y <- putNoun flags txn metaTbl "is-fake" (toNoun isFake) z <- putNoun flags txn metaTbl "life" (toNoun lifecycleLen) @@ -193,30 +200,30 @@ writeIdent env metaTbl ident@LogIdentity{..} = do -- Latest Event Number --------------------------------------------------------- -getNumEvents :: Env -> Dbi -> IO Word64 +getNumEvents :: Env -> Dbi -> RIO e Word64 getNumEvents env eventsTbl = - with (readTxn env) $ \txn -> - with (cursor txn eventsTbl) $ \cur -> - withKVPtrs nullVal nullVal $ \pKey pVal -> - mdb_cursor_get MDB_LAST cur pKey pVal >>= \case + rwith (readTxn env) $ \txn -> + rwith (cursor txn eventsTbl) $ \cur -> + withKVPtrs' nullVal nullVal $ \pKey pVal -> + io $ mdb_cursor_get MDB_LAST cur pKey pVal >>= \case False -> pure 0 True -> peek pKey >>= mdbValToWord64 -- Write Events ---------------------------------------------------------------- -clearEvents :: Env -> Dbi -> IO () +clearEvents :: Env -> Dbi -> RIO e () clearEvents env eventsTbl = - with (writeTxn env) $ \txn -> - with (cursor txn eventsTbl) $ \cur -> - withKVPtrs nullVal nullVal $ \pKey pVal -> do - let loop = mdb_cursor_get MDB_LAST cur pKey pVal >>= \case + rwith (writeTxn env) $ \txn -> + rwith (cursor txn eventsTbl) $ \cur -> + withKVPtrs' nullVal nullVal $ \pKey pVal -> do + let loop = io (mdb_cursor_get MDB_LAST cur pKey pVal) >>= \case False -> pure () - True -> do mdb_cursor_del (compileWriteFlags []) cur + True -> do io $ mdb_cursor_del (compileWriteFlags []) cur loop loop -appendEvents :: EventLog -> Vector ByteString -> IO () +appendEvents :: EventLog -> Vector ByteString -> RIO e () appendEvents log !events = do numEvs <- readIORef (numEvents log) next <- pure (numEvs + 1) @@ -225,15 +232,15 @@ appendEvents log !events = do where flags = compileWriteFlags [MDB_NOOVERWRITE] doAppend = \kvs -> - with (writeTxn $ env log) $ \txn -> + rwith (writeTxn $ env log) $ \txn -> for_ kvs $ \(k,v) -> do putBytes flags txn (eventsTbl log) k v >>= \case True -> pure () False -> throwIO (BadWriteEvent k) -writeEffectsRow :: EventLog -> EventId -> ByteString -> IO () +writeEffectsRow :: EventLog -> EventId -> ByteString -> RIO e () writeEffectsRow log k v = do - with (writeTxn $ env log) $ \txn -> + rwith (writeTxn $ env log) $ \txn -> putBytes flags txn (effectsTbl log) k v >>= \case True -> pure () False -> throwIO (BadWriteEffect k) @@ -244,23 +251,24 @@ writeEffectsRow log k v = do -------------------------------------------------------------------------------- -- Read Events ----------------------------------------------------------------- -streamEvents :: MonadIO m +streamEvents :: HasLogFunc e => EventLog -> Word64 - -> ConduitT () ByteString m () + -> ConduitT () ByteString (RIO e) () streamEvents log first = do - last <- liftIO $ lastEv log - batch <- liftIO (readBatch log first) + last <- lift $ lastEv log + batch <- lift $ readBatch log first unless (null batch) $ do for_ batch yield streamEvents log (first + word (length batch)) -streamEffectsRows :: MonadIO m +streamEffectsRows :: ∀e. HasLogFunc e => EventLog -> EventId - -> ConduitT () (Word64, ByteString) m () + -> ConduitT () (Word64, ByteString) (RIO e) () streamEffectsRows log = go where + go :: EventId -> ConduitT () (Word64, ByteString) (RIO e) () go next = do - batch <- liftIO $ readRowsBatch (env log) (effectsTbl log) next + batch <- lift $ readRowsBatch (env log) (effectsTbl log) next unless (null batch) $ do for_ batch yield go (next + fromIntegral (length batch)) @@ -270,7 +278,7 @@ streamEffectsRows log = go Throws `MissingEvent` if an event was missing from the log. -} -readBatch :: EventLog -> Word64 -> IO (V.Vector ByteString) +readBatch :: EventLog -> Word64 -> RIO e (V.Vector ByteString) readBatch log first = start where start = do @@ -279,59 +287,66 @@ readBatch log first = start then pure mempty else readRows $ fromIntegral $ min 1000 $ ((last+1) - first) - assertFound :: EventId -> Bool -> IO () + assertFound :: EventId -> Bool -> RIO e () assertFound id found = do unless found $ throwIO $ MissingEvent id readRows count = withWordPtr first $ \pIdx -> - withKVPtrs (MDB_val 8 (castPtr pIdx)) nullVal $ \pKey pVal -> - with (readTxn $ env log) $ \txn -> - with (cursor txn $ eventsTbl log) $ \cur -> do - assertFound first =<< mdb_cursor_get MDB_SET_KEY cur pKey pVal + withKVPtrs' (MDB_val 8 (castPtr pIdx)) nullVal $ \pKey pVal -> + rwith (readTxn $ env log) $ \txn -> + rwith (cursor txn $ eventsTbl log) $ \cur -> do + assertFound first =<< io (mdb_cursor_get MDB_SET_KEY cur pKey pVal) fetchRows count cur pKey pVal fetchRows count cur pKey pVal = do - V.generateM count $ \i -> do - key <- peek pKey >>= mdbValToWord64 - val <- peek pVal >>= mdbValToBytes + env <- ask + V.generateM count $ \i -> runRIO env $ do + key <- io $ peek pKey >>= mdbValToWord64 + val <- io $ peek pVal >>= mdbValToBytes idx <- pure (first + word i) unless (key == idx) $ throwIO $ MissingEvent idx when (count /= succ i) $ do - assertFound idx =<< mdb_cursor_get MDB_NEXT cur pKey pVal + assertFound idx =<< io (mdb_cursor_get MDB_NEXT cur pKey pVal) pure val {- Read 1000 rows from the database, starting from key `first`. -} -readRowsBatch :: Env -> Dbi -> Word64 -> IO (V.Vector (Word64, ByteString)) +readRowsBatch :: ∀e. HasLogFunc e + => Env -> Dbi -> Word64 -> RIO e (V.Vector (Word64, ByteString)) readRowsBatch env dbi first = readRows where readRows = do - -- print ("readRows", first) + logDebug $ displayShow ("readRows", first) withWordPtr first $ \pIdx -> - withKVPtrs (MDB_val 8 (castPtr pIdx)) nullVal $ \pKey pVal -> - with (readTxn env) $ \txn -> - with (cursor txn dbi) $ \cur -> - mdb_cursor_get MDB_SET_RANGE cur pKey pVal >>= \case - False -> pure mempty - True -> V.unfoldrM (fetchRows cur pKey pVal) 1000 + withKVPtrs' (MDB_val 8 (castPtr pIdx)) nullVal $ \pKey pVal -> + rwith (readTxn env) $ \txn -> + rwith (cursor txn dbi) $ \cur -> + io (mdb_cursor_get MDB_SET_RANGE cur pKey pVal) >>= \case + False -> pure mempty + True -> V.unfoldrM (fetchRows cur pKey pVal) 1000 - fetchRows :: Cur -> Ptr MDB_val -> Ptr MDB_val - -> Word - -> IO (Maybe ((Word64, ByteString), Word)) + fetchRows :: Cur -> Ptr Val -> Ptr Val -> Word + -> RIO e (Maybe ((Word64, ByteString), Word)) fetchRows cur pKey pVal 0 = pure Nothing fetchRows cur pKey pVal n = do - key <- peek pKey >>= mdbValToWord64 - val <- peek pVal >>= mdbValToBytes - -- print ("fetchRows", n, key, val) - mdb_cursor_get MDB_NEXT cur pKey pVal >>= \case + key <- io $ peek pKey >>= mdbValToWord64 + val <- io $ peek pVal >>= mdbValToBytes + logDebug $ displayShow ("fetchRows", n, key, val) + io $ mdb_cursor_get MDB_NEXT cur pKey pVal >>= \case False -> pure $ Just ((key, val), 0) True -> pure $ Just ((key, val), pred n) -- Utils ----------------------------------------------------------------------- +withKVPtrs' :: (MonadIO m, MonadUnliftIO m) + => Val -> Val -> (Ptr Val -> Ptr Val -> m a) -> m a +withKVPtrs' k v cb = + withRunInIO $ \run -> + withKVPtrs k v $ \x y -> run (cb x y) + nullVal :: MDB_val nullVal = MDB_val 0 nullPtr @@ -355,20 +370,24 @@ mdbValToWord64 (MDB_val sz ptr) = do assertExn (sz == 8) BadKeyInEventLog peek (castPtr ptr) -withWord64AsMDBval :: Word64 -> (MDB_val -> IO a) -> IO a +withWord64AsMDBval :: (MonadIO m, MonadUnliftIO m) + => Word64 -> (MDB_val -> m a) -> m a withWord64AsMDBval w cb = do withWordPtr w $ \p -> cb (MDB_val (fromIntegral (sizeOf w)) (castPtr p)) -withWordPtr :: Word64 -> (Ptr Word64 -> IO a) -> IO a -withWordPtr w cb = do - allocaBytes (sizeOf w) (\p -> poke p w >> cb p) +withWordPtr :: (MonadIO m, MonadUnliftIO m) + => Word64 -> (Ptr Word64 -> m a) -> m a +withWordPtr w cb = + withRunInIO $ \run -> + allocaBytes (sizeOf w) (\p -> poke p w >> run (cb p)) -- Lower-Level Operations ------------------------------------------------------ -getMb :: Txn -> Dbi -> ByteString -> IO (Maybe Noun) +getMb :: MonadIO m => Txn -> Dbi -> ByteString -> m (Maybe Noun) getMb txn db key = + io $ byteStringAsMdbVal key $ \mKey -> mdb_get txn db mKey >>= traverse (mdbValToNoun key) @@ -382,14 +401,19 @@ mdbValToNoun key (MDB_val sz ptr) = do let res = cueBS bs eitherExn res (\err -> BadNounInLogIdentity key err bs) -putNoun :: MDB_WriteFlags -> Txn -> Dbi -> ByteString -> Noun -> IO Bool +putNoun :: MonadIO m + => MDB_WriteFlags -> Txn -> Dbi -> ByteString -> Noun -> m Bool putNoun flags txn db key val = + io $ byteStringAsMdbVal key $ \mKey -> byteStringAsMdbVal (jamBS val) $ \mVal -> mdb_put flags txn db mKey mVal -putBytes :: MDB_WriteFlags -> Txn -> Dbi -> Word64 -> ByteString -> IO Bool -putBytes flags txn db id bs = do - withWord64AsMDBval id $ \idVal -> do - byteStringAsMdbVal bs $ \mVal -> do - mdb_put flags txn db idVal mVal + +putBytes :: MonadIO m + => MDB_WriteFlags -> Txn -> Dbi -> Word64 -> ByteString -> m Bool +putBytes flags txn db id bs = + io $ + withWord64AsMDBval id $ \idVal -> + byteStringAsMdbVal bs $ \mVal -> + mdb_put flags txn db idVal mVal diff --git a/pkg/king/lib/Vere/Pier.hs b/pkg/king/lib/Vere/Pier.hs index dd506eaa7..ba739e1d7 100644 --- a/pkg/king/lib/Vere/Pier.hs +++ b/pkg/king/lib/Vere/Pier.hs @@ -58,9 +58,9 @@ generateBootSeq ship Pill{..} = do writeJobs :: EventLog -> Vector Job -> RIO e () writeJobs log !jobs = do - expect <- io $ Log.nextEv log + expect <- Log.nextEv log events <- fmap fromList $ traverse fromJob (zip [expect..] $ toList jobs) - io $ Log.appendEvents log events + Log.appendEvents log events where fromJob :: (EventId, Job) -> RIO e ByteString fromJob (expectedId, job) = do @@ -93,7 +93,7 @@ booted pillPath pierPath flags ship = do rio $ logTrace "Directory Setup" - log <- liftAcquire $ Log.new (pierPath <> "/.urb/log") ident + log <- Log.new (pierPath <> "/.urb/log") ident rio $ logTrace "Event Log Initialized" @@ -117,7 +117,7 @@ resumed :: HasLogFunc e => FilePath -> Serf.Flags -> RAcquire e (Serf, EventLog, SerfState) resumed top flags = do - log <- liftAcquire $ Log.existing (top <> "/.urb/log") + log <- Log.existing (top <> "/.urb/log") serf <- Serf.run (Serf.Config top flags) serfSt <- rio $ Serf.replay serf log @@ -170,32 +170,29 @@ death tag tid = do -- Start All Drivers ----------------------------------------------------------- -data Drivers = Drivers - { dAmes :: EffCb AmesEf - , dBehn :: EffCb BehnEf - , dHttpClient :: EffCb HttpClientEf - , dHttpServer :: EffCb HttpServerEf - , dNewt :: EffCb NewtEf - , dSync :: EffCb SyncEf - , dTerm :: EffCb TermEf +data Drivers e = Drivers + { dAmes :: EffCb e AmesEf + , dBehn :: EffCb e BehnEf + , dHttpClient :: EffCb e HttpClientEf + , dHttpServer :: EffCb e HttpServerEf + , dNewt :: EffCb e NewtEf + , dSync :: EffCb e SyncEf + , dTerm :: EffCb e TermEf } -drivers :: FilePath - -> KingId - -> Ship - -> Maybe Port - -> (Ev -> STM ()) - -> ([Ev], RAcquire e Drivers) +drivers :: HasLogFunc e + => FilePath -> KingId -> Ship -> Maybe Port -> (Ev -> STM ()) + -> ([Ev], RAcquire e (Drivers e)) drivers pierPath inst who mPort plan = - (initialEvents, liftAcquire runDrivers) + (initialEvents, runDrivers) where (behnBorn, runBehn) = behn inst plan (amesBorn, runAmes) = ames inst who mPort plan (httpBorn, runHttp) = serv pierPath inst plan initialEvents = mconcat [behnBorn, amesBorn, httpBorn] runDrivers = do - dNewt <- runAmes - dBehn <- runBehn + dNewt <- liftAcquire $ runAmes + dBehn <- liftAcquire $ runBehn dAmes <- pure $ const $ pure () dHttpClient <- pure $ const $ pure () dHttpServer <- runHttp @@ -206,7 +203,7 @@ drivers pierPath inst who mPort plan = -- Route Effects to Drivers ---------------------------------------------------- -router :: HasLogFunc e => STM FX -> Drivers -> RAcquire e (Async ()) +router :: HasLogFunc e => STM FX -> Drivers e -> RAcquire e (Async ()) router waitFx Drivers{..} = mkRAcquire start cancel where @@ -217,15 +214,15 @@ router waitFx Drivers{..} = case ef of GoodParse (EfVega _ _) -> error "TODO" GoodParse (EfExit _ _) -> error "TODO" - GoodParse (EfVane (VEAmes ef)) -> io $ dAmes ef - GoodParse (EfVane (VEBehn ef)) -> io $ dBehn ef - GoodParse (EfVane (VEBoat ef)) -> io $ dSync ef - GoodParse (EfVane (VEClay ef)) -> io $ dSync ef - GoodParse (EfVane (VEHttpClient ef)) -> io $ dHttpClient ef - GoodParse (EfVane (VEHttpServer ef)) -> io $ dHttpServer ef - GoodParse (EfVane (VENewt ef)) -> io $ dNewt ef - GoodParse (EfVane (VESync ef)) -> io $ dSync ef - GoodParse (EfVane (VETerm ef)) -> io $ dTerm ef + GoodParse (EfVane (VEAmes ef)) -> dAmes ef + GoodParse (EfVane (VEBehn ef)) -> dBehn ef + GoodParse (EfVane (VEBoat ef)) -> dSync ef + GoodParse (EfVane (VEClay ef)) -> dSync ef + GoodParse (EfVane (VEHttpClient ef)) -> dHttpClient ef + GoodParse (EfVane (VEHttpServer ef)) -> dHttpServer ef + GoodParse (EfVane (VENewt ef)) -> dNewt ef + GoodParse (EfVane (VESync ef)) -> dSync ef + GoodParse (EfVane (VETerm ef)) -> dTerm ef FailParse n -> logError $ display $ pack @Text (ppShow n) @@ -293,12 +290,12 @@ runPersist log inpQ out = runThread = asyncBound $ forever $ do writs <- atomically getBatchFromQueue events <- validateJobsAndGetBytes (toNullable writs) - io $ Log.appendEvents log events + Log.appendEvents log events atomically $ for_ writs $ \(_,fx) -> out fx validateJobsAndGetBytes :: [(Job, FX)] -> RIO e (Vector ByteString) validateJobsAndGetBytes writs = do - expect <- io $ Log.nextEv log + expect <- Log.nextEv log fmap fromList $ for (zip [expect..] writs) $ \(expectedId, (j, fx)) -> do diff --git a/pkg/king/lib/Vere/Pier/Types.hs b/pkg/king/lib/Vere/Pier/Types.hs index 6426a7387..9717ab166 100644 --- a/pkg/king/lib/Vere/Pier/Types.hs +++ b/pkg/king/lib/Vere/Pier/Types.hs @@ -84,7 +84,7 @@ deriveToNoun ''Order type QueueEv = Ev -> STM () -type EffCb a = a -> IO () +type EffCb e a = a -> RIO e () type Perform = Ef -> IO () diff --git a/pkg/king/lib/Vere/Serf.hs b/pkg/king/lib/Vere/Serf.hs index 21aa6068f..3467b716b 100644 --- a/pkg/king/lib/Vere/Serf.hs +++ b/pkg/king/lib/Vere/Serf.hs @@ -460,7 +460,7 @@ persistFX log = loop loop = await >>= \case Nothing -> pure () Just (eId, fx) -> do - liftIO $ Log.writeEffectsRow log eId (jamBS $ toNoun fx) + lift $ Log.writeEffectsRow log eId (jamBS $ toNoun fx) loop doCollectFX :: ∀e. HasLogFunc e