king: Event prioritization and error handling for born events.

This commit is contained in:
~siprel 2020-06-10 19:22:45 +00:00
parent cc772da03c
commit c57c3023f9
13 changed files with 284 additions and 205 deletions

View File

@ -20,6 +20,8 @@ Bugs:
- [x] `king new` should reject pier directories that already exist.
- [x] In non-daemon-mode, ^D doesn't bring down Urbit properly.
- [ ] Spinner updated multiple times with the same event, and this causes
logging of events to contain duplicates.
King-Haskell specific features:
@ -32,6 +34,10 @@ Performance:
- [x] Batching during replay.
- [x] Batching during normal operation.
Optimization:
- [x] IO Driver Event Prioritization
Polish:
- [x] Cleanup batching flow.
@ -67,55 +73,14 @@ Polish:
- [ ] Spin off per-pier logic into it's own package.
- Probably `urbit-pier`
# Event Prioritization
- Instead of each IO driver being passed a TQueue EvErr, each IO driver
produces a (STM (Maybe RunReq)).
- Each driver has it's own event queue that feeds this action.
- Pier has a thread that pulls from these actions with prioritization.
- Priority:
- If any terminal events are available, send it.
- If serf queue is full, abort transaction and retry.
- If no terminal events are available, do the same thing with sync driver.
- Next, same thing for behn.
- Next, same thing for iris.
- Next, same thing for ames.
- Next, same thing for eyre.
# Better IO Driver Startup Flow Separation
Should have a io-driver-boot stage.
- IO drivers do their boot flows.
- When they're done, they signal that they're running.
- No semantically important communication without outside world can
happen until all drivers are up.
Current IO Driver interface is something like:
```
behn :: KingId -> (EvErr -> STM ()) -> ([EvErr], Acquire (BehnEf -> IO ()))
```
New Interface should be something like:
```
data DriverApi = DriverApi
{ eventQueue :: STM (Maybe RunReq)
, effectSink :: Effect -> STM ()
, blockUntilBorn :: STM ()
}
behn :: HasPierEnv e => RAcquire e DriverApi
```
where `PierEnv` contains `blockUntilAllDriversBorn :: STM ()`.
# Finding the Serf Executable
Right now, `urbit-worker` is found by looking it up in the PATH. This
is wrong, but what is right?
# Further IO Driver Startup Flow Betterment
- Implement Pier-wide process start events
- [ ] Entropy injection.
- [ ] Verbose flag.
- [ ] CLI event injection.

View File

@ -82,22 +82,6 @@ data SyncEf
deriveNoun ''SyncEf
-- UDP Effects -----------------------------------------------------------------
{-|
%init -- "I don't think that's something that can happen"
%west -- "Those also shouldn't happen"
%woot -- "Those also shouldn't happen"
-}
data AmesEf
= AmesEfInit Path ()
| AmesEfWest Path Ship Path Noun
| AmesEfWoot Path Ship (Maybe (Maybe (Term, [Tank])))
deriving (Eq, Ord, Show)
deriveNoun ''AmesEf
-- Timer Effects ---------------------------------------------------------------
{-|
@ -171,7 +155,6 @@ data VaneEf
| VEHttpClient HttpClientEf
| VEHttpServer HttpServerEf
| VEBehn BehnEf
| VEAmes AmesEf
| VETerm TermEf
| VEClay SyncEf
| VESync SyncEf

View File

@ -350,6 +350,7 @@ instance FromNoun Ev where
ReOrg "vane" s t p v -> fmap EvVane $ parseNoun $ toNoun (s,t,p,v)
ReOrg _ _ _ _ _ -> fail "First path-elem must be ?($ %vane)"
-- Short Event Names -----------------------------------------------------------
{-

View File

@ -2,7 +2,7 @@
Ames IO Driver
-}
module Urbit.Vere.Ames (ames) where
module Urbit.Vere.Ames (ames, ames') where
import Urbit.Prelude
@ -11,7 +11,7 @@ import Urbit.Arvo hiding (Fake)
import Urbit.King.Config
import Urbit.Vere.Pier.Types
import Urbit.King.App (HasKingId(..))
import Urbit.King.App (HasKingId(..), HasPierEnv(..))
import Urbit.Vere.Ames.DNS (NetworkMode(..), ResolvServ(..))
import Urbit.Vere.Ames.DNS (galaxyPort, resolvServ)
import Urbit.Vere.Ames.UDP (UdpServ(..), fakeUdpServ, realUdpServ)
@ -31,7 +31,7 @@ data AmesDrv = AmesDrv
listenPort :: NetworkMode -> Ship -> PortNumber
listenPort m s | s < 256 = galaxyPort m (fromIntegral s)
listenPort m _ = 0
listenPort m _ = 0 -- I don't care, just give me any port.
localhost :: HostAddress
localhost = tupleToHostAddress (127, 0, 0, 1)
@ -95,10 +95,29 @@ udpServ isFake who = do
Nothing -> fakeUdpServ
Just host -> realUdpServ port host
bornFailed :: e -> WorkError -> IO ()
bornFailed env _ = runRIO env $ do
_bornFailed :: e -> WorkError -> IO ()
_bornFailed env _ = runRIO env $ do
pure () -- TODO What can we do?
ames'
:: HasPierEnv e
=> Ship
-> Bool
-> (Text -> RIO e ())
-> RIO e ([Ev], RAcquire e (DriverApi NewtEf))
ames' who isFake stderr = do
ventQ :: TQueue EvErr <- newTQueueIO
env <- ask
let (bornEvs, startDriver) = ames env who isFake (writeTQueue ventQ) stderr
let runDriver = do
diOnEffect <- startDriver
let diEventSource = fmap RRWork <$> tryReadTQueue ventQ
pure (DriverApi {..})
pure (bornEvs, runDriver)
{-|
inst -- Process instance number.
who -- Which ship are we?
@ -118,13 +137,13 @@ ames
-> Bool
-> (EvErr -> STM ())
-> (Text -> RIO e ())
-> ([EvErr], RAcquire e (NewtEf -> IO ()))
-> ([Ev], RAcquire e (NewtEf -> IO ()))
ames env who isFake enqueueEv stderr = (initialEvents, runAmes)
where
king = fromIntegral (env ^. kingIdL)
initialEvents :: [EvErr]
initialEvents = [EvErr (bornEv king) (bornFailed env)]
initialEvents :: [Ev]
initialEvents = [bornEv king]
runAmes :: RAcquire e (NewtEf -> IO ())
runAmes = do

View File

@ -18,25 +18,15 @@ import qualified Urbit.Timer as Timer
-- Behn Stuff ------------------------------------------------------------------
behn' :: HasPierEnv e => RAcquire e DriverApi
behn' :: HasPierEnv e => RIO e ([Ev], RAcquire e (DriverApi BehnEf))
behn' = do
ventQ <- newTQueueIO
bornM <- newEmptyTMVarIO
fectM <- newEmptyTMVarIO
env <- ask
let (bootEvs, start) = behn env (writeTQueue ventQ)
for_ bootEvs (atomically . writeTQueue ventQ)
diOnEffect <- liftAcquire start
pure ([bornEv (fromIntegral (env ^. kingIdL))], runDriver env)
where
runDriver env = do
ventQ :: TQueue EvErr <- newTQueueIO
diOnEffect <- liftAcquire (behn env (writeTQueue ventQ))
let diEventSource = fmap RRWork <$> tryReadTQueue ventQ
let diBlockUntilBorn = readTMVar bornM
-- TODO Do this after successful born event.
atomically $ putTMVar bornM ()
pure (DriverApi {..})
bornEv :: KingId -> Ev
@ -47,10 +37,6 @@ wakeEv = EvBlip $ BlipEvBehn $ BehnEvWake () ()
sysTime = view Time.systemTime
bornFailed :: e -> WorkError -> IO ()
bornFailed env _ = runRIO env $ do
pure () -- TODO Ship is fucked. Kill it?
wakeErr :: WorkError -> IO ()
wakeErr _ = pure ()
@ -58,14 +44,11 @@ behn
:: HasKingId e
=> e
-> (EvErr -> STM ())
-> ([EvErr], Acquire (BehnEf -> IO ()))
behn env enqueueEv =
(initialEvents, runBehn)
-> Acquire (BehnEf -> IO ())
behn env enqueueEv = runBehn
where
king = fromIntegral (env ^. kingIdL)
initialEvents = [EvErr (bornEv king) (bornFailed env)]
runBehn :: Acquire (BehnEf -> IO ())
runBehn = do
tim <- mkAcquire Timer.init Timer.stop

View File

@ -2,11 +2,14 @@
UNIX Filesystem Driver
-}
module Urbit.Vere.Clay (clay) where
module Urbit.Vere.Clay
( clay
, clay'
)
where
import Urbit.Arvo hiding (Term)
import Urbit.King.App (HasKingId(..))
import Urbit.King.Config
import Urbit.King.App
import Urbit.Prelude
import Urbit.Vere.Pier.Types
@ -113,16 +116,32 @@ buildActionListFromDifferences fp snapshot = do
--------------------------------------------------------------------------------
boatFailed :: e -> WorkError -> IO ()
boatFailed env _ = runRIO env $ do
_boatFailed :: e -> WorkError -> IO ()
_boatFailed env _ = runRIO env $ do
pure () -- TODO What can we do?
clay'
:: HasPierEnv e
=> RIO e ([Ev], RAcquire e (DriverApi SyncEf))
clay' = do
ventQ :: TQueue EvErr <- newTQueueIO
env <- ask
let (bornEvs, startDriver) = clay env (writeTQueue ventQ)
let runDriver = do
diOnEffect <- startDriver
let diEventSource = fmap RRWork <$> tryReadTQueue ventQ
pure (DriverApi {..})
pure (bornEvs, runDriver)
clay
:: forall e
. (HasPierConfig e, HasLogFunc e, HasKingId e)
=> e
-> (EvErr -> STM ())
-> ([EvErr], RAcquire e (SyncEf -> IO ()))
-> ([Ev], RAcquire e (SyncEf -> IO ()))
clay env plan =
(initialEvents, runSync)
where
@ -132,7 +151,7 @@ clay env plan =
-- TODO: In the case of -A, we need to read all the data from the
-- specified directory and shove it into an %into event.
initialEvents = [EvErr boatEv (boatFailed env)]
initialEvents = [boatEv]
runSync :: RAcquire e (SyncEf -> IO ())
runSync = handleEffect <$> mkRAcquire start stop

View File

@ -4,13 +4,14 @@
module Urbit.Vere.Eyre
( eyre
, eyre'
)
where
import Urbit.Prelude hiding (Builder)
import Urbit.Arvo hiding (ServerId, reqUrl, secure)
import Urbit.King.App (HasKingId(..))
import Urbit.King.App (HasKingId(..), HasPierEnv(..))
import Urbit.King.Config
import Urbit.Vere.Eyre.Multi
import Urbit.Vere.Eyre.PortsFile
@ -275,25 +276,56 @@ startServ multi who isFake conf plan = do
-- Eyre Driver -----------------------------------------------------------------
bornFailed :: e -> WorkError -> IO ()
bornFailed env _ = runRIO env $ do
_bornFailed :: e -> WorkError -> IO ()
_bornFailed env _ = runRIO env $ do
pure () -- TODO What should this do?
eyre'
:: HasPierEnv e
=> MultiEyreApi
-> Ship
-> Bool
-> RIO e ([Ev], RAcquire e (DriverApi HttpServerEf))
eyre' multi who isFake = do
ventQ :: TQueue EvErr <- newTQueueIO
env <- ask
let (bornEvs, startDriver) = eyre env multi who (writeTQueue ventQ) isFake
let runDriver = do
diOnEffect <- startDriver
let diEventSource = fmap RRWork <$> tryReadTQueue ventQ
pure (DriverApi {..})
pure (bornEvs, runDriver)
{-|
Eyre -- HTTP Server Driver
Inject born events.
Until born events succeeds, ignore effects.
Wait until born event callbacks invoked.
If success, signal success.
If failure, try again several times.
If still failure, bring down ship.
Once born event succeeds:
- Begin normal operation (start accepting requests)
-}
eyre
:: forall e
. (HasShipEnv e, HasKingId e)
. (HasPierEnv e)
=> e
-> MultiEyreApi
-> Ship
-> (EvErr -> STM ())
-> Bool
-> ([EvErr], RAcquire e (HttpServerEf -> IO ()))
-> ([Ev], RAcquire e (HttpServerEf -> IO ()))
eyre env multi who plan isFake = (initialEvents, runHttpServer)
where
king = fromIntegral (env ^. kingIdL)
initialEvents :: [EvErr]
initialEvents = [EvErr (bornEv king) (bornFailed env)]
initialEvents :: [Ev]
initialEvents = [bornEv king]
runHttpServer :: RAcquire e (HttpServerEf -> IO ())
runHttpServer = handleEf <$> mkRAcquire

View File

@ -11,11 +11,11 @@ import Urbit.Prelude hiding (Builder)
import Urbit.Vere.Http
import Urbit.Vere.Pier.Types
import Urbit.King.App
import Urbit.Arvo (BlipEv(..), Ev(..), HttpClientEf(..), HttpClientEv(..),
HttpClientReq(..), HttpEvent(..), KingId, ResponseHeader(..))
import Urbit.King.App (HasKingId(..))
import qualified Data.Map as M
import qualified Network.HTTP.Client as H
@ -57,22 +57,52 @@ bornEv king =
--------------------------------------------------------------------------------
bornFailed :: e -> WorkError -> IO ()
bornFailed env _ = runRIO env $ do
_bornFailed :: e -> WorkError -> IO ()
_bornFailed env _ = runRIO env $ do
pure () -- TODO What to do in this case?
client'
:: HasPierEnv e
=> RIO e ([Ev], RAcquire e (DriverApi HttpClientEf))
client' = do
ventQ :: TQueue EvErr <- newTQueueIO
env <- ask
let (bornEvs, startDriver) = client env (writeTQueue ventQ)
let runDriver = do
diOnEffect <- startDriver
let diEventSource = fmap RRWork <$> tryReadTQueue ventQ
pure (DriverApi {..})
pure (bornEvs, runDriver)
{-|
Iris -- HTTP Client Driver
Until born events succeeds, ignore effects.
Wait until born event callbacks invoked.
If success, signal success.
If failure, try again several times.
If still failure, bring down ship.
Once born event succeeds, hold on to effects.
Once all other drivers have booted:
- Execute stashed effects.
- Begin normal operation (start accepting requests)
-}
client
:: forall e
. (HasLogFunc e, HasKingId e)
=> e
-> (EvErr -> STM ())
-> ([EvErr], RAcquire e (HttpClientEf -> IO ()))
-> ([Ev], RAcquire e (HttpClientEf -> IO ()))
client env plan = (initialEvents, runHttpClient)
where
kingId = view (kingIdL . to fromIntegral) env
initialEvents :: [EvErr]
initialEvents = [EvErr (bornEv kingId) (bornFailed env)]
initialEvents :: [Ev]
initialEvents = [bornEv kingId]
runHttpClient :: RAcquire e (HttpClientEf -> IO ())
runHttpClient = handleEffect <$> mkRAcquire start stop

View File

@ -24,24 +24,25 @@ import Urbit.Arvo
import Urbit.King.Config
import Urbit.Vere.Pier.Types
import Control.Monad.STM (retry)
import System.Posix.Files (ownerModes, setFileMode)
import Urbit.EventLog.LMDB (EventLog)
import Urbit.King.API (TermConn)
import Urbit.King.App (HasKingEnv, HasPierEnv(..), PierEnv)
import Urbit.King.App (onKillPierSigL)
import Urbit.Noun.Time (Wen)
import Urbit.Vere.Ames (ames)
import Urbit.Vere.Behn (behn)
import Urbit.Vere.Clay (clay)
import Urbit.Vere.Eyre (eyre)
import Urbit.Vere.Behn (behn')
import Urbit.Vere.Eyre.Multi (MultiEyreApi)
import Urbit.Vere.Http.Client (client)
import Urbit.Vere.Serf (Serf)
import qualified System.Entropy as Ent
import qualified Urbit.EventLog.LMDB as Log
import qualified Urbit.King.API as King
import qualified Urbit.Noun.Time as Time
import qualified Urbit.Vere.Ames as Ames
import qualified Urbit.Vere.Clay as Clay
import qualified Urbit.Vere.Eyre as Eyre
import qualified Urbit.Vere.Http.Client as Iris
import qualified Urbit.Vere.Serf as Serf
import qualified Urbit.Vere.Term as Term
import qualified Urbit.Vere.Term.API as Term
@ -88,8 +89,9 @@ genBootSeq ship Pill {..} lite boot = io $ do
_ -> False
-- Write a batch of jobs into the event log ------------------------------------
-- Write to the log. -----------------------------------------------------------
-- | Write a batch of jobs to the event log.
writeJobs :: EventLog -> Vector Job -> RIO e ()
writeJobs log !jobs = do
expect <- atomically (Log.nextEv log)
@ -110,7 +112,7 @@ writeJobs log !jobs = do
-- Acquire a running serf. -----------------------------------------------------
printTank :: (Text -> IO ()) -> Atom -> Tank -> IO ()
printTank f _ = io . f . unlines . fmap unTape . wash (WashCfg 0 80)
printTank f _priority = f . unlines . fmap unTape . wash (WashCfg 0 80)
runSerf
:: HasLogFunc e
@ -122,7 +124,7 @@ runSerf vSlog pax fax = do
env <- ask
Serf.withSerf (config env)
where
slog txt = join $ atomically (readTVar vSlog >>= pure . ($ txt))
slog txt = atomically (readTVar vSlog) >>= (\f -> f txt)
config env = Serf.Config
{ scSerf = "urbit-worker" -- TODO Find the executable in some proper way.
, scPier = pax
@ -154,10 +156,10 @@ bootSeqJobs now (BootSeq ident nocks ovums) = zipWith ($) bootSeqFns [1 ..]
wen off = Time.addGap now ((fromIntegral off - 1) ^. from Time.microSecs)
bootSeqFns :: [EventId -> Job]
bootSeqFns = fmap muckNock nocks <> fmap muckOvum ovums
bootSeqFns = fmap nockJob nocks <> fmap ovumJob ovums
where
muckNock nok eId = RunNok $ LifeCyc eId 0 nok
muckOvum ov eId = DoWork $ Work eId 0 (wen eId) ov
nockJob nok eId = RunNok $ LifeCyc eId 0 nok
ovumJob ov eId = DoWork $ Work eId 0 (wen eId) ov
bootNewShip
:: HasPierEnv e
@ -173,10 +175,12 @@ bootNewShip pill lite flags ship bootEv = do
pierPath <- view pierPathL
liftRIO (setupPierDirectory pierPath)
rio (setupPierDirectory pierPath)
logDebug "Directory setup."
rwith (Log.new (pierPath <> "/.urb/log") ident) $ \log -> do
let logPath = (pierPath </> ".urb/log")
rwith (Log.new logPath ident) $ \log -> do
logDebug "Event log initialized."
jobs <- (\now -> bootSeqJobs now seq) <$> io Time.now
writeJobs log (fromList jobs)
@ -198,10 +202,11 @@ resumed vSlog replayUntil flags = do
ev <- MaybeT (pure replayUntil)
MaybeT (getSnapshot top ev)
rio $ logTrace $ display @Text ("pier: " <> pack top)
rio $ logTrace $ display @Text ("running serf in: " <> pack tap)
rio $ do
logTrace $ display @Text ("pier: " <> pack top)
logTrace $ display @Text ("running serf in: " <> pack tap)
log <- Log.existing (top <> "/.urb/log")
log <- Log.existing (top </> ".urb/log")
serf <- runSerf vSlog tap flags
rio $ do
@ -217,6 +222,7 @@ resumed vSlog replayUntil flags = do
pure (serf, log)
-- | Get a fake pier directory for partial snapshots.
getSnapshot :: forall e . FilePath -> Word64 -> RIO e (Maybe FilePath)
getSnapshot top last = do
lastSnapshot <- lastMay <$> listReplays
@ -261,7 +267,10 @@ pier (serf, log) vSlog startedSig multi = do
let logId = Log.identity log :: LogIdentity
let ship = who logId :: Ship
computeQ :: TQueue Serf.EvErr <- newTQueueIO
-- TODO Instead of using a TMVar, pull directly from the IO driver
-- event sources.
computeQ :: TMVar RunReq <- newEmptyTMVarIO
persistQ :: TQueue (Fact, FX) <- newTQueueIO
executeQ :: TQueue FX <- newTQueueIO
saveSig :: TMVar () <- newEmptyTMVarIO
@ -294,7 +303,7 @@ pier (serf, log) vSlog startedSig multi = do
-- Serf doesn't have the appended \r\n because those \r\n s are added in
-- the c serf code. Logging output from our haskell process must manually
-- add them.
let compute = writeTQueue computeQ
let compute = putTMVar computeQ
let execute = writeTQueue executeQ
let persist = writeTQueue persistQ
@ -303,15 +312,12 @@ pier (serf, log) vSlog startedSig multi = do
let err = atomically . Term.trace muxed . (<> "\r\n")
let siz = Term.TSize { tsWide = 80, tsTall = 24 }
let fak = isFake logId
pure $ drivers env multi ship fak compute (siz, muxed) err
-- Fill event queue with initial events.
io $ atomically $ for_ bootEvents compute
drivers env multi ship fak compute (siz, muxed) err
scrySig <- newEmptyTMVarIO
onKill <- view onKillPierSigL
let computeConfig = ComputeConfig { ccOnWork = readTQueue computeQ
let computeConfig = ComputeConfig { ccOnWork = takeTMVar computeQ
, ccOnKill = onKill
, ccOnSave = takeTMVar saveSig
, ccOnScry = takeTMVar scrySig
@ -321,10 +327,28 @@ pier (serf, log) vSlog startedSig multi = do
, ccLastEvInLog = Log.lastEv log
}
tSerf <- acquireWorker "Serf" (runCompute serf computeConfig)
-- Run all born events and retry them until they succeed.
rio $ for_ bootEvents $ \ev -> do
okaySig <- newEmptyMVar
let inject n = atomically $ compute $ RRWork $ EvErr ev $ cb n
-- TODO Make sure this dies cleanly.
cb :: Int -> WorkError -> IO ()
cb n | n >= 3 = error ("boot event failed: " <> show ev)
cb n = \case
RunOkay _ -> putMVar okaySig ()
RunSwap _ _ _ _ _ -> putMVar okaySig ()
RunBail _ -> inject (n + 1)
logTrace ("Boot Event" <> displayShow ev)
io (inject 0)
drivz <- startDrivers
tExec <- acquireWorker "Effects" (router (readTQueue executeQ) drivz)
tDisk <- acquireWorkerBound "Persist" (runPersist log persistQ execute)
tSerf <- acquireWorker "Serf" (runCompute serf computeConfig)
let snapshotEverySecs = 120
@ -333,8 +357,9 @@ pier (serf, log) vSlog startedSig multi = do
void $ atomically $ tryPutTMVar saveSig ()
-- TODO bullshit scry tester
void $ acquireWorker "bullshit scry tester" $ forever $ do
void $ acquireWorker "bullshit scry tester" $ do
env <- ask
forever $ do
threadDelay 15_000_000
wen <- io Time.now
let kal = \mTermNoun -> runRIO env $ do
@ -354,8 +379,9 @@ pier (serf, log) vSlog startedSig multi = do
]
atomically ded >>= \case
Left (txt, exn) -> logError $ displayShow ("Somthing died", txt, exn)
Right tag -> logError $ displayShow ("Something simply exited", tag)
Left (tag, exn) -> logError $ displayShow (tag, "crashed", exn)
Right "compute thread" -> pure ()
Right tag -> logError $ displayShow (tag, "exited unexpectly")
atomically $ (Term.spin muxed) (Just "shutdown")
@ -368,9 +394,8 @@ death tag tid = do
-- Start All Drivers -----------------------------------------------------------
data Drivers e = Drivers
{ dAmes :: AmesEf -> IO ()
, dBehn :: BehnEf -> IO ()
data Drivers = Drivers
{ dBehn :: BehnEf -> IO ()
, dIris :: HttpClientEf -> IO ()
, dEyre :: HttpServerEf -> IO ()
, dNewt :: NewtEf -> IO ()
@ -384,36 +409,58 @@ drivers
-> MultiEyreApi
-> Ship
-> Bool
-> (EvErr -> STM ())
-> (RunReq -> STM ())
-> (Term.TSize, Term.Client)
-> (Text -> RIO e ())
-> ([EvErr], RAcquire e (Drivers e))
drivers env multi who isFake plan termSys stderr =
(initialEvents, runDrivers)
where
(behnBorn, runBehn) = behn env plan
(amesBorn, runAmes) = ames env who isFake plan stderr
(httpBorn, runEyre) = eyre env multi who plan isFake
(clayBorn, runClay) = clay env plan
(irisBorn, runIris) = client env plan
(termBorn, runTerm) = Term.term env termSys plan
initialEvents = mconcat [behnBorn, clayBorn, amesBorn, httpBorn,
termBorn, irisBorn]
-> RAcquire e ([Ev], RAcquire e Drivers)
drivers env multi who isFake plan termSys stderr = do
(behnBorn, runBehn) <- rio behn'
(termBorn, runTerm) <- rio (Term.term' termSys)
(amesBorn, runAmes) <- rio (Ames.ames' who isFake stderr)
(httpBorn, runEyre) <- rio (Eyre.eyre' multi who isFake)
(clayBorn, runClay) <- rio Clay.clay'
(irisBorn, runIris) <- rio Iris.client'
runDrivers = do
dNewt <- runAmes
dBehn <- liftAcquire $ runBehn
dAmes <- pure $ const $ pure ()
dIris <- runIris
dEyre <- runEyre
dSync <- runClay
dTerm <- runTerm
pure (Drivers{..})
let initialEvents = mconcat [behnBorn,clayBorn,amesBorn,httpBorn,irisBorn,termBorn]
let runDrivers = do
behn <- runBehn
term <- runTerm
ames <- runAmes
iris <- runIris
eyre <- runEyre
clay <- runClay
-- Sources lower in the list are starved until sources above them
-- have no events to offer.
acquireWorker "Event Prioritization" $ forever $ atomically $ do
let x = diEventSource
let eventSources = [x term, x clay, x behn, x iris, x eyre, x ames]
pullEvent eventSources >>= \case
Nothing -> retry
Just rr -> plan rr
pure $ Drivers
{ dTerm = diOnEffect term
, dBehn = diOnEffect behn
, dNewt = diOnEffect ames
, dIris = diOnEffect iris
, dEyre = diOnEffect eyre
, dSync = diOnEffect clay
}
pure (initialEvents, runDrivers)
where
pullEvent :: [STM (Maybe a)] -> STM (Maybe a)
pullEvent [] = pure Nothing
pullEvent (d:ds) = d >>= \case
Just r -> pure (Just r)
Nothing -> pullEvent ds
-- Route Effects to Drivers ----------------------------------------------------
router :: HasLogFunc e => STM FX -> Drivers e -> RIO e ()
router :: HasLogFunc e => STM FX -> Drivers -> RIO e ()
router waitFx Drivers {..} = forever $ do
fx <- atomically waitFx
for_ fx $ \ef -> do
@ -421,7 +468,6 @@ router waitFx Drivers {..} = forever $ do
case ef of
GoodParse (EfVega _ _ ) -> error "TODO"
GoodParse (EfExit _ _ ) -> error "TODO"
GoodParse (EfVane (VEAmes ef)) -> io (dAmes ef)
GoodParse (EfVane (VEBehn ef)) -> io (dBehn ef)
GoodParse (EfVane (VEBoat ef)) -> io (dSync ef)
GoodParse (EfVane (VEClay ef)) -> io (dSync ef)
@ -450,7 +496,7 @@ logEffect ef = logDebug $ display $ "[EFFECT]\n" <> pretty ef
FailParse n -> pack $ unlines $ fmap ("\t" <>) $ lines $ ppShow n
data ComputeConfig = ComputeConfig
{ ccOnWork :: STM Serf.EvErr
{ ccOnWork :: STM RunReq
, ccOnKill :: STM ()
, ccOnSave :: STM ()
, ccOnScry :: STM (Wen, Gang, Path, Maybe (Term, Noun) -> IO ())
@ -464,9 +510,9 @@ runCompute :: forall e . HasKingEnv e => Serf.Serf -> ComputeConfig -> RIO e ()
runCompute serf ComputeConfig {..} = do
logDebug "runCompute"
let onCR = asum [ ccOnKill <&> Serf.RRKill
let onRR = asum [ ccOnKill <&> Serf.RRKill
, ccOnSave <&> Serf.RRSave
, ccOnWork <&> Serf.RRWork
, ccOnWork
, ccOnScry <&> \(w,g,p,k) -> Serf.RRScry w g p k
]
@ -483,7 +529,7 @@ runCompute serf ComputeConfig {..} = do
let maxBatchSize = 10
io (Serf.run serf maxBatchSize ccLastEvInLog onCR ccPutResult onSpin)
io (Serf.run serf maxBatchSize ccLastEvInLog onRR ccPutResult onSpin)
-- Event-Log Persistence Thread ------------------------------------------------

View File

@ -81,10 +81,9 @@ jobMug (DoWork (Work _ mug _ _ )) = mug
-- API To IO Drivers -----------------------------------------------------------
data DriverApi = DriverApi
data DriverApi ef = DriverApi
{ diEventSource :: STM (Maybe RunReq)
, diOnEffect :: BehnEf -> IO ()
, diBlockUntilBorn :: STM ()
, diOnEffect :: ef -> IO ()
}

View File

@ -556,6 +556,7 @@ run serf maxBatchSize getLastEvInLog onInput sendOn spin = topLoop
onWorkResp :: Wen -> EvErr -> Work -> IO ()
onWorkResp wen (EvErr evn err) = \case
WDone eid hash fx -> do
io $ err (RunOkay eid)
atomically $ sendOn ((Fact eid hash wen (toNoun evn)), fx)
WSwap eid hash (wen, noun) fx -> do
io $ err (RunSwap eid hash wen noun fx)

View File

@ -79,9 +79,10 @@ data EvErr = EvErr Ev (WorkError -> IO ())
- `RunBail`: Event processing failed and all attempt to replace it
with a failure-notice event also caused crashes. We are really fucked.
-}
data WorkError
= RunSwap EventId Mug Wen Noun FX
data WorkError -- TODO Rename type and constructors
= RunSwap EventId Mug Wen Noun FX -- TODO Maybe provide less info here?
| RunBail [Goof]
| RunOkay EventId
{-
- RRWork: Ask the serf to do work, will output (Fact, FX) if work