Got something working: Can "replay" event log for ship whos snapshot is already up to date..

This commit is contained in:
Benjamin Summers 2019-06-24 18:10:41 -07:00
parent f6c6cb3e71
commit 6565c06fd4
9 changed files with 427 additions and 316 deletions

View File

@ -79,7 +79,7 @@ instance Applicative IResult where
(<*>) = ap
instance Fail.MonadFail IResult where
fail err = IError [] err
fail err = traceM ("!" <> err <> "!") >> IError [] err
instance Monad IResult where
return = pure
@ -203,6 +203,12 @@ fromNoun n = runParser (parseNoun n) [] onFail onSuccess
onFail p m = Nothing
onSuccess x = Just x
fromNounErr :: FromNoun a => Noun -> Either Text a
fromNounErr n = runParser (parseNoun n) [] onFail onSuccess
where
onFail p m = Left (pack m)
onSuccess x = Right x
_Poet :: (ToNoun a, FromNoun a) => Prism' Noun a
_Poet = prism' toNoun fromNoun
@ -287,7 +293,7 @@ instance ToNoun a => ToNoun (Nullable a) where
instance FromNoun a => FromNoun (Nullable a) where
parseNoun (Atom 0) = pure Nil
parseNoun (Atom n) = fail ("Expected ?@(~ ^), but got " <> show n)
parseNoun (Atom n) = fail ("Nullable: expected ?@(~ ^), but got " <> show n)
parseNoun n = NotNil <$> parseNoun n

View File

@ -1,13 +1,15 @@
-- TODO: Make sure transaction closed in all error cases
-- TODO: Don't allow writing non-contiguous events
module Vere.Log (
init,
shutdown,
-- we don't export write; you use the queue
readEvents,
latestEventNumber,
readLogIdentity,
writeLogIdentity
{-
TODO: Make sure transaction closed in all error cases
TODO: Don't allow writing non-contiguous events
-}
module Vere.Log ( open
, close
, readEvents
, latestEventNumber
, readIdent
, writeIdent
, putJam
) where
import ClassyPrelude hiding (init)
@ -32,131 +34,73 @@ import qualified Data.ByteString as B
import qualified Data.Vector as V
import qualified Data.Vector.Mutable as MV
--------------------------------------------------------------------------------
-- TODO: Handle throws on the async
init :: FilePath -> TQueue (Writ [Eff]) -> (Writ [Eff] -> STM ())
-> IO LogState
init dir inp cb = do
-- Open/Close an Event Log -----------------------------------------------------
open :: FilePath -> IO EventLog
open dir = do
env <- mdb_env_create
mdb_env_set_maxdbs env 3
mdb_env_set_mapsize env (40 * 1024 * 1024 * 1024)
mdb_env_open env dir []
writer <- persistThread env inp cb
pure (LogState env inp cb writer)
pure (EventLog env)
-- TODO: properly handle shutdowns during write
shutdown :: LogState -> IO ()
shutdown s = do
void $ waitCancel (writer s)
mdb_env_close (env s)
close :: EventLog -> IO ()
close (EventLog env) = mdb_env_close env
waitCancel :: Async a -> IO (Either SomeException a)
waitCancel async = cancel async >> waitCatch async
--------------------------------------------------------------------------------
-- Read/Write Log Identity -----------------------------------------------------
{-
Read one or more items from a TQueue, only blocking on the first item.
-}
readQueue :: TQueue a -> STM (NonNull [a])
readQueue q =
readTQueue q >>= go . singleton
where
go acc =
tryReadTQueue q >>= \case
Nothing -> pure (reverse acc)
Just item -> go (item <| acc)
readIdent :: EventLog -> IO LogIdentity
readIdent (EventLog env) = do
txn <- mdb_txn_begin env Nothing True
db <- mdb_dbi_open txn (Just "META") []
who <- get txn db "who"
is_fake <- get txn db "is-fake"
life <- get txn db "life"
mdb_txn_abort txn
pure (LogIdentity who is_fake life)
byteStringAsMdbVal :: ByteString -> (MDB_val -> IO a) -> IO a
byteStringAsMdbVal bs k =
BU.unsafeUseAsCStringLen bs \(ptr,sz) ->
k (MDB_val (fromIntegral sz) (castPtr ptr))
mdbValToAtom :: MDB_val -> IO Atom
mdbValToAtom (MDB_val sz ptr) = do
bs <- BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz)
pure (bs ^. from (pill . pillBS))
maybeErr :: Maybe a -> String -> IO a
maybeErr (Just x) _ = pure x
maybeErr Nothing msg = error msg
mdbValToNoun :: MDB_val -> IO Noun
mdbValToNoun (MDB_val sz ptr) = do
bs <- BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz)
let res = (bs ^? from pillBS . from pill . _Cue)
maybeErr res "mdb bad cue"
putRaw :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> MDB_val -> MDB_val -> IO ()
putRaw flags txn db key val =
mdb_put flags txn db key val >>= \case
True -> pure ()
False -> error "mdb bad put"
putNoun :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> ByteString -> Noun -> IO ()
putNoun flags txn db key val =
byteStringAsMdbVal key $ \mKey ->
byteStringAsMdbVal (val ^. re _CueBytes) $ \mVal ->
putRaw flags txn db mKey mVal
putJam :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> Word64 -> Jam -> IO ()
putJam flags txn db id (Jam atom) = do
withWord64AsMDBval id $ \idVal -> do
let !bs = atom ^. pill . pillBS
byteStringAsMdbVal bs $ \mVal -> do
putRaw flags txn db idVal mVal
get :: MDB_txn -> MDB_dbi -> ByteString -> IO Noun
get txn db key =
byteStringAsMdbVal key \mKey ->
mdb_get txn db mKey >>= maybe (error "mdb bad get") mdbValToNoun
mdbValToWord64 :: MDB_val -> IO Word64
mdbValToWord64 (MDB_val sz ptr) = do
assertErr (sz == 8) "wrong size in mdbValToWord64"
peek (castPtr ptr)
withWord64AsMDBval :: Word64 -> (MDB_val -> IO a) -> IO a
withWord64AsMDBval w cb = do
withWordPtr w $ \p ->
cb (MDB_val (fromIntegral (sizeOf w)) (castPtr p))
--------------------------------------------------------------------------------
withWordPtr :: Word64 -> (Ptr Word64 -> IO a) -> IO a
withWordPtr w cb = do
allocaBytes (sizeOf w) (\p -> poke p w >> cb p)
-- TODO: We need to be able to send back an exception to the main thread on an
-- exception on the persistence thread.
persistThread :: MDB_env
-> TQueue (Writ [Eff])
-> (Writ [Eff] -> STM ())
-> IO (Async ())
persistThread env inputQueue onPersist = asyncBound $
forever do
writs <- atomically $ readQueue inputQueue
writeEvents writs
atomically $ traverse_ onPersist writs
where
writeEvents writs = do
writeIdent :: EventLog -> LogIdentity -> IO ()
writeIdent (EventLog env) LogIdentity{..} = do
txn <- mdb_txn_begin env Nothing False
db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY]
let flags = compileWriteFlags [MDB_NOOVERWRITE]
for_ writs $ \w -> do
putJam flags txn db (eventId w) (event w)
db <- mdb_dbi_open txn (Just "META") [MDB_CREATE]
let flags = compileWriteFlags []
putNoun flags txn db "who" who
putNoun flags txn db "is-fake" is_fake
putNoun flags txn db "life" life
mdb_txn_commit txn
pure ()
deriving instance Show MDB_val
-- Latest Event Number ---------------------------------------------------------
latestEventNumber :: EventLog -> IO Word64
latestEventNumber (EventLog env) =
do
txn <- mdb_txn_begin env Nothing True
db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY]
cur <- mdb_cursor_open txn db
res <- fetch txn db cur
mdb_cursor_close cur
mdb_txn_abort txn
pure res
where
key = MDB_val 0 nullPtr
val = MDB_val 0 nullPtr
fetch txn db cur =
withKVPtrs key val $ \pKey pVal ->
mdb_cursor_get MDB_LAST cur pKey pVal >>= \case
False -> pure 0
True -> peek pKey >>= mdbValToWord64
-- Read Events -----------------------------------------------------------------
-- TODO: This will read len items and will error if there are less than that
-- available. This differs from the current pier.c's expectations.
readEvents :: LogState -> Word64 -> Word64 -> IO (V.Vector (Word64,Atom))
readEvents (LogState env _ _ _) first len =
readEvents :: EventLog -> Word64 -> Word64 -> IO (V.Vector (Word64,Atom))
readEvents (EventLog env) first len =
withWordPtr first $ \pIdx ->
withKVPtrs (MDB_val 8 (castPtr pIdx)) (MDB_val 0 nullPtr) $ \pKey pVal ->
do
@ -186,6 +130,9 @@ readEvents (LogState env _ _ _) first len =
pure vec
-- Utils -----------------------------------------------------------------------
int :: Word64 -> Int
int = fromIntegral
@ -193,44 +140,63 @@ assertErr :: Bool -> String -> IO ()
assertErr True _ = pure ()
assertErr False m = error m
latestEventNumber :: LogState -> IO Word64
latestEventNumber (LogState env _ _ _) =
do
txn <- mdb_txn_begin env Nothing True
db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY]
cur <- mdb_cursor_open txn db
res <- fetch txn db cur
mdb_cursor_close cur
mdb_txn_abort txn
pure res
where
key = MDB_val 0 nullPtr
val = MDB_val 0 nullPtr
fetch txn db cur =
withKVPtrs key val $ \pKey pVal ->
mdb_cursor_get MDB_LAST cur pKey pVal >>= \case
False -> pure 0
True -> peek pKey >>= mdbValToWord64
maybeErr :: Maybe a -> String -> IO a
maybeErr (Just x) _ = pure x
maybeErr Nothing msg = error msg
--------------------------------------------------------------------------------
byteStringAsMdbVal :: ByteString -> (MDB_val -> IO a) -> IO a
byteStringAsMdbVal bs k =
BU.unsafeUseAsCStringLen bs \(ptr,sz) ->
k (MDB_val (fromIntegral sz) (castPtr ptr))
readLogIdentity :: LogState -> IO LogIdentity
readLogIdentity (LogState env _ _ _) = do
txn <- mdb_txn_begin env Nothing True
db <- mdb_dbi_open txn (Just "META") []
who <- get txn db "who"
is_fake <- get txn db "is-fake"
life <- get txn db "life"
mdb_txn_abort txn
pure (LogIdentity who is_fake life)
mdbValToWord64 :: MDB_val -> IO Word64
mdbValToWord64 (MDB_val sz ptr) = do
assertErr (sz == 8) "wrong size in mdbValToWord64"
peek (castPtr ptr)
writeLogIdentity :: LogState -> LogIdentity -> IO ()
writeLogIdentity (LogState env _ _ _) LogIdentity{..} = do
txn <- mdb_txn_begin env Nothing False
db <- mdb_dbi_open txn (Just "META") [MDB_CREATE]
let flags = compileWriteFlags []
putNoun flags txn db "who" who
putNoun flags txn db "is-fake" is_fake
putNoun flags txn db "life" life
mdb_txn_commit txn
pure ()
withWord64AsMDBval :: Word64 -> (MDB_val -> IO a) -> IO a
withWord64AsMDBval w cb = do
withWordPtr w $ \p ->
cb (MDB_val (fromIntegral (sizeOf w)) (castPtr p))
withWordPtr :: Word64 -> (Ptr Word64 -> IO a) -> IO a
withWordPtr w cb = do
allocaBytes (sizeOf w) (\p -> poke p w >> cb p)
-- Lower-Level Operations ------------------------------------------------------
get :: MDB_txn -> MDB_dbi -> ByteString -> IO Noun
get txn db key =
byteStringAsMdbVal key \mKey ->
mdb_get txn db mKey >>= maybe (error "mdb bad get") mdbValToNoun
mdbValToAtom :: MDB_val -> IO Atom
mdbValToAtom (MDB_val sz ptr) = do
bs <- BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz)
pure (bs ^. from (pill . pillBS))
mdbValToNoun :: MDB_val -> IO Noun
mdbValToNoun (MDB_val sz ptr) = do
bs <- BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz)
let res = (bs ^? from pillBS . from pill . _Cue)
maybeErr res "mdb bad cue"
putRaw :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> MDB_val -> MDB_val -> IO ()
putRaw flags txn db key val =
mdb_put flags txn db key val >>= \case
True -> pure ()
False -> error "mdb bad put"
putNoun :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> ByteString -> Noun -> IO ()
putNoun flags txn db key val =
byteStringAsMdbVal key $ \mKey ->
byteStringAsMdbVal (val ^. re _CueBytes) $ \mVal ->
putRaw flags txn db mKey mVal
putJam :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> Word64 -> Jam -> IO ()
putJam flags txn db id (Jam atom) = do
withWord64AsMDBval id $ \idVal -> do
let !bs = atom ^. pill . pillBS
byteStringAsMdbVal bs $ \mVal -> do
putRaw flags txn db idVal mVal

View File

@ -0,0 +1,77 @@
{-
TODO Close the database on uncaught exception.
TODO `Persist` should just be the thread id.
the thread should close the database when it is killed.
-}
module Vere.Persist (start, stop) where
import ClassyPrelude hiding (init)
import Vere.Log
import Vere.Pier.Types
import Database.LMDB.Raw
-- Types -----------------------------------------------------------------------
data Persist = Persist EventLog (Async ())
-- Start and Stop --------------------------------------------------------------
start :: EventLog
-> TQueue (Writ [Eff])
-> (Writ [Eff] -> STM ())
-> IO Persist
start log inp cb = do
tid <- asyncBound (persistThread log inp cb)
pure (Persist log tid)
-- TODO: properly handle shutdowns during write
stop :: Persist -> IO ()
stop (Persist log tid) = do
void (cancel tid)
void (waitCatch tid)
close log
-- Persist Thread --------------------------------------------------------------
-- TODO: We need to be able to send back an exception to the main thread on an
-- exception on the persistence thread.
persistThread :: EventLog
-> TQueue (Writ [Eff])
-> (Writ [Eff] -> STM ())
-> IO ()
persistThread (EventLog env) inputQueue onPersist =
forever do
writs <- atomically $ readQueue inputQueue
writeEvents writs
atomically $ traverse_ onPersist writs
where
writeEvents writs = do
txn <- mdb_txn_begin env Nothing False
db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY]
let flags = compileWriteFlags [MDB_NOOVERWRITE]
for_ writs $ \w -> do
putJam flags txn db (eventId w) (event w)
mdb_txn_commit txn
-- Get eventhing from the input queue. -----------------------------------------
{-
Read one or more items from a TQueue, only blocking on the first item.
-}
readQueue :: TQueue a -> STM (NonNull [a])
readQueue q =
readTQueue q >>= go . singleton
where
go acc =
tryReadTQueue q >>= \case
Nothing -> pure (reverse acc)
Just item -> go (item <| acc)

View File

@ -8,86 +8,74 @@ import Vere
import Vere.Pier.Types
import qualified Vere.Log as Log
import qualified Vere.Worker as Worker
import qualified Vere.Persist as Persist
import qualified Vere.Serf as Serf
import Vere.Serf (Serf, EventId)
--------------------------------------------------------------------------------
ioDrivers = [] :: [IODriver]
--------------------------------------------------------------------------------
-- This is called to make a freshly booted pier. It assigns an identity to an
-- event log and takes a chill pill.
newPier :: Pill -> FilePath -> LogIdentity -> IO Pier
newPier pill top id = do
boot :: Pill -> FilePath -> LogIdentity -> IO (Serf, EventLog, EventId, Mug)
boot pill top id = do
let logPath = top <> "/log"
computeQueue <- newTQueueIO
persistQueue <- newTQueueIO
releaseQueue <- newTQueueIO
log <- Log.open logPath
-- What we really want to do is write the log identity and then do normal
-- startup, but writeLogIdentity requires a full log state including
-- input/output queues.
logState <- Log.init logPath persistQueue (writeTQueue releaseQueue)
Log.writeIdent log id
-- In first boot, we need to write this!
Log.writeLogIdentity logState id
serf <- Serf.startSerfProcess top
(e, m) <- Serf.bootSerf serf id pill
let logLatestEventNumber = 0
let getEvents = Log.readEvents logState
workerState <- Worker.startWorkerProcess
Worker.bootWorker workerState id pill
performCommonPierStartup workerState computeQueue persistQueue releaseQueue logState
pure (serf, log, e, m)
-- This reads in a pier
runPierFromDisk :: FilePath -> IO Pier
runPierFromDisk top = do
let logPath = top <> "/log"
{-
What we really want to do is write the log identity and then do
normal startup, but writeIdent requires a full log state
including input/output queues.
-}
resume :: FilePath -> IO (Serf, EventLog, EventId, Mug)
resume top = do
log <- Log.open (top <> "/.urb/log")
ident <- Log.readIdent log
lastEv <- Log.latestEventNumber log
serf <- Serf.startSerfProcess top
(e, m) <- Serf.replay serf ident lastEv (Log.readEvents log)
computeQueue <- newTQueueIO
persistQueue <- newTQueueIO
releaseQueue <- newTQueueIO
pure (serf, log, e, m)
-- What we really want to do is write the log identity and then do normal
-- startup, but writeLogIdentity requires a full log state including
-- input/output queues.
logState <- Log.init logPath persistQueue (writeTQueue releaseQueue)
-- In first boot, we need to write this!
id <- Log.readLogIdentity logState
logLatestEventNumber <- Log.latestEventNumber logState
let getEvents = Log.readEvents logState
workerState <- Worker.startWorkerProcess
Worker.resumeWorker workerState id logLatestEventNumber getEvents
performCommonPierStartup workerState computeQueue persistQueue releaseQueue logState
performCommonPierStartup :: Worker.Worker
{-
performCommonPierStartup :: Serf.Serf
-> TQueue Ovum
-> TQueue (Writ [Eff])
-> TQueue (Writ [Eff])
-> LogState
-> IO Pier
performCommonPierStartup workerState computeQueue persistQueue releaseQueue logState = do
performCommonPierStartup serf computeQ persistQ releaseQ logState = do
for ioDrivers $ \x -> do
bootMessage <- bornEvent x
atomically $ writeTQueue computeQueue bootMessage
atomically $ writeTQueue computeQ bootMessage
driverThreads <- for ioDrivers $ \x -> do
startDriver x (writeTQueue computeQueue)
startDriver x (writeTQueue computeQ)
-- TODO: Don't do a bunch of extra work; we send all events to all drivers
portingThread <- async $ do
forever $ do
r <- atomically (readTQueue releaseQueue)
r <- atomically (readTQueue releaseQ)
for_ driverThreads $ \(_, k) ->
for_ (payload r) $ \eff ->
k eff
Worker.workerThread workerState (readTQueue computeQueue) undefined
Serf.workerThread serf (readTQueue computeQ) undefined
pure (Pier{..})
-}

View File

@ -75,6 +75,8 @@ data Pier = Pier
, portingThread :: Async ()
}
newtype EventLog = EventLog MDB_env
-- TODO: We are uncertain about q's type. There's some serious entanglement
-- with u3_pier in this logic in the C code, and you might not be able to get
-- away with anything less than passing the full u3_writ around.

View File

@ -1,4 +1,4 @@
module Vere.Worker where
module Vere.Serf where
import ClassyPrelude
import Control.Lens
@ -23,7 +23,7 @@ import Foreign.Storable (peek)
import qualified Vere.Log as Log
data Worker = Worker
data Serf = Serf
{ sendHandle :: Handle
, recvHandle :: Handle
, process :: ProcessHandle
@ -31,7 +31,7 @@ data Worker = Worker
-- , getInput :: STM (Writ ())
-- , onComputed :: Writ [Effect] -> STM ()
-- , onExit :: Worker -> IO ()
-- , onExit :: Serf -> IO ()
-- , task :: Async ()
}
@ -39,20 +39,27 @@ data Worker = Worker
--------------------------------------------------------------------------------
-- Think about how to handle process exit
-- Tear down subprocess on exit? (terminiteProcess)
startWorkerProcess :: IO Worker
startWorkerProcess =
{-
TODO Think about how to handle process exit
TODO Tear down subprocess on exit? (terminiteProcess)
TODO `config` is a stub, fill it in.
-}
startSerfProcess :: FilePath -> IO Serf
startSerfProcess pier =
do
(Just i, Just o, _, p) <- createProcess pSpec
pure (Worker i o p)
pure (Serf i o p)
where
pSpec =
(proc "urbit-worker" []) { std_in = CreatePipe
chkDir = traceShowId pier
diskKey = ""
config = "0"
args = [chkDir, diskKey, config]
pSpec = (proc "urbit-worker" args)
{ std_in = CreatePipe
, std_out = CreatePipe
}
kill :: Worker -> IO ExitCode
kill :: Serf -> IO ExitCode
kill w = do
terminateProcess (process w)
waitForProcess (process w)
@ -73,7 +80,7 @@ newtype ShipId = ShipId (Ship, Bool)
--------------------------------------------------------------------------------
type Play = Nullable (EventId, Mug, ShipId)
type Play = Maybe (EventId, Mug, ShipId)
data Plea
= Play Play
@ -106,27 +113,27 @@ instance FromNoun Plea where
type CompletedEventId = Word64
type NextEventId = Word64
type WorkerState = (EventId, Mug)
type SerfState = (EventId, Mug)
type ReplacementEv = (EventId, Mug, Job)
type WorkResult = (EventId, Mug, [Eff])
type WorkerResp = (Either ReplacementEv WorkResult)
type SerfResp = (Either ReplacementEv WorkResult)
-- Exceptions ------------------------------------------------------------------
data WorkerExn
data SerfExn
= BadComputeId EventId WorkResult
| BadReplacementId EventId ReplacementEv
| UnexpectedPlay EventId Play
| BadPleaAtom Atom
| BadPleaNoun Noun
| BadPleaNoun Noun Text
| ReplacedEventDuringReplay EventId ReplacementEv
| WorkerConnectionClosed
| SerfConnectionClosed
| UnexpectedPleaOnNewShip Plea
| InvalidInitialPlea Plea
deriving (Show)
instance Exception WorkerExn
instance Exception SerfExn
-- Utils -----------------------------------------------------------------------
@ -140,25 +147,32 @@ fromJustExn :: Exception e => Maybe a -> e -> IO a
fromJustExn Nothing exn = throwIO exn
fromJustExn (Just x) exn = pure x
fromRightExn :: Exception e => Either Text a -> (Text -> e) -> IO a
fromRightExn (Left m) exn = throwIO (exn m)
fromRightExn (Right x) _ = pure x
--------------------------------------------------------------------------------
sendAndRecv :: Worker -> EventId -> Atom -> IO WorkerResp
sendAndRecv :: Serf -> EventId -> Atom -> IO SerfResp
sendAndRecv w eventId event =
do
traceM ("sendAndRecv: " <> show eventId)
sendAtom w $ work eventId (Jam event)
loop
res <- loop
traceM "sendAndRecv.done"
pure res
where
produce :: WorkResult -> IO WorkerResp
produce :: WorkResult -> IO SerfResp
produce (i, m, o) = do
guardExn (i /= eventId) (BadComputeId eventId (i, m, o))
pure $ Right (i, m, o)
replace :: ReplacementEv -> IO WorkerResp
replace :: ReplacementEv -> IO SerfResp
replace (i, m, j) = do
guardExn (i /= eventId) (BadReplacementId eventId (i, m, j))
pure (Left (i, m, j))
loop :: IO WorkerResp
loop :: IO SerfResp
loop = recvPlea w >>= \case
Play p -> throwIO (UnexpectedPlay eventId p)
Done i m o -> produce (i, m, o)
@ -166,32 +180,43 @@ sendAndRecv w eventId event =
Stdr _ cord -> print cord >> loop
Slog _ pri t -> printTank pri t >> loop
sendBootEvent :: LogIdentity -> Worker -> IO ()
sendBootEvent :: LogIdentity -> Serf -> IO ()
sendBootEvent id w = do
sendAtom w $ jam $ toNoun (Cord "boot", id)
-- the ship is booted, but it is behind. shove events to the worker until it is
-- caught up.
replay :: Worker
-> WorkerState
replayEvents :: Serf
-> SerfState
-> LogIdentity
-> EventId
-> (EventId -> Word64 -> IO (Vector (EventId, Atom)))
-> IO (EventId, Mug)
replay w (wid, wmug) identity lastCommitedId getEvents = do
replayEvents w (wid, wmug) identity lastCommitedId getEvents = do
traceM ("replayEvents: " <> show wid <> " " <> show wmug)
when (wid == 1) (sendBootEvent identity w)
vLast <- newIORef (wid, wmug)
loop vLast wid
readIORef vLast
res <- readIORef vLast
traceM ("replayEvents.return " <> show res)
pure res
where
-- Replay events in batches of 1000.
loop vLast curEvent = do
traceM ("replayEvents.loop: " <> show curEvent)
let toRead = min 1000 (1 + lastCommitedId - curEvent)
when (toRead > 0) do
traceM ("replayEvents.loop.getEvents " <> show toRead)
events <- getEvents curEvent toRead
traceM ("got events " <> show (length events))
for_ events $ \(eventId, event) -> do
sendAndRecv w eventId event >>= \case
Left ev -> throwIO (ReplacedEventDuringReplay eventId ev)
@ -200,45 +225,35 @@ replay w (wid, wmug) identity lastCommitedId getEvents = do
loop vLast (curEvent + toRead)
bootWorker :: Worker
-> LogIdentity
-> Pill
-> IO ()
bootWorker w identity pill =
bootSerf :: Serf -> LogIdentity -> Pill -> IO (EventId, Mug)
bootSerf w ident pill =
do
recvPlea w >>= \case
Play Nil -> pure ()
Play Nothing -> pure ()
x@(Play _) -> throwIO (UnexpectedPleaOnNewShip x)
x -> throwIO (InvalidInitialPlea x)
-- TODO: actually boot the pill
undefined
requestSnapshot w
-- Maybe return the current event id ? But we'll have to figure that out
-- later.
pure ()
pure undefined
resumeWorker :: Worker
-> LogIdentity
-> EventId
-> (EventId -> Word64 -> IO (Vector (EventId, Atom)))
-> IO (EventId, Mug)
resumeWorker w identity logLatestEventNumber eventFetcher =
do
type GetEvents = EventId -> Word64 -> IO (Vector (EventId, Atom))
replay :: Serf -> LogIdentity -> EventId -> GetEvents -> IO (EventId, Mug)
replay w ident lastEv getEvents = do
ws@(eventId, mug) <- recvPlea w >>= \case
Play Nil -> pure (1, Mug 0)
Play (NotNil (e, m, _)) -> pure (e, m)
Play Nothing -> pure (1, Mug 0)
Play (Just (e, m, _)) -> pure (e, m)
x -> throwIO (InvalidInitialPlea x)
r <- replay w ws identity logLatestEventNumber eventFetcher
traceM ("got plea! " <> show eventId <> " " <> show mug)
requestSnapshot w
replayEvents w ws ident lastEv getEvents
pure r
workerThread :: Worker -> STM Ovum -> (EventId, Mug) -> IO (Async ())
workerThread :: Serf -> STM Ovum -> (EventId, Mug) -> IO (Async ())
workerThread w getEvent (evendId, mug) = async $ forever do
ovum <- atomically $ getEvent
@ -255,7 +270,7 @@ workerThread w getEvent (evendId, mug) = async $ forever do
-- -- (jam [mug event])
-- sendAndRecv
requestSnapshot :: Worker -> IO ()
requestSnapshot :: Serf -> IO ()
requestSnapshot w = undefined
-- The flow here is that we start the worker and then we receive a play event
@ -281,8 +296,12 @@ requestSnapshot w = undefined
-- Basic Send and Receive Operations -------------------------------------------
sendAtom :: Worker -> Atom -> IO ()
sendAtom w a = hPut (sendHandle w) (unpackAtom a)
sendAtom :: Serf -> Atom -> IO ()
sendAtom w a = do
traceM "sendAtom"
hPut (sendHandle w) (unpackAtom a)
hFlush (sendHandle w)
traceM "sendAtom.return ()"
atomBytes :: Iso' Atom ByteString
atomBytes = pill . pillBS
@ -292,26 +311,44 @@ packAtom = view (from atomBytes)
unpackAtom :: Atom -> ByteString
unpackAtom = view atomBytes
recvLen :: Worker -> IO Word64
recvLen :: Serf -> IO Word64
recvLen w = do
traceM "recvLen.wait"
bs <- hGet (recvHandle w) 8
traceM "recvLen.got"
case length bs of
-- This is not big endian safe
8 -> unsafeUseAsCString bs (peek . castPtr)
_ -> throwIO WorkerConnectionClosed
_ -> throwIO SerfConnectionClosed
recvBytes :: Worker -> Word64 -> IO ByteString
recvBytes w = hGet (recvHandle w) . fromIntegral
recvBytes :: Serf -> Word64 -> IO ByteString
recvBytes w = do
traceM "recvBytes"
hGet (recvHandle w) . fromIntegral
recvAtom :: Worker -> IO Atom
recvAtom :: Serf -> IO Atom
recvAtom w = do
traceM "recvAtom"
len <- recvLen w
bs <- recvBytes w len
pure (packAtom bs)
recvPlea :: Worker -> IO Plea
cordString :: Cord -> String
cordString (Cord bs) = unpack $ decodeUtf8 bs
recvPlea :: Serf -> IO Plea
recvPlea w = do
traceM "recvPlea"
a <- recvAtom w
traceM ("recvPlea.cue " <> show (length $ a ^. atomBytes))
n <- fromJustExn (cue a) (BadPleaAtom a)
p <- fromJustExn (fromNoun n) (BadPleaNoun n)
pure p
traceM "recvPlea.doneCue"
p <- fromRightExn (fromNounErr n) (BadPleaNoun n)
traceM "recvPlea.done"
-- TODO Hack!
case p of
Stdr e msg -> traceM (cordString msg) >> recvPlea w
_ -> pure p

View File

@ -2,45 +2,78 @@ module Main where
import ClassyPrelude
import Vere.Pier.Types
import Data.Noun.Jam hiding (main)
import Data.Noun.Jam ()
import qualified Vere.Log as Log
import qualified Vere.Persist as Persist
import qualified Vere.Pier as Pier
--------------------------------------------------------------------------------
main :: IO ()
main = do
(s,l,e,m) <- Pier.resume "/home/benjamin/r/urbit/zod/"
putStrLn "Resumed!"
pure ()
--------------------------------------------------------------------------------
tryCopyLog :: IO ()
tryCopyLog = do
let logPath = "/Users/erg/src/urbit/zod/.urb/falselog/"
falselogPath = "/Users/erg/src/urbit/zod/.urb/falselog2/"
-- junk
persistQueue <- newTQueueIO
releaseQueue <- newTQueueIO
logState <- Log.init logPath persistQueue (writeTQueue releaseQueue)
----------------------------------------
--
logId <- Log.readLogIdentity logState
print logId
persistQ <- newTQueueIO
releaseQ <- newTQueueIO
log <- Log.open logPath
persist <- Persist.start log persistQ (writeTQueue releaseQ)
ident <- Log.readIdent log
--
latestEvent <- Log.latestEventNumber logState
print latestEvent
----------------------------------------
--
events <- Log.readEvents logState 1 3142
--print $ cue . snd <$> events
lastEv <- Log.latestEventNumber log
events <- Log.readEvents log 1 3142
--
persistQueue2 <- newTQueueIO
releaseQueue2 <- newTQueueIO
falseLogState <- Log.init falselogPath persistQueue2 (writeTQueue releaseQueue2)
----------------------------------------
Log.writeLogIdentity falseLogState logId
print ident
print lastEv
print (length events)
----------------------------------------
persistQ2 <- newTQueueIO
releaseQ2 <- newTQueueIO
log2 <- Log.open falselogPath
persist2 <- Persist.start log2 persistQ2 (writeTQueue releaseQ2)
----------------------------------------
Log.writeIdent log2 ident
let writs = events <&> \(id, a) ->
Writ id Nothing (Jam a) []
----------------------------------------
print "About to write"
for_ writs $ \w -> atomically $ writeTQueue persistQueue2 w
for_ writs $ \w ->
atomically (writeTQueue persistQ2 w)
----------------------------------------
print "About to wait"
replicateM_ 100 $ atomically $ readTQueue releaseQueue2
replicateM_ 100 $ do
atomically $ readTQueue releaseQ2
----------------------------------------
print "Done"

View File

@ -1048,8 +1048,8 @@ _pier_work_create(u3_pier* pir_u)
sprintf(wag_c, "%u", pir_u->wag_w);
arg_c[0] = bin_c; // executable
arg_c[1] = pax_c; // path to checkpoint directory
arg_c[2] = key_c; // disk key
arg_c[1] = pax_c; // path to checkpoint directory (might be the pier, might be $pier/chk)
arg_c[2] = key_c; // disk key (ignored)
arg_c[3] = wag_c; // runtime config
arg_c[4] = 0;

View File

@ -18,6 +18,8 @@ nix:
- SDL2_image
- zlib
ghc-options:
urbit: '-fobject-code'
# build:
# library-profiling: true