king: Handle replacement events correctly (dont try to parse them).

This commit is contained in:
~siprel 2020-05-27 00:08:07 +00:00
parent 26bd5a9e4b
commit 1f64a528cd
2 changed files with 59 additions and 99 deletions

View File

@ -26,10 +26,12 @@ import Urbit.Arvo
import Urbit.King.Config
import Urbit.Vere.Pier.Types
import Control.Monad.Trans.Maybe
import Data.Conduit
import Data.Text (append)
import System.Posix.Files (ownerModes, setFileMode)
import Urbit.King.App (HasConfigDir(..), HasStderrLogFunc(..))
import Urbit.Time (Wen)
import Urbit.Vere.Ames (ames)
import Urbit.Vere.Behn (behn)
import Urbit.Vere.Clay (clay)
@ -37,7 +39,6 @@ import Urbit.Vere.Http.Client (client)
import Urbit.Vere.Http.Server (serv)
import Urbit.Vere.Log (EventLog)
import Urbit.Vere.Serf (Serf, SerfState(..))
import Data.Conduit
import qualified System.Entropy as Ent
import qualified Urbit.King.API as King
@ -127,21 +128,6 @@ bootSeqJobs now (BootSeq ident nocks ovums) = zipWith ($) bootSeqFns [1 ..]
muckNock nok eId = RunNok $ LifeCyc eId 0 nok
muckOvum ov eId = DoWork $ Work eId 0 (wen eId) ov
{-
loop :: [Job] -> SerfState -> Maybe (ProgressBar ()) -> [BootSeqFn]
-> RIO e ([Job], SerfState)
loop acc ss pb = \case
[] -> do
pb <- logStderr (updateProgressBar 0 bootMsg pb)
pure (reverse acc, ss)
x:xs -> do
wen <- io Time.now
job <- pure $ x (ssNextEv ss) (ssLastMug ss) wen
pb <- logStderr (updateProgressBar (1 + length xs) bootMsg pb)
(job, ss) <- bootJob serf job
loop (job:acc) ss pb xs
-}
bootNewShip
:: (HasPierConfig e, HasStderrLogFunc e, HasLogFunc e)
=> Pill
@ -223,6 +209,9 @@ getSnapshot top last = do
acquireWorker :: RIO e () -> RAcquire e (Async ())
acquireWorker act = mkRAcquire (async act) cancel
acquireWorkerBound :: RIO e () -> RAcquire e (Async ())
acquireWorkerBound act = mkRAcquire (asyncBound act) cancel
pier :: e. (HasConfigDir e, HasLogFunc e, HasNetworkConfig e, HasPierConfig e)
=> (Serf, EventLog)
-> TVar (Text -> IO ())
@ -235,7 +224,7 @@ pier (serf, log) vStderr mStart = do
saveM <- newEmptyTMVarIO
shutdownM <- newEmptyTMVarIO
kapi King.kingAPI
kapi <- King.kingAPI
termApiQ <- atomically $ do
q <- newTQueue
@ -317,15 +306,15 @@ pier (serf, log) vStderr mStart = do
death :: Text -> Async () -> STM (Either (Text, SomeException) Text)
death tag tid = do
waitCatchSTM tid <&> \case
Left exn -> Left (tag, exn)
Right () -> Right tag
Left exn -> Left (tag, exn)
Right () -> Right tag
saveSignalThread :: TMVar () -> RAcquire e (Async ())
saveSignalThread tm = mkRAcquire start cancel
where
start = async $ forever $ do
threadDelay (120 * 1000000) -- 120 seconds
atomically $ putTMVar tm ()
where
start = async $ forever $ do
threadDelay (120 * 1000000) -- 120 seconds
atomically $ putTMVar tm ()
-- Start All Drivers -----------------------------------------------------------
@ -427,10 +416,10 @@ runCompute
-> STM ()
-> (Maybe Text -> STM ())
-> STM ()
-> ((Job, FX) -> STM ())
-> ((Fact, FX) -> STM ())
-> RAcquire e (Async ())
runCompute serf getEvent getSaveSignal getShutdownSignal showSpinner hideSpinner putResult = do
mkRAcquire (async $ newRunCompute serf config) cancel
acquireWorker (newRunCompute serf config)
where
config = ComputeConfig
{ ccOnWork = getEvent
@ -441,17 +430,6 @@ runCompute serf getEvent getSaveSignal getShutdownSignal showSpinner hideSpinner
, ccHideSpinner = hideSpinner
}
-- data RunOutput = RunOutput EventId Mug Wen (Either Noun Ev) [Ef]
-- data Work = Work EventId Mug Wen Ev
{-
data ComputeRequest
= CREvent Ev (Serf.RunError -> IO ())
| CRSave ()
| CRShutdown ()
deriving (Eq, Show)
-}
{-
TODO Pack and Peek
-}
@ -486,11 +464,18 @@ fromRightErr :: Either a b -> IO b
fromRightErr (Left l) = error "unexpected Left value"
fromRightErr (Right r) = pure r
data Fact = Fact
{ factEve :: EventId
, factMug :: Mug
, factWen :: Wen
, factNon :: Noun
}
data ComputeConfig = ComputeConfig
{ ccOnWork :: STM (Ev, Serf.RunError -> IO ())
, ccOnKill :: STM ()
, ccOnSave :: STM ()
, ccPutResult :: (Job, FX) -> STM ()
, ccPutResult :: (Fact, FX) -> STM ()
, ccShowSpinner :: Maybe Text -> STM ()
, ccHideSpinner :: STM ()
}
@ -509,9 +494,7 @@ newRunCompute serf ComputeConfig {..} = do
Nothing -> pure ()
Just (Serf.RunOutput e m w nounEv fx) -> do
lift $ logTrace "newRunCompute: Got play result"
ev <- io $ fromRightErr nounEv -- TODO
let job :: Job = DoWork $ Work e m w ev
atomically (ccPutResult ((job, GoodParse <$> fx))) -- TODO GoodParse
atomically $ ccPutResult (Fact e m w nounEv, GoodParse <$> fx) -- TODO GoodParse
sendResults
onStatusChange :: Maybe Serf.RunInput -> STM ()
@ -521,24 +504,6 @@ newRunCompute serf ComputeConfig {..} = do
_ -> pure ()
{-
FIND ME
send event
push event
start spinner
hook for when event starts running
hook for when no event is running
send another event
first event is done
push to persistQ
update spinner to event #2
second event is done
push to executeQ
remove spinner
-}
-- Persist Thread --------------------------------------------------------------
data PersistExn = BadEventId EventId EventId
@ -550,43 +515,38 @@ instance Exception PersistExn where
, "\tExpected " <> show expected <> " but got " <> show got
]
runPersist :: e. (HasPierConfig e, HasLogFunc e)
=> EventLog
-> TQueue (Job, FX)
-> (FX -> STM ())
-> RAcquire e (Async ())
runPersist log inpQ out =
mkRAcquire runThread cancel
where
runThread :: RIO e (Async ())
runThread = asyncBound $ do
dryRun <- view dryRunL
forever $ do
writs <- atomically getBatchFromQueue
unless dryRun $ do
events <- validateJobsAndGetBytes (toNullable writs)
Log.appendEvents log events
atomically $ for_ writs $ \(_,fx) -> out fx
runPersist
:: forall e
. (HasPierConfig e, HasLogFunc e)
=> EventLog
-> TQueue (Fact, FX)
-> (FX -> STM ())
-> RAcquire e (Async ())
runPersist log inpQ out = mkRAcquire runThread cancel
where
runThread :: RIO e (Async ())
runThread = asyncBound $ do
dryRun <- view dryRunL
forever $ do
writs <- atomically getBatchFromQueue
events <- validateFactsAndGetBytes (fst <$> toNullable writs)
unless dryRun (Log.appendEvents log events)
atomically $ for_ writs $ \(_, fx) -> do
out fx
validateJobsAndGetBytes :: [(Job, FX)] -> RIO e (Vector ByteString)
validateJobsAndGetBytes writs = do
expect <- Log.nextEv log
fmap fromList
$ for (zip [expect..] writs)
$ \(expectedId, (j, fx)) -> do
unless (expectedId == jobId j) $
throwIO (BadEventId expectedId (jobId j))
case j of
RunNok _ ->
error "This shouldn't happen here!"
DoWork (Work eId mug wen ev) ->
pure $ jamBS $ toNoun (mug, wen, ev)
validateFactsAndGetBytes :: [Fact] -> RIO e (Vector ByteString)
validateFactsAndGetBytes facts = do
expect <- Log.nextEv log
lis <- for (zip [expect ..] facts) $ \(expectedId, Fact eve mug wen non) ->
do
unless (expectedId == eve) $ do
throwIO (BadEventId expectedId eve)
pure $ jamBS $ toNoun (mug, wen, non)
pure (fromList lis)
getBatchFromQueue :: STM (NonNull [(Job, FX)])
getBatchFromQueue =
readTQueue inpQ >>= go . singleton
where
go acc =
tryReadTQueue inpQ >>= \case
Nothing -> pure (reverse acc)
Just item -> go (item <| acc)
getBatchFromQueue :: STM (NonNull [(Fact, FX)])
getBatchFromQueue = readTQueue inpQ >>= go . singleton
where
go acc = tryReadTQueue inpQ >>= \case
Nothing -> pure (reverse acc)
Just item -> go (item <| acc)

View File

@ -167,7 +167,7 @@ data RunInput
| RunPeek Wen Gang Path (Maybe (Term, Noun) -> IO ())
| RunWork Ev (RunError -> IO ())
data RunOutput = RunOutput EventId Mug Wen (Either Noun Ev) [Ef]
data RunOutput = RunOutput EventId Mug Wen Noun [Ef]
-- Exceptions ------------------------------------------------------------------
@ -432,11 +432,11 @@ running serf notice = do
io (sendWrit serf (WWork wen evn))
io (recvWork serf) >>= \case
WDone eid hash fx -> do
yield (RunOutput eid hash wen (Right evn) fx)
yield (RunOutput eid hash wen (toNoun evn) fx)
loop hash eid
WSwap eid hash (wen, noun) fx -> do
io $ err (RunSwap eid hash wen noun fx)
yield (RunOutput eid hash wen (Left noun) fx)
yield (RunOutput eid hash wen noun fx)
loop hash eid
WBail goofs -> do
io $ err (RunBail goofs)