Minor cleanup

This commit is contained in:
Benjamin Summers 2019-07-15 17:05:42 -07:00
parent 9872ea6e92
commit 430b180f0c

View File

@ -146,41 +146,13 @@ fromRightExn (Right x) _ = pure x
--------------------------------------------------------------------------------
sendAndRecv :: Serf -> EventId -> Atom -> IO SerfResp
sendAndRecv w eventId event =
sendAndRecv :: Serf -> EventId -> Order -> IO SerfResp
sendAndRecv w eventId order =
do
traceM ("sendAndRecv: " <> show eventId)
sendOrder w (OWork eventId event)
res <- loop
traceM ("sendAndRecv.done " <> show res)
pure res
where
produce :: WorkResult -> IO SerfResp
produce (i, m, o) = do
guardExn (i == eventId) (BadComputeId eventId (i, m, o))
pure $ Right (i, m, o)
replace :: ReplacementEv -> IO SerfResp
replace (i, m, j) = do
guardExn (i == eventId) (BadReplacementId eventId (i, m, j))
pure (Left (i, m, j))
loop :: IO SerfResp
loop = recvPlea w >>= \case
Play p -> throwIO (UnexpectedPlay eventId p)
Done i m o -> produce (i, m, o)
Work i m j -> replace (i, m, j)
Stdr _ cord -> putStrLn (pack ("[SERF] " <> cordString cord)) >> loop
Slog _ pri t -> printTank pri t >> loop
sendAndRecvOrder :: Serf -> EventId -> Order -> IO SerfResp
sendAndRecvOrder w eventId order =
do
traceM ("sendAndRecvOrder: " <> show eventId)
sendOrder w order
res <- loop
traceM ("sendAndRecvOrder.done " <> show res)
traceM ("sendAndRecv.done " <> show res)
pure res
where
produce :: WorkResult -> IO SerfResp
@ -211,29 +183,42 @@ muckBootSeq (BootSeq _ nocks ovums) =
muckNock nok eId mug _ = OWork eId $ jam $ toNoun (mug, nok)
muckOvum ov eId mug wen = OWork eId $ jam $ toNoun (mug, wen, ov)
bootFromSeq :: Serf -> LogIdentity -> [EventId -> Mug -> Time.Wen -> Order]
-> IO [Order]
bootFromSeq w ident seq = do
ws@(eventId, mug) <- recvPlea w >>= \case
{-
Waits for initial plea, and then sends boot IPC if necessary.
-}
handshake :: Serf -> LogIdentity -> IO (EventId, Mug)
handshake serf ident = do
(eventId, mug) <- recvPlea serf >>= \case
Play Nothing -> pure (1, Mug 0)
Play (Just (e, m, _)) -> error "ship already booted"
Play (Just (e, m, _)) -> pure (e, m)
x -> throwIO (InvalidInitialPlea x)
traceM ("got plea! " <> show eventId <> " " <> show mug)
traceM ("handshake: got plea! " <> show eventId <> " " <> show mug)
when (eventId == 1) $ do
sendOrder serf (OBoot ident)
traceM ("handshake: Sent %boot IPC")
pure (eventId, mug)
bootFromSeq :: Serf -> LogIdentity -> [EventId -> Mug -> Time.Wen -> Order]
-> IO [Order]
bootFromSeq serf ident seq = do
handshake serf ident >>= \case
(1, Mug 0) -> pure ()
_ -> error "ship already booted"
sendOrder w (OBoot ident)
loop [] 1 (Mug 0) seq
where
loop acc eId lastMug [] = pure $ reverse acc
loop acc eId lastMug (x:xs) = do
wen <- Time.now
let order = x eId lastMug wen
sendAndRecvOrder w eId order >>= \case
Left badEv -> throwIO (ReplacedEventDuringBoot eId badEv)
Right (id, newMug, f:fs) -> throwIO (EffectsDuringBoot eId (f:fs))
Right (id, newMug, []) -> do
loop (order : acc) (eId+1) newMug xs
wen <- Time.now
let order = x eId lastMug wen
sendAndRecv serf eId order >>= \case
Left badEv -> throwIO (ReplacedEventDuringBoot eId badEv)
Right (id, mug, []) -> loop (order : acc) (eId+1) mug xs
Right (id, mug, fx) -> throwIO (EffectsDuringBoot eId fx)
-- the ship is booted, but it is behind. shove events to the worker until it is
-- caught up.
@ -268,7 +253,7 @@ replayEvents w (wid, wmug) ident lastCommitedId getEvents = do
traceM ("got events " <> show (length events))
for_ events $ \(eventId, event) -> do
sendAndRecv w eventId event >>= \case
sendAndRecv w eventId (OWork eventId event) >>= \case
Left ev -> throwIO (ReplacedEventDuringReplay eventId ev)
Right (id, mug, _) -> writeIORef vLast (id, mug)