From 6565c06fd4708396ba206fcf8b28b72ba5384ca2 Mon Sep 17 00:00:00 2001 From: Benjamin Summers Date: Mon, 24 Jun 2019 18:10:41 -0700 Subject: [PATCH] Got something working: Can "replay" event log for ship whos snapshot is already up to date.. --- pkg/hs-urbit/lib/Data/Noun/Poet.hs | 10 +- pkg/hs-urbit/lib/Vere/Log.hs | 286 ++++++++----------- pkg/hs-urbit/lib/Vere/Persist.hs | 77 +++++ pkg/hs-urbit/lib/Vere/Pier.hs | 90 +++--- pkg/hs-urbit/lib/Vere/Pier/Types.hs | 2 + pkg/hs-urbit/lib/Vere/{Worker.hs => Serf.hs} | 187 +++++++----- pkg/hs-vere/app/test/Main.hs | 81 ++++-- pkg/urbit/vere/pier.c | 8 +- stack.yaml | 2 + 9 files changed, 427 insertions(+), 316 deletions(-) create mode 100644 pkg/hs-urbit/lib/Vere/Persist.hs rename pkg/hs-urbit/lib/Vere/{Worker.hs => Serf.hs} (64%) diff --git a/pkg/hs-urbit/lib/Data/Noun/Poet.hs b/pkg/hs-urbit/lib/Data/Noun/Poet.hs index 7c99cfc372..45247c5393 100644 --- a/pkg/hs-urbit/lib/Data/Noun/Poet.hs +++ b/pkg/hs-urbit/lib/Data/Noun/Poet.hs @@ -79,7 +79,7 @@ instance Applicative IResult where (<*>) = ap instance Fail.MonadFail IResult where - fail err = IError [] err + fail err = traceM ("!" <> err <> "!") >> IError [] err instance Monad IResult where return = pure @@ -203,6 +203,12 @@ fromNoun n = runParser (parseNoun n) [] onFail onSuccess onFail p m = Nothing onSuccess x = Just x +fromNounErr :: FromNoun a => Noun -> Either Text a +fromNounErr n = runParser (parseNoun n) [] onFail onSuccess + where + onFail p m = Left (pack m) + onSuccess x = Right x + _Poet :: (ToNoun a, FromNoun a) => Prism' Noun a _Poet = prism' toNoun fromNoun @@ -287,7 +293,7 @@ instance ToNoun a => ToNoun (Nullable a) where instance FromNoun a => FromNoun (Nullable a) where parseNoun (Atom 0) = pure Nil - parseNoun (Atom n) = fail ("Expected ?@(~ ^), but got " <> show n) + parseNoun (Atom n) = fail ("Nullable: expected ?@(~ ^), but got " <> show n) parseNoun n = NotNil <$> parseNoun n diff --git a/pkg/hs-urbit/lib/Vere/Log.hs b/pkg/hs-urbit/lib/Vere/Log.hs index 71f0e8719c..c7748c460b 100644 --- a/pkg/hs-urbit/lib/Vere/Log.hs +++ b/pkg/hs-urbit/lib/Vere/Log.hs @@ -1,17 +1,19 @@ --- TODO: Make sure transaction closed in all error cases --- TODO: Don't allow writing non-contiguous events -module Vere.Log ( - init, - shutdown, - -- we don't export write; you use the queue - readEvents, - latestEventNumber, - readLogIdentity, - writeLogIdentity -) where +{- + TODO: Make sure transaction closed in all error cases + TODO: Don't allow writing non-contiguous events +-} + +module Vere.Log ( open + , close + , readEvents + , latestEventNumber + , readIdent + , writeIdent + , putJam + ) where import ClassyPrelude hiding (init) -import Control.Lens hiding ((<|)) +import Control.Lens hiding ((<|)) import Data.Noun import Data.Noun.Atom @@ -32,131 +34,73 @@ import qualified Data.ByteString as B import qualified Data.Vector as V import qualified Data.Vector.Mutable as MV --------------------------------------------------------------------------------- --- TODO: Handle throws on the async -init :: FilePath -> TQueue (Writ [Eff]) -> (Writ [Eff] -> STM ()) - -> IO LogState -init dir inp cb = do +-- Open/Close an Event Log ----------------------------------------------------- + +open :: FilePath -> IO EventLog +open dir = do env <- mdb_env_create mdb_env_set_maxdbs env 3 mdb_env_set_mapsize env (40 * 1024 * 1024 * 1024) mdb_env_open env dir [] - writer <- persistThread env inp cb - pure (LogState env inp cb writer) + pure (EventLog env) --- TODO: properly handle shutdowns during write -shutdown :: LogState -> IO () -shutdown s = do - void $ waitCancel (writer s) - mdb_env_close (env s) +close :: EventLog -> IO () +close (EventLog env) = mdb_env_close env -waitCancel :: Async a -> IO (Either SomeException a) -waitCancel async = cancel async >> waitCatch async --------------------------------------------------------------------------------- +-- Read/Write Log Identity ----------------------------------------------------- -{- - Read one or more items from a TQueue, only blocking on the first item. --} -readQueue :: TQueue a -> STM (NonNull [a]) -readQueue q = - readTQueue q >>= go . singleton +readIdent :: EventLog -> IO LogIdentity +readIdent (EventLog env) = do + txn <- mdb_txn_begin env Nothing True + db <- mdb_dbi_open txn (Just "META") [] + who <- get txn db "who" + is_fake <- get txn db "is-fake" + life <- get txn db "life" + mdb_txn_abort txn + pure (LogIdentity who is_fake life) + +writeIdent :: EventLog -> LogIdentity -> IO () +writeIdent (EventLog env) LogIdentity{..} = do + txn <- mdb_txn_begin env Nothing False + db <- mdb_dbi_open txn (Just "META") [MDB_CREATE] + let flags = compileWriteFlags [] + putNoun flags txn db "who" who + putNoun flags txn db "is-fake" is_fake + putNoun flags txn db "life" life + mdb_txn_commit txn + pure () + + +-- Latest Event Number --------------------------------------------------------- + +latestEventNumber :: EventLog -> IO Word64 +latestEventNumber (EventLog env) = + do + txn <- mdb_txn_begin env Nothing True + db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY] + cur <- mdb_cursor_open txn db + res <- fetch txn db cur + mdb_cursor_close cur + mdb_txn_abort txn + pure res where - go acc = - tryReadTQueue q >>= \case - Nothing -> pure (reverse acc) - Just item -> go (item <| acc) + key = MDB_val 0 nullPtr + val = MDB_val 0 nullPtr + fetch txn db cur = + withKVPtrs key val $ \pKey pVal -> + mdb_cursor_get MDB_LAST cur pKey pVal >>= \case + False -> pure 0 + True -> peek pKey >>= mdbValToWord64 -byteStringAsMdbVal :: ByteString -> (MDB_val -> IO a) -> IO a -byteStringAsMdbVal bs k = - BU.unsafeUseAsCStringLen bs \(ptr,sz) -> - k (MDB_val (fromIntegral sz) (castPtr ptr)) -mdbValToAtom :: MDB_val -> IO Atom -mdbValToAtom (MDB_val sz ptr) = do - bs <- BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz) - pure (bs ^. from (pill . pillBS)) - -maybeErr :: Maybe a -> String -> IO a -maybeErr (Just x) _ = pure x -maybeErr Nothing msg = error msg - -mdbValToNoun :: MDB_val -> IO Noun -mdbValToNoun (MDB_val sz ptr) = do - bs <- BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz) - let res = (bs ^? from pillBS . from pill . _Cue) - maybeErr res "mdb bad cue" - -putRaw :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> MDB_val -> MDB_val -> IO () -putRaw flags txn db key val = - mdb_put flags txn db key val >>= \case - True -> pure () - False -> error "mdb bad put" - -putNoun :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> ByteString -> Noun -> IO () -putNoun flags txn db key val = - byteStringAsMdbVal key $ \mKey -> - byteStringAsMdbVal (val ^. re _CueBytes) $ \mVal -> - putRaw flags txn db mKey mVal - -putJam :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> Word64 -> Jam -> IO () -putJam flags txn db id (Jam atom) = do - withWord64AsMDBval id $ \idVal -> do - let !bs = atom ^. pill . pillBS - byteStringAsMdbVal bs $ \mVal -> do - putRaw flags txn db idVal mVal - -get :: MDB_txn -> MDB_dbi -> ByteString -> IO Noun -get txn db key = - byteStringAsMdbVal key \mKey -> - mdb_get txn db mKey >>= maybe (error "mdb bad get") mdbValToNoun - -mdbValToWord64 :: MDB_val -> IO Word64 -mdbValToWord64 (MDB_val sz ptr) = do - assertErr (sz == 8) "wrong size in mdbValToWord64" - peek (castPtr ptr) - -withWord64AsMDBval :: Word64 -> (MDB_val -> IO a) -> IO a -withWord64AsMDBval w cb = do - withWordPtr w $ \p -> - cb (MDB_val (fromIntegral (sizeOf w)) (castPtr p)) - --------------------------------------------------------------------------------- - -withWordPtr :: Word64 -> (Ptr Word64 -> IO a) -> IO a -withWordPtr w cb = do - allocaBytes (sizeOf w) (\p -> poke p w >> cb p) - --- TODO: We need to be able to send back an exception to the main thread on an --- exception on the persistence thread. -persistThread :: MDB_env - -> TQueue (Writ [Eff]) - -> (Writ [Eff] -> STM ()) - -> IO (Async ()) -persistThread env inputQueue onPersist = asyncBound $ - forever do - writs <- atomically $ readQueue inputQueue - writeEvents writs - atomically $ traverse_ onPersist writs - where - writeEvents writs = do - txn <- mdb_txn_begin env Nothing False - db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY] - - let flags = compileWriteFlags [MDB_NOOVERWRITE] - - for_ writs $ \w -> do - putJam flags txn db (eventId w) (event w) - - mdb_txn_commit txn - -deriving instance Show MDB_val +-- Read Events ----------------------------------------------------------------- -- TODO: This will read len items and will error if there are less than that -- available. This differs from the current pier.c's expectations. -readEvents :: LogState -> Word64 -> Word64 -> IO (V.Vector (Word64,Atom)) -readEvents (LogState env _ _ _) first len = +readEvents :: EventLog -> Word64 -> Word64 -> IO (V.Vector (Word64,Atom)) +readEvents (EventLog env) first len = withWordPtr first $ \pIdx -> withKVPtrs (MDB_val 8 (castPtr pIdx)) (MDB_val 0 nullPtr) $ \pKey pVal -> do @@ -186,6 +130,9 @@ readEvents (LogState env _ _ _) first len = pure vec + +-- Utils ----------------------------------------------------------------------- + int :: Word64 -> Int int = fromIntegral @@ -193,44 +140,63 @@ assertErr :: Bool -> String -> IO () assertErr True _ = pure () assertErr False m = error m -latestEventNumber :: LogState -> IO Word64 -latestEventNumber (LogState env _ _ _) = - do - txn <- mdb_txn_begin env Nothing True - db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY] - cur <- mdb_cursor_open txn db - res <- fetch txn db cur - mdb_cursor_close cur - mdb_txn_abort txn - pure res - where - key = MDB_val 0 nullPtr - val = MDB_val 0 nullPtr - fetch txn db cur = - withKVPtrs key val $ \pKey pVal -> - mdb_cursor_get MDB_LAST cur pKey pVal >>= \case - False -> pure 0 - True -> peek pKey >>= mdbValToWord64 +maybeErr :: Maybe a -> String -> IO a +maybeErr (Just x) _ = pure x +maybeErr Nothing msg = error msg --------------------------------------------------------------------------------- +byteStringAsMdbVal :: ByteString -> (MDB_val -> IO a) -> IO a +byteStringAsMdbVal bs k = + BU.unsafeUseAsCStringLen bs \(ptr,sz) -> + k (MDB_val (fromIntegral sz) (castPtr ptr)) -readLogIdentity :: LogState -> IO LogIdentity -readLogIdentity (LogState env _ _ _) = do - txn <- mdb_txn_begin env Nothing True - db <- mdb_dbi_open txn (Just "META") [] - who <- get txn db "who" - is_fake <- get txn db "is-fake" - life <- get txn db "life" - mdb_txn_abort txn - pure (LogIdentity who is_fake life) +mdbValToWord64 :: MDB_val -> IO Word64 +mdbValToWord64 (MDB_val sz ptr) = do + assertErr (sz == 8) "wrong size in mdbValToWord64" + peek (castPtr ptr) -writeLogIdentity :: LogState -> LogIdentity -> IO () -writeLogIdentity (LogState env _ _ _) LogIdentity{..} = do - txn <- mdb_txn_begin env Nothing False - db <- mdb_dbi_open txn (Just "META") [MDB_CREATE] - let flags = compileWriteFlags [] - putNoun flags txn db "who" who - putNoun flags txn db "is-fake" is_fake - putNoun flags txn db "life" life - mdb_txn_commit txn - pure () +withWord64AsMDBval :: Word64 -> (MDB_val -> IO a) -> IO a +withWord64AsMDBval w cb = do + withWordPtr w $ \p -> + cb (MDB_val (fromIntegral (sizeOf w)) (castPtr p)) + +withWordPtr :: Word64 -> (Ptr Word64 -> IO a) -> IO a +withWordPtr w cb = do + allocaBytes (sizeOf w) (\p -> poke p w >> cb p) + + +-- Lower-Level Operations ------------------------------------------------------ + +get :: MDB_txn -> MDB_dbi -> ByteString -> IO Noun +get txn db key = + byteStringAsMdbVal key \mKey -> + mdb_get txn db mKey >>= maybe (error "mdb bad get") mdbValToNoun + +mdbValToAtom :: MDB_val -> IO Atom +mdbValToAtom (MDB_val sz ptr) = do + bs <- BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz) + pure (bs ^. from (pill . pillBS)) + +mdbValToNoun :: MDB_val -> IO Noun +mdbValToNoun (MDB_val sz ptr) = do + bs <- BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz) + let res = (bs ^? from pillBS . from pill . _Cue) + maybeErr res "mdb bad cue" + +putRaw :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> MDB_val -> MDB_val -> IO () +putRaw flags txn db key val = + mdb_put flags txn db key val >>= \case + True -> pure () + False -> error "mdb bad put" + +putNoun :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> ByteString -> Noun -> IO () +putNoun flags txn db key val = + byteStringAsMdbVal key $ \mKey -> + byteStringAsMdbVal (val ^. re _CueBytes) $ \mVal -> + putRaw flags txn db mKey mVal + +putJam :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> Word64 -> Jam -> IO () +putJam flags txn db id (Jam atom) = do + withWord64AsMDBval id $ \idVal -> do + let !bs = atom ^. pill . pillBS + byteStringAsMdbVal bs $ \mVal -> do + putRaw flags txn db idVal mVal diff --git a/pkg/hs-urbit/lib/Vere/Persist.hs b/pkg/hs-urbit/lib/Vere/Persist.hs new file mode 100644 index 0000000000..8a6678f59b --- /dev/null +++ b/pkg/hs-urbit/lib/Vere/Persist.hs @@ -0,0 +1,77 @@ +{- + TODO Close the database on uncaught exception. + TODO `Persist` should just be the thread id. + the thread should close the database when it is killed. +-} + +module Vere.Persist (start, stop) where + +import ClassyPrelude hiding (init) + +import Vere.Log +import Vere.Pier.Types +import Database.LMDB.Raw + + +-- Types ----------------------------------------------------------------------- + +data Persist = Persist EventLog (Async ()) + + +-- Start and Stop -------------------------------------------------------------- + +start :: EventLog + -> TQueue (Writ [Eff]) + -> (Writ [Eff] -> STM ()) + -> IO Persist +start log inp cb = do + tid <- asyncBound (persistThread log inp cb) + pure (Persist log tid) + +-- TODO: properly handle shutdowns during write +stop :: Persist -> IO () +stop (Persist log tid) = do + void (cancel tid) + void (waitCatch tid) + close log + + +-- Persist Thread -------------------------------------------------------------- + +-- TODO: We need to be able to send back an exception to the main thread on an +-- exception on the persistence thread. +persistThread :: EventLog + -> TQueue (Writ [Eff]) + -> (Writ [Eff] -> STM ()) + -> IO () +persistThread (EventLog env) inputQueue onPersist = + forever do + writs <- atomically $ readQueue inputQueue + writeEvents writs + atomically $ traverse_ onPersist writs + where + writeEvents writs = do + txn <- mdb_txn_begin env Nothing False + db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY] + + let flags = compileWriteFlags [MDB_NOOVERWRITE] + + for_ writs $ \w -> do + putJam flags txn db (eventId w) (event w) + + mdb_txn_commit txn + + +-- Get eventhing from the input queue. ----------------------------------------- + +{- + Read one or more items from a TQueue, only blocking on the first item. +-} +readQueue :: TQueue a -> STM (NonNull [a]) +readQueue q = + readTQueue q >>= go . singleton + where + go acc = + tryReadTQueue q >>= \case + Nothing -> pure (reverse acc) + Just item -> go (item <| acc) diff --git a/pkg/hs-urbit/lib/Vere/Pier.hs b/pkg/hs-urbit/lib/Vere/Pier.hs index 3a9ae2b968..ad08a92e6f 100644 --- a/pkg/hs-urbit/lib/Vere/Pier.hs +++ b/pkg/hs-urbit/lib/Vere/Pier.hs @@ -7,87 +7,75 @@ import Data.Noun.Pill import Vere import Vere.Pier.Types -import qualified Vere.Log as Log -import qualified Vere.Worker as Worker +import qualified Vere.Log as Log +import qualified Vere.Persist as Persist +import qualified Vere.Serf as Serf + +import Vere.Serf (Serf, EventId) + + +-------------------------------------------------------------------------------- ioDrivers = [] :: [IODriver] + +-------------------------------------------------------------------------------- + -- This is called to make a freshly booted pier. It assigns an identity to an -- event log and takes a chill pill. -newPier :: Pill -> FilePath -> LogIdentity -> IO Pier -newPier pill top id = do +boot :: Pill -> FilePath -> LogIdentity -> IO (Serf, EventLog, EventId, Mug) +boot pill top id = do let logPath = top <> "/log" - computeQueue <- newTQueueIO - persistQueue <- newTQueueIO - releaseQueue <- newTQueueIO + log <- Log.open logPath - -- What we really want to do is write the log identity and then do normal - -- startup, but writeLogIdentity requires a full log state including - -- input/output queues. - logState <- Log.init logPath persistQueue (writeTQueue releaseQueue) + Log.writeIdent log id - -- In first boot, we need to write this! - Log.writeLogIdentity logState id + serf <- Serf.startSerfProcess top + (e, m) <- Serf.bootSerf serf id pill - let logLatestEventNumber = 0 - let getEvents = Log.readEvents logState - - workerState <- Worker.startWorkerProcess - - Worker.bootWorker workerState id pill - - performCommonPierStartup workerState computeQueue persistQueue releaseQueue logState + pure (serf, log, e, m) --- This reads in a pier -runPierFromDisk :: FilePath -> IO Pier -runPierFromDisk top = do - let logPath = top <> "/log" +{- + What we really want to do is write the log identity and then do + normal startup, but writeIdent requires a full log state + including input/output queues. +-} +resume :: FilePath -> IO (Serf, EventLog, EventId, Mug) +resume top = do + log <- Log.open (top <> "/.urb/log") + ident <- Log.readIdent log + lastEv <- Log.latestEventNumber log + serf <- Serf.startSerfProcess top + (e, m) <- Serf.replay serf ident lastEv (Log.readEvents log) - computeQueue <- newTQueueIO - persistQueue <- newTQueueIO - releaseQueue <- newTQueueIO + pure (serf, log, e, m) - -- What we really want to do is write the log identity and then do normal - -- startup, but writeLogIdentity requires a full log state including - -- input/output queues. - logState <- Log.init logPath persistQueue (writeTQueue releaseQueue) - - -- In first boot, we need to write this! - id <- Log.readLogIdentity logState - logLatestEventNumber <- Log.latestEventNumber logState - - let getEvents = Log.readEvents logState - - workerState <- Worker.startWorkerProcess - Worker.resumeWorker workerState id logLatestEventNumber getEvents - - performCommonPierStartup workerState computeQueue persistQueue releaseQueue logState - - -performCommonPierStartup :: Worker.Worker +{- +performCommonPierStartup :: Serf.Serf -> TQueue Ovum -> TQueue (Writ [Eff]) -> TQueue (Writ [Eff]) -> LogState -> IO Pier -performCommonPierStartup workerState computeQueue persistQueue releaseQueue logState = do +performCommonPierStartup serf computeQ persistQ releaseQ logState = do for ioDrivers $ \x -> do bootMessage <- bornEvent x - atomically $ writeTQueue computeQueue bootMessage + atomically $ writeTQueue computeQ bootMessage driverThreads <- for ioDrivers $ \x -> do - startDriver x (writeTQueue computeQueue) + startDriver x (writeTQueue computeQ) -- TODO: Don't do a bunch of extra work; we send all events to all drivers portingThread <- async $ do forever $ do - r <- atomically (readTQueue releaseQueue) + r <- atomically (readTQueue releaseQ) for_ driverThreads $ \(_, k) -> for_ (payload r) $ \eff -> k eff - Worker.workerThread workerState (readTQueue computeQueue) undefined + Serf.workerThread serf (readTQueue computeQ) undefined pure (Pier{..}) +-} diff --git a/pkg/hs-urbit/lib/Vere/Pier/Types.hs b/pkg/hs-urbit/lib/Vere/Pier/Types.hs index 20128806d6..bd6bbbce5e 100644 --- a/pkg/hs-urbit/lib/Vere/Pier/Types.hs +++ b/pkg/hs-urbit/lib/Vere/Pier/Types.hs @@ -75,6 +75,8 @@ data Pier = Pier , portingThread :: Async () } +newtype EventLog = EventLog MDB_env + -- TODO: We are uncertain about q's type. There's some serious entanglement -- with u3_pier in this logic in the C code, and you might not be able to get -- away with anything less than passing the full u3_writ around. diff --git a/pkg/hs-urbit/lib/Vere/Worker.hs b/pkg/hs-urbit/lib/Vere/Serf.hs similarity index 64% rename from pkg/hs-urbit/lib/Vere/Worker.hs rename to pkg/hs-urbit/lib/Vere/Serf.hs index a9354194ab..67adbad31a 100644 --- a/pkg/hs-urbit/lib/Vere/Worker.hs +++ b/pkg/hs-urbit/lib/Vere/Serf.hs @@ -1,4 +1,4 @@ -module Vere.Worker where +module Vere.Serf where import ClassyPrelude import Control.Lens @@ -23,7 +23,7 @@ import Foreign.Storable (peek) import qualified Vere.Log as Log -data Worker = Worker +data Serf = Serf { sendHandle :: Handle , recvHandle :: Handle , process :: ProcessHandle @@ -31,7 +31,7 @@ data Worker = Worker -- , getInput :: STM (Writ ()) -- , onComputed :: Writ [Effect] -> STM () --- , onExit :: Worker -> IO () +-- , onExit :: Serf -> IO () -- , task :: Async () } @@ -39,20 +39,27 @@ data Worker = Worker -------------------------------------------------------------------------------- --- Think about how to handle process exit --- Tear down subprocess on exit? (terminiteProcess) -startWorkerProcess :: IO Worker -startWorkerProcess = +{- + TODO Think about how to handle process exit + TODO Tear down subprocess on exit? (terminiteProcess) + TODO `config` is a stub, fill it in. +-} +startSerfProcess :: FilePath -> IO Serf +startSerfProcess pier = do (Just i, Just o, _, p) <- createProcess pSpec - pure (Worker i o p) + pure (Serf i o p) where - pSpec = - (proc "urbit-worker" []) { std_in = CreatePipe - , std_out = CreatePipe - } + chkDir = traceShowId pier + diskKey = "" + config = "0" + args = [chkDir, diskKey, config] + pSpec = (proc "urbit-worker" args) + { std_in = CreatePipe + , std_out = CreatePipe + } -kill :: Worker -> IO ExitCode +kill :: Serf -> IO ExitCode kill w = do terminateProcess (process w) waitForProcess (process w) @@ -73,7 +80,7 @@ newtype ShipId = ShipId (Ship, Bool) -------------------------------------------------------------------------------- -type Play = Nullable (EventId, Mug, ShipId) +type Play = Maybe (EventId, Mug, ShipId) data Plea = Play Play @@ -106,27 +113,27 @@ instance FromNoun Plea where type CompletedEventId = Word64 type NextEventId = Word64 -type WorkerState = (EventId, Mug) +type SerfState = (EventId, Mug) type ReplacementEv = (EventId, Mug, Job) -type WorkResult = (EventId, Mug, [Eff]) -type WorkerResp = (Either ReplacementEv WorkResult) +type WorkResult = (EventId, Mug, [Eff]) +type SerfResp = (Either ReplacementEv WorkResult) -- Exceptions ------------------------------------------------------------------ -data WorkerExn +data SerfExn = BadComputeId EventId WorkResult | BadReplacementId EventId ReplacementEv | UnexpectedPlay EventId Play | BadPleaAtom Atom - | BadPleaNoun Noun + | BadPleaNoun Noun Text | ReplacedEventDuringReplay EventId ReplacementEv - | WorkerConnectionClosed + | SerfConnectionClosed | UnexpectedPleaOnNewShip Plea | InvalidInitialPlea Plea deriving (Show) -instance Exception WorkerExn +instance Exception SerfExn -- Utils ----------------------------------------------------------------------- @@ -140,25 +147,32 @@ fromJustExn :: Exception e => Maybe a -> e -> IO a fromJustExn Nothing exn = throwIO exn fromJustExn (Just x) exn = pure x +fromRightExn :: Exception e => Either Text a -> (Text -> e) -> IO a +fromRightExn (Left m) exn = throwIO (exn m) +fromRightExn (Right x) _ = pure x + -------------------------------------------------------------------------------- -sendAndRecv :: Worker -> EventId -> Atom -> IO WorkerResp +sendAndRecv :: Serf -> EventId -> Atom -> IO SerfResp sendAndRecv w eventId event = do + traceM ("sendAndRecv: " <> show eventId) sendAtom w $ work eventId (Jam event) - loop + res <- loop + traceM "sendAndRecv.done" + pure res where - produce :: WorkResult -> IO WorkerResp + produce :: WorkResult -> IO SerfResp produce (i, m, o) = do guardExn (i /= eventId) (BadComputeId eventId (i, m, o)) pure $ Right (i, m, o) - replace :: ReplacementEv -> IO WorkerResp + replace :: ReplacementEv -> IO SerfResp replace (i, m, j) = do guardExn (i /= eventId) (BadReplacementId eventId (i, m, j)) pure (Left (i, m, j)) - loop :: IO WorkerResp + loop :: IO SerfResp loop = recvPlea w >>= \case Play p -> throwIO (UnexpectedPlay eventId p) Done i m o -> produce (i, m, o) @@ -166,32 +180,43 @@ sendAndRecv w eventId event = Stdr _ cord -> print cord >> loop Slog _ pri t -> printTank pri t >> loop -sendBootEvent :: LogIdentity -> Worker -> IO () +sendBootEvent :: LogIdentity -> Serf -> IO () sendBootEvent id w = do sendAtom w $ jam $ toNoun (Cord "boot", id) -- the ship is booted, but it is behind. shove events to the worker until it is -- caught up. -replay :: Worker - -> WorkerState - -> LogIdentity - -> EventId - -> (EventId -> Word64 -> IO (Vector (EventId, Atom))) - -> IO (EventId, Mug) -replay w (wid, wmug) identity lastCommitedId getEvents = do +replayEvents :: Serf + -> SerfState + -> LogIdentity + -> EventId + -> (EventId -> Word64 -> IO (Vector (EventId, Atom))) + -> IO (EventId, Mug) +replayEvents w (wid, wmug) identity lastCommitedId getEvents = do + traceM ("replayEvents: " <> show wid <> " " <> show wmug) + when (wid == 1) (sendBootEvent identity w) vLast <- newIORef (wid, wmug) loop vLast wid - readIORef vLast + + res <- readIORef vLast + traceM ("replayEvents.return " <> show res) + pure res + where -- Replay events in batches of 1000. loop vLast curEvent = do + traceM ("replayEvents.loop: " <> show curEvent) let toRead = min 1000 (1 + lastCommitedId - curEvent) when (toRead > 0) do + traceM ("replayEvents.loop.getEvents " <> show toRead) + events <- getEvents curEvent toRead + traceM ("got events " <> show (length events)) + for_ events $ \(eventId, event) -> do sendAndRecv w eventId event >>= \case Left ev -> throwIO (ReplacedEventDuringReplay eventId ev) @@ -200,45 +225,35 @@ replay w (wid, wmug) identity lastCommitedId getEvents = do loop vLast (curEvent + toRead) -bootWorker :: Worker - -> LogIdentity - -> Pill - -> IO () -bootWorker w identity pill = +bootSerf :: Serf -> LogIdentity -> Pill -> IO (EventId, Mug) +bootSerf w ident pill = do recvPlea w >>= \case - Play Nil -> pure () - x@(Play _) -> throwIO (UnexpectedPleaOnNewShip x) - x -> throwIO (InvalidInitialPlea x) + Play Nothing -> pure () + x@(Play _) -> throwIO (UnexpectedPleaOnNewShip x) + x -> throwIO (InvalidInitialPlea x) -- TODO: actually boot the pill undefined - requestSnapshot w - -- Maybe return the current event id ? But we'll have to figure that out -- later. - pure () + pure undefined -resumeWorker :: Worker - -> LogIdentity - -> EventId - -> (EventId -> Word64 -> IO (Vector (EventId, Atom))) - -> IO (EventId, Mug) -resumeWorker w identity logLatestEventNumber eventFetcher = - do +type GetEvents = EventId -> Word64 -> IO (Vector (EventId, Atom)) + +replay :: Serf -> LogIdentity -> EventId -> GetEvents -> IO (EventId, Mug) +replay w ident lastEv getEvents = do ws@(eventId, mug) <- recvPlea w >>= \case - Play Nil -> pure (1, Mug 0) - Play (NotNil (e, m, _)) -> pure (e, m) - x -> throwIO (InvalidInitialPlea x) + Play Nothing -> pure (1, Mug 0) + Play (Just (e, m, _)) -> pure (e, m) + x -> throwIO (InvalidInitialPlea x) - r <- replay w ws identity logLatestEventNumber eventFetcher + traceM ("got plea! " <> show eventId <> " " <> show mug) - requestSnapshot w + replayEvents w ws ident lastEv getEvents - pure r - -workerThread :: Worker -> STM Ovum -> (EventId, Mug) -> IO (Async ()) +workerThread :: Serf -> STM Ovum -> (EventId, Mug) -> IO (Async ()) workerThread w getEvent (evendId, mug) = async $ forever do ovum <- atomically $ getEvent @@ -247,15 +262,15 @@ workerThread w getEvent (evendId, mug) = async $ forever do let mat = jam (undefined (mug, currentDate, ovum)) undefined - + -- Writ (eventId + 1) Nothing mat -- -- assign a new event id. -- -- assign a date -- -- get current mug state -- -- (jam [mug event]) - -- sendAndRecv + -- sendAndRecv -requestSnapshot :: Worker -> IO () +requestSnapshot :: Serf -> IO () requestSnapshot w = undefined -- The flow here is that we start the worker and then we receive a play event @@ -263,7 +278,7 @@ requestSnapshot w = undefined -- -- <- [%play ...] -- --- Base on this, the main flow is +-- Base on this, the main flow is -- -- [%work ] -> @@ -281,8 +296,12 @@ requestSnapshot w = undefined -- Basic Send and Receive Operations ------------------------------------------- -sendAtom :: Worker -> Atom -> IO () -sendAtom w a = hPut (sendHandle w) (unpackAtom a) +sendAtom :: Serf -> Atom -> IO () +sendAtom w a = do + traceM "sendAtom" + hPut (sendHandle w) (unpackAtom a) + hFlush (sendHandle w) + traceM "sendAtom.return ()" atomBytes :: Iso' Atom ByteString atomBytes = pill . pillBS @@ -292,26 +311,44 @@ packAtom = view (from atomBytes) unpackAtom :: Atom -> ByteString unpackAtom = view atomBytes -recvLen :: Worker -> IO Word64 +recvLen :: Serf -> IO Word64 recvLen w = do + traceM "recvLen.wait" bs <- hGet (recvHandle w) 8 + traceM "recvLen.got" case length bs of -- This is not big endian safe 8 -> unsafeUseAsCString bs (peek . castPtr) - _ -> throwIO WorkerConnectionClosed + _ -> throwIO SerfConnectionClosed -recvBytes :: Worker -> Word64 -> IO ByteString -recvBytes w = hGet (recvHandle w) . fromIntegral +recvBytes :: Serf -> Word64 -> IO ByteString +recvBytes w = do + traceM "recvBytes" + hGet (recvHandle w) . fromIntegral -recvAtom :: Worker -> IO Atom +recvAtom :: Serf -> IO Atom recvAtom w = do + traceM "recvAtom" len <- recvLen w bs <- recvBytes w len pure (packAtom bs) -recvPlea :: Worker -> IO Plea +cordString :: Cord -> String +cordString (Cord bs) = unpack $ decodeUtf8 bs + +recvPlea :: Serf -> IO Plea recvPlea w = do + traceM "recvPlea" + a <- recvAtom w + traceM ("recvPlea.cue " <> show (length $ a ^. atomBytes)) n <- fromJustExn (cue a) (BadPleaAtom a) - p <- fromJustExn (fromNoun n) (BadPleaNoun n) - pure p + traceM "recvPlea.doneCue" + p <- fromRightExn (fromNounErr n) (BadPleaNoun n) + + traceM "recvPlea.done" + + -- TODO Hack! + case p of + Stdr e msg -> traceM (cordString msg) >> recvPlea w + _ -> pure p diff --git a/pkg/hs-vere/app/test/Main.hs b/pkg/hs-vere/app/test/Main.hs index 645674b1f8..3c79130023 100644 --- a/pkg/hs-vere/app/test/Main.hs +++ b/pkg/hs-vere/app/test/Main.hs @@ -2,45 +2,78 @@ module Main where import ClassyPrelude import Vere.Pier.Types -import Data.Noun.Jam hiding (main) -import qualified Vere.Log as Log + +import Data.Noun.Jam () + +import qualified Vere.Log as Log +import qualified Vere.Persist as Persist +import qualified Vere.Pier as Pier + + +-------------------------------------------------------------------------------- main :: IO () main = do - let logPath = "/Users/erg/src/urbit/zod/.urb/falselog/" + (s,l,e,m) <- Pier.resume "/home/benjamin/r/urbit/zod/" + + putStrLn "Resumed!" + + pure () + +-------------------------------------------------------------------------------- + +tryCopyLog :: IO () +tryCopyLog = do + let logPath = "/Users/erg/src/urbit/zod/.urb/falselog/" falselogPath = "/Users/erg/src/urbit/zod/.urb/falselog2/" - -- junk - persistQueue <- newTQueueIO - releaseQueue <- newTQueueIO - logState <- Log.init logPath persistQueue (writeTQueue releaseQueue) + ---------------------------------------- - -- - logId <- Log.readLogIdentity logState - print logId + persistQ <- newTQueueIO + releaseQ <- newTQueueIO + log <- Log.open logPath + persist <- Persist.start log persistQ (writeTQueue releaseQ) + ident <- Log.readIdent log - -- - latestEvent <- Log.latestEventNumber logState - print latestEvent + ---------------------------------------- - -- - events <- Log.readEvents logState 1 3142 - --print $ cue . snd <$> events + lastEv <- Log.latestEventNumber log + events <- Log.readEvents log 1 3142 - -- - persistQueue2 <- newTQueueIO - releaseQueue2 <- newTQueueIO - falseLogState <- Log.init falselogPath persistQueue2 (writeTQueue releaseQueue2) + ---------------------------------------- - Log.writeLogIdentity falseLogState logId + print ident + print lastEv + print (length events) + + ---------------------------------------- + + persistQ2 <- newTQueueIO + releaseQ2 <- newTQueueIO + log2 <- Log.open falselogPath + persist2 <- Persist.start log2 persistQ2 (writeTQueue releaseQ2) + + ---------------------------------------- + + Log.writeIdent log2 ident let writs = events <&> \(id, a) -> - Writ id Nothing (Jam a) [] + Writ id Nothing (Jam a) [] + + ---------------------------------------- print "About to write" - for_ writs $ \w -> atomically $ writeTQueue persistQueue2 w + + for_ writs $ \w -> + atomically (writeTQueue persistQ2 w) + + ---------------------------------------- print "About to wait" - replicateM_ 100 $ atomically $ readTQueue releaseQueue2 + replicateM_ 100 $ do + atomically $ readTQueue releaseQ2 + + ---------------------------------------- + print "Done" diff --git a/pkg/urbit/vere/pier.c b/pkg/urbit/vere/pier.c index f69c9f7a71..2873daf17f 100644 --- a/pkg/urbit/vere/pier.c +++ b/pkg/urbit/vere/pier.c @@ -1047,10 +1047,10 @@ _pier_work_create(u3_pier* pir_u) sprintf(wag_c, "%u", pir_u->wag_w); - arg_c[0] = bin_c; // executable - arg_c[1] = pax_c; // path to checkpoint directory - arg_c[2] = key_c; // disk key - arg_c[3] = wag_c; // runtime config + arg_c[0] = bin_c; // executable + arg_c[1] = pax_c; // path to checkpoint directory (might be the pier, might be $pier/chk) + arg_c[2] = key_c; // disk key (ignored) + arg_c[3] = wag_c; // runtime config arg_c[4] = 0; uv_pipe_init(u3L, &god_u->inn_u.pyp_u, 0); diff --git a/stack.yaml b/stack.yaml index 152439f36f..00bb63e7f2 100644 --- a/stack.yaml +++ b/stack.yaml @@ -18,6 +18,8 @@ nix: - SDL2_image - zlib +ghc-options: + urbit: '-fobject-code' # build: # library-profiling: true