king: Each event comes with error callback, but all do nothing for now.

This commit is contained in:
~siprel 2020-06-02 20:48:07 +00:00
parent 61bdb3cac0
commit 28f464fc42
12 changed files with 155 additions and 85 deletions

View File

@ -1,7 +1,9 @@
# New IPC
Stubbed out:
- [x] Handle replacement events (stubbed out now b/c interface can't
handle unparsed nouns)
handle unparsed nouns)
- [x] Handle IPC errors by killing serf process.
- [x] Handle `peek` and `pack` in `swimming` flow.
- [x] Documentation for `Urbit.Vere.Serf.IPC`.
@ -11,10 +13,12 @@ Stubbed out:
- [x] Handle serf stderr message correctly.
- [x] Bring back `logEvent`.
- [ ] Snapshots should block until that event is commited to disk.
- [ ] Hook up error callbacks to IO Drivers.
King-Haskell specific features:
- [x] Re-implement `collectFX` flow.
- [x] Re-implement `collectFX` flow in Serf/Pier.
- [ ] Hook up `collectFX` to CLI.
- [ ] Test new `collectFX` flow
Performance:
@ -29,11 +33,17 @@ Polish:
- [ ] Logging for new IPC flow.
- [ ] Logging for boot sequence.
- [ ] Bring back progress bars.
- [ ] Hook up error callbacks to IO Drivers.
Unrelated bugs:
# Misc Bugs
- [ ] Handle ^C in connected terminals.
- [ ] terminal driver seems to have a race condition when spinner changed
too quickly.
- [ ] Handle ^C in connected terminals. It should interrupt current event.
- [ ] The terminal driver seems to have a race condition when spinner
changed too quickly.
- [ ] King should shutdown promptly on ^C. Always takes 2s in practice.
# Cleanup
- [ ] Break most logic from `Main.hs` out into modules.
- [ ] Simplify `Main.hs` flows.
- [ ] Cleanup Terminal Driver code.
- [ ] Spin off `Urbit.Noun` into it's own package.

View File

@ -95,6 +95,10 @@ udpServ isFake who = do
Nothing -> fakeUdpServ
Just host -> realUdpServ port host
bornFailed :: e -> WorkError -> IO ()
bornFailed env _ = runRIO env $ do
pure () -- TODO What can we do?
{-|
inst -- Process instance number.
who -- Which ship are we?
@ -112,15 +116,15 @@ ames
=> e
-> Ship
-> Bool
-> QueueEv
-> (EvErr -> STM ())
-> (Text -> RIO e ())
-> ([Ev], RAcquire e (EffCb e NewtEf))
-> ([EvErr], RAcquire e (EffCb e NewtEf))
ames env who isFake enqueueEv stderr = (initialEvents, runAmes)
where
king = fromIntegral (env ^. kingIdL)
initialEvents :: [Ev]
initialEvents = [bornEv king]
initialEvents :: [EvErr]
initialEvents = [EvErr (bornEv king) (bornFailed env)]
runAmes :: RAcquire e (EffCb e NewtEf)
runAmes = do
@ -136,10 +140,12 @@ ames env who isFake enqueueEv stderr = (initialEvents, runAmes)
aResolvr <- resolvServ aTurfs (usSend aUdpServ) stderr
pure (AmesDrv { .. })
hearFailed _ = pure ()
queuePacketsThread :: UdpServ -> RIO e (Async ())
queuePacketsThread UdpServ {..} = async $ forever $ atomically $ do
(p, a, b) <- usRecv
enqueueEv (hearEv p a b)
enqueueEv (EvErr (hearEv p a b) hearFailed)
stop :: AmesDrv -> RIO e ()
stop AmesDrv {..} = io $ do

View File

@ -26,13 +26,21 @@ wakeEv = EvBlip $ BlipEvBehn $ BehnEvWake () ()
sysTime = view Time.systemTime
behn :: HasKingId e => e -> QueueEv -> ([Ev], Acquire (EffCb e BehnEf))
bornFailed :: e -> WorkError -> IO ()
bornFailed env _ = runRIO env $ do
pure () -- TODO Ship is fucked. Kill it?
wakeErr :: WorkError -> IO ()
wakeErr _ = pure ()
behn
:: HasKingId e => e -> (EvErr -> STM ()) -> ([EvErr], Acquire (EffCb e BehnEf))
behn env enqueueEv =
(initialEvents, runBehn)
where
king = fromIntegral (env ^. kingIdL)
initialEvents = [bornEv king]
initialEvents = [EvErr (bornEv king) (bornFailed env)]
runBehn :: Acquire (EffCb e BehnEf)
runBehn = do
@ -48,4 +56,4 @@ behn env enqueueEv =
doze :: Timer -> Maybe Wen -> IO ()
doze tim = \case
Nothing -> Timer.stop tim
Just t -> Timer.start tim (sysTime t) $ atomically (enqueueEv wakeEv)
Just t -> Timer.start tim (sysTime t) $ atomically (enqueueEv (EvErr wakeEv wakeErr))

View File

@ -113,18 +113,26 @@ buildActionListFromDifferences fp snapshot = do
--------------------------------------------------------------------------------
clay :: forall e. (HasPierConfig e, HasLogFunc e, HasKingId e)
=> e -> QueueEv -> ([Ev], RAcquire e (EffCb e SyncEf))
clay env enqueueEv =
boatFailed :: e -> WorkError -> IO ()
boatFailed env _ = runRIO env $ do
pure () -- TODO What can we do?
clay
:: forall e
. (HasPierConfig e, HasLogFunc e, HasKingId e)
=> e
-> (EvErr -> STM ())
-> ([EvErr], RAcquire e (EffCb e SyncEf))
clay env plan =
(initialEvents, runSync)
where
king = fromIntegral (env ^. kingIdL)
initialEvents = [
EvBlip $ BlipEvBoat $ BoatEvBoat () ()
-- TODO: In the case of -A, we need to read all the data from the
-- specified directory and shove it into an %into event.
]
boatEv = EvBlip $ BlipEvBoat $ BoatEvBoat () ()
-- TODO: In the case of -A, we need to read all the data from the
-- specified directory and shove it into an %into event.
initialEvents = [EvErr boatEv (boatFailed env)]
runSync :: RAcquire e (EffCb e SyncEf)
runSync = handleEffect <$> mkRAcquire start stop
@ -154,8 +162,15 @@ clay env enqueueEv =
logDebug $ displayShow ("(clay) dirk actions: ", actions)
let !intoList = map (actionsToInto dir) actions
atomically $ enqueueEv $ EvBlip $ BlipEvSync $
SyncEvInto (Some (king, ())) desk False intoList
let syncEv = EvBlip
$ BlipEvSync
$ SyncEvInto (Some (king, ())) desk False intoList
let syncFailed _ = pure ()
atomically $ plan (EvErr syncEv syncFailed)
atomically $ modifyTVar
(cdMountPoints cd)

View File

@ -70,8 +70,12 @@ bornEv king = servEv $ HttpServerEvBorn (king, ()) ()
liveEv :: ServId -> Ports -> Ev
liveEv sId Ports {..} = servEv $ HttpServerEvLive (sId, ()) pHttp pHttps
cancelEv :: ServId -> ReqId -> Ev
cancelEv sId reqId = servEv $ HttpServerEvCancelRequest (sId, reqId, 1, ()) ()
cancelEv :: ServId -> ReqId -> EvErr
cancelEv sId reqId =
EvErr (servEv (HttpServerEvCancelRequest (sId, reqId, 1, ()) ())) cancelFailed
cancelFailed :: WorkError -> IO ()
cancelFailed _ = pure ()
reqEv :: ServId -> ReqId -> WhichServer -> Address -> HttpRequest -> Ev
reqEv sId reqId which addr req = case which of
@ -170,7 +174,7 @@ startServ
-> Ship
-> Bool
-> HttpServerConf
-> (Ev -> STM ())
-> (EvErr -> STM ())
-> RIO e Serv
startServ multi who isFake conf plan = do
logTrace (displayShow ("EYRE", "startServ"))
@ -205,9 +209,11 @@ startServ multi who isFake conf plan = do
noHttp <- view (networkConfigL . ncNoHttp)
noHttps <- view (networkConfigL . ncNoHttps)
let reqEvFailed _ = pure ()
let onReq :: WhichServer -> Ship -> Word64 -> ReqInfo -> STM ()
onReq which _ship reqId reqInfo =
plan (requestEvent srvId which reqId reqInfo)
plan $ EvErr (requestEvent srvId which reqId reqInfo) reqEvFailed
let onKilReq :: Ship -> Word64 -> STM ()
onKilReq _ship = plan . cancelEv srvId . fromIntegral
@ -269,21 +275,25 @@ startServ multi who isFake conf plan = do
-- Eyre Driver -----------------------------------------------------------------
bornFailed :: e -> WorkError -> IO ()
bornFailed env _ = runRIO env $ do
pure () -- TODO What should this do?
eyre
:: forall e
. (HasShipEnv e, HasKingId e)
=> e
-> MultiEyreApi
-> Ship
-> QueueEv
-> (EvErr -> STM ())
-> Bool
-> ([Ev], RAcquire e (EffCb e HttpServerEf))
-> ([EvErr], RAcquire e (EffCb e HttpServerEf))
eyre env multi who plan isFake = (initialEvents, runHttpServer)
where
king = fromIntegral (env ^. kingIdL)
initialEvents :: [Ev]
initialEvents = [bornEv king]
initialEvents :: [EvErr]
initialEvents = [EvErr (bornEv king) (bornFailed env)]
runHttpServer :: RAcquire e (EffCb e HttpServerEf)
runHttpServer = handleEf <$> mkRAcquire
@ -306,13 +316,15 @@ eyre env multi who plan isFake = (initialEvents, runHttpServer)
logDebug "Done restating http server"
pure res
liveFailed _ = pure ()
handleEf :: Drv -> HttpServerEf -> RIO e ()
handleEf drv = \case
HSESetConfig (i, ()) conf -> do
logDebug (displayShow ("EYRE", "%set-config"))
Serv {..} <- restart drv conf
logDebug (displayShow ("EYRE", "%set-config", "Sending %live"))
atomically $ plan (liveEv sServId sPorts)
atomically $ plan (EvErr (liveEv sServId sPorts) liveFailed)
logDebug "Write ports file"
io (writePortsFile sPortsFile sPorts)
HSEResponse (i, req, _seq, ()) ev -> do

View File

@ -57,18 +57,22 @@ bornEv king =
--------------------------------------------------------------------------------
bornFailed :: e -> WorkError -> IO ()
bornFailed env _ = runRIO env $ do
pure () -- TODO What to do in this case?
client
:: forall e
. (HasLogFunc e, HasKingId e)
=> e
-> QueueEv
-> ([Ev], RAcquire e (EffCb e HttpClientEf))
client env enqueueEv = (initialEvents, runHttpClient)
-> (EvErr -> STM ())
-> ([EvErr], RAcquire e (EffCb e HttpClientEf))
client env plan = (initialEvents, runHttpClient)
where
kingId = view (kingIdL . to fromIntegral) env
initialEvents :: [Ev]
initialEvents = [bornEv kingId]
initialEvents :: [EvErr]
initialEvents = [EvErr (bornEv kingId) (bornFailed env)]
runHttpClient :: RAcquire e (EffCb e HttpClientEf)
runHttpClient = handleEffect <$> mkRAcquire start stop
@ -133,8 +137,14 @@ client env enqueueEv = (initialEvents, runHttpClient)
planEvent :: ReqId -> HttpEvent -> RIO e ()
planEvent id ev = do
logDebug $ displayShow ("(http client response)", id, (describe ev))
atomically $ enqueueEv $ EvBlip $ BlipEvHttpClient $
HttpClientEvReceive (kingId, ()) (fromIntegral id) ev
let recvEv = EvBlip
$ BlipEvHttpClient
$ HttpClientEvReceive (kingId, ()) (fromIntegral id) ev
let recvFailed _ = pure ()
atomically $ plan (EvErr recvEv recvFailed)
-- show an HttpEvent with byte count instead of raw data
describe :: HttpEvent -> String

View File

@ -231,7 +231,7 @@ pier
-> MultiEyreApi
-> RAcquire PierEnv ()
pier (serf, log) vSlog mStart multi = do
computeQ <- newTQueueIO
computeQ <- newTQueueIO @_ @Serf.EvErr
persistQ <- newTQueueIO
executeQ <- newTQueueIO
saveM <- newEmptyTMVarIO
@ -292,10 +292,8 @@ pier (serf, log) vSlog mStart multi = do
io $ atomically $ for_ bootEvents (writeTQueue computeQ)
let stubErrCallback = \_ -> pure ()
let computeConfig = ComputeConfig
{ ccOnWork = (`Serf.EvErr` stubErrCallback) <$> readTQueue computeQ
{ ccOnWork = readTQueue computeQ
, ccOnKill = takeTMVar shutdownM
, ccOnSave = takeTMVar saveM
, ccPutResult = writeTQueue persistQ
@ -357,13 +355,13 @@ drivers
-> MultiEyreApi
-> Ship
-> Bool
-> (Ev -> STM ())
-> (EvErr -> STM ())
-> STM ()
-> (Term.TSize, Term.Client)
-> (Text -> RIO e ())
-> ([Ev], RAcquire e (Drivers e))
-> ([EvErr], RAcquire e (Drivers e))
drivers env multi who isFake plan shutdownSTM termSys stderr =
(initialEvents, runDrivers)
(initialEvents, runDrivers) -- TODO
where
(behnBorn, runBehn) = behn env plan
(amesBorn, runAmes) = ames env who isFake plan stderr

View File

@ -7,6 +7,7 @@ module Urbit.Vere.Pier.Types where
import Urbit.Prelude hiding (Term)
import Urbit.Noun (Term)
import Urbit.Arvo
import Urbit.Time
@ -27,6 +28,28 @@ instance Show Nock where
show _ = "Nock"
-- Events With Error Callbacks -------------------------------------------------
type Gang = Maybe (HoonSet Ship)
type Goof = (Term, [Tank])
{-|
Two types of serf failures.
- `RunSwap`: Event processing failed, but the serf replaced it with
another event which succeeded.
- `RunBail`: Event processing failed and all attempt to replace it
with a failure-notice event also caused crashes. We are really fucked.
-}
data WorkError
= RunSwap EventId Mug Wen Noun FX
| RunBail [Goof]
data EvErr = EvErr Ev (WorkError -> IO ())
--------------------------------------------------------------------------------
type EventId = Word64
@ -84,8 +107,6 @@ data Order
deriveToNoun ''Order
type QueueEv = Ev -> STM ()
type EffCb e a = a -> RIO e ()
type Perform = Ef -> IO ()

View File

@ -22,8 +22,8 @@ import qualified Data.Conduit.Combinators as CC
import qualified Urbit.Vere.Log as Log
import qualified Urbit.Vere.Serf.IPC as X (Config(..), EvErr(..), Flag(..),
RunReq(..), Serf, run, snapshot,
start, stop)
RunReq(..), Serf, WorkError(..), run,
snapshot, start, stop)
-- ort System.ProgressBar
-- ort Urbit.King.App (HasStderrLogFunc(..))

View File

@ -93,10 +93,6 @@ import qualified Urbit.Time as Time
-- IPC Types -------------------------------------------------------------------
type Gang = Maybe (HoonSet Ship)
type Goof = (Term, [Tank])
data Live
= LExit Atom -- exit status code
| LSave EventId
@ -546,24 +542,6 @@ swim serf = do
-- Running Ship Flow -----------------------------------------------------------
{-|
Two types of serf failures.
- `RunSwap`: Event processing failed, but the serf replaced it with
another event which succeeded.
- `RunBail`: Event processing failed and all attempt to replace it
with a failure-notice event also caused crashes. We are really fucked.
-}
data WorkError
= RunSwap EventId Mug Wen Noun FX
| RunBail [Goof]
{-
An event and a callback to inform the IO Driver about failures.
-}
data EvErr = EvErr Ev (WorkError -> IO ())
{-
- RRWork: Ask the serf to do work, will output (Fact, FX) if work
succeeded and call callback on failure.

View File

@ -491,6 +491,14 @@ localClient doneSignal = fst <$> mkRAcquire start stop
--------------------------------------------------------------------------------
initialBlewFailed :: e -> WorkError -> IO ()
initialBlewFailed env _ = runRIO env $ do
pure () -- TODO What do?
initialHailFailed :: e -> WorkError -> IO ()
initialHailFailed env _ = runRIO env $ do
pure () -- TODO What do?
{-|
Terminal Driver
-}
@ -498,14 +506,17 @@ term :: forall e. (HasPierConfig e, HasLogFunc e, HasKingId e)
=> e
-> (T.TSize, Client)
-> (STM ())
-> QueueEv
-> ([Ev], RAcquire e (EffCb e TermEf))
term env (tsize, Client{..}) shutdownSTM enqueueEv =
-> (EvErr -> STM ())
-> ([EvErr], RAcquire e (EffCb e TermEf))
term env (tsize, Client{..}) shutdownSTM plan =
(initialEvents, runTerm)
where
T.TSize wi hi = tsize
initialEvents = [(initialBlew wi hi), initialHail]
initialEvents =
[ EvErr (initialBlew wi hi) (initialBlewFailed env)
, EvErr initialHail (initialHailFailed env)
]
runTerm :: RAcquire e (EffCb e TermEf)
runTerm = do
@ -521,8 +532,9 @@ term env (tsize, Client{..}) shutdownSTM enqueueEv =
atomically take >>= \case
Nothing -> pure ()
Just b -> do
let blip = EvBlip $ BlipEvTerm $ TermEvBelt (UD 1, ()) $ b
atomically $ enqueueEv $ blip
let beltEv = EvBlip $ BlipEvTerm $ TermEvBelt (UD 1, ()) $ b
let beltFailed _ = pure ()
atomically $ plan (EvErr beltEv beltFailed)
handleEffect :: TermEf -> RIO e ()
handleEffect = \case

View File

@ -73,7 +73,7 @@ runNetworkApp = runRIO NetworkTestApp
}
runGala
:: forall e . HasAmes e => Word8 -> RAcquire e (TQueue Ev, EffCb e NewtEf)
:: forall e . HasAmes e => Word8 -> RAcquire e (TQueue EvErr, EffCb e NewtEf)
runGala point = do
env <- ask
que <- newTQueueIO
@ -85,14 +85,14 @@ runGala point = do
where
noStderr _ = pure ()
waitForPacket :: TQueue Ev -> Bytes -> IO Bool
waitForPacket :: TQueue EvErr -> Bytes -> IO Bool
waitForPacket q val = go
where
go =
atomically (readTQueue q) >>= \case
EvBlip (BlipEvNewt (NewtEvBorn (_, ()) ())) -> go
EvBlip (BlipEvAmes (AmesEvHear () _ bs)) -> pure (bs == val)
_ -> pure False
EvErr (EvBlip (BlipEvNewt (NewtEvBorn (_, ()) ()))) _ -> go
EvErr (EvBlip (BlipEvAmes (AmesEvHear () _ bs))) _ -> pure (bs == val)
_ -> pure False
runRAcquire :: RAcquire e a -> RIO e a
runRAcquire acq = rwith acq pure