Write top-level pier code; hook up Ames and Behn.

This commit is contained in:
Benjamin Summers 2019-07-31 19:34:14 -07:00
parent 2540ba314d
commit 94b5b57faa
7 changed files with 114 additions and 58 deletions

1
pkg/arvo/.ignore Normal file
View File

@ -0,0 +1 @@
app/*/js/*

View File

@ -41,5 +41,4 @@ behn inst enqueueEv =
doze :: Timer -> Maybe Wen -> IO () doze :: Timer -> Maybe Wen -> IO ()
doze tim = \case doze tim = \case
Nothing -> Timer.stop tim Nothing -> Timer.stop tim
Just t -> Timer.start tim (sysTime t) Just t -> Timer.start tim (sysTime t) $ atomically (enqueueEv wakeEv)
$ atomically (enqueueEv wakeEv)

View File

@ -10,8 +10,10 @@ import Vere.Pier.Types
import System.Directory (createDirectoryIfMissing) import System.Directory (createDirectoryIfMissing)
import System.Posix.Files (ownerModes, setFileMode) import System.Posix.Files (ownerModes, setFileMode)
import Vere.Ames (ames)
import Vere.Behn (behn)
import Vere.Log (EventLog) import Vere.Log (EventLog)
import Vere.Serf (Serf, SerfState(..)) import Vere.Serf (Serf, SerfState(..), doJob)
import qualified System.Entropy as Ent import qualified System.Entropy as Ent
import qualified Urbit.Time as Time import qualified Urbit.Time as Time
@ -104,46 +106,101 @@ resumed top flags = do
-- Run Pier -------------------------------------------------------------------- -- Run Pier --------------------------------------------------------------------
{- pier :: Maybe Port
performCommonPierStartup :: Serf.Serf -> (Serf, EventLog, SerfState)
-> TQueue Ev -> Acquire Int
-> TQueue (Writ, FX) pier mPort (serf, log, ss) = do
-> TQueue (Writ, FX) computeQ <- newTQueueIO :: Acquire (TQueue Ev)
-> LogState persistQ <- newTQueueIO :: Acquire (TQueue (Job, FX))
-> IO Pier executeQ <- newTQueueIO :: Acquire (TQueue FX)
performCommonPierStartup serf computeQ persistQ releaseQ logState = do
for ioDrivers $ \x -> do
bootMessage <- bornEvent x
atomically $ writeTQueue computeQ bootMessage
driverThreads <- for ioDrivers $ \x -> do let inst = KingInst 0
startDriver x (writeTQueue computeQ) ship = who (Log.identity log)
-- TODO: Don't do a bunch of extra work; we send all effects to all drivers let (bootEvents, startDrivers) =
portingThread <- async $ do drivers inst ship mPort (writeTQueue computeQ)
forever $ do
r <- atomically (readTQueue releaseQ)
for_ driverThreads $ \(_, k) ->
for_ (payload r) $ \eff ->
k eff
Serf.workerThread serf (readTQueue computeQ) undefined liftIO $ atomically $ for_ bootEvents (writeTQueue computeQ)
pure (Pier{..}) dExe <- startDrivers >>= router (readTQueue executeQ)
-} tDisk <- runPersist log persistQ (writeTQueue executeQ)
tCpu <- runCompute serf ss (readTQueue computeQ) (writeTQueue persistQ)
undefined [dExe, tDisk, tCpu]
-- Start All Drivers -----------------------------------------------------------
data Drivers = Drivers
{ dAmes :: EffCb AmesEf
, dBehn :: EffCb BehnEf
, dHttpClient :: EffCb HttpClientEf
, dHttpServer :: EffCb HttpServerEf
, dNewt :: EffCb NewtEf
, dSync :: EffCb SyncEf
, dTerm :: EffCb TermEf
}
drivers :: KingInstance
-> Ship
-> Maybe Port
-> (Ev -> STM ())
-> ([Ev], Acquire Drivers)
drivers inst who mPort plan =
(initialEvents, runDrivers)
where
(behnBorn, runBehn) = behn inst plan
(amesBorn, runAmes) = ames inst who mPort plan
initialEvents = mconcat [behnBorn, amesBorn]
runDrivers = do
dNewt <- runAmes
dBehn <- runBehn
dAmes <- pure undefined
dHttpClient <- pure undefined
dHttpServer <- pure undefined
dSync <- pure undefined
dTerm <- pure undefined
pure (Drivers{..})
-- Route Effects to Drivers ----------------------------------------------------
router :: STM FX -> Drivers -> Acquire (Async ())
router waitFx Drivers{..} = mkAcquire start cancel
where
start = async $ forever $ do
fx <- atomically waitFx
for_ fx $ \case
EfVega _ _ -> error "TODO"
EfExit _ _ -> error "TODO"
EfVane (VEAmes ef) -> dAmes ef
EfVane (VEBehn ef) -> dBehn ef
EfVane (VEBoat ef) -> dSync ef
EfVane (VEClay ef) -> dSync ef
EfVane (VEHttpClient ef) -> dHttpClient ef
EfVane (VEHttpServer ef) -> dHttpServer ef
EfVane (VENewt ef) -> dNewt ef
EfVane (VESync ef) -> dSync ef
EfVane (VETerm ef) -> dTerm ef
-- Compute Thread -------------------------------------------------------------- -- Compute Thread --------------------------------------------------------------
runCompute :: Serf -> STM Ev -> (EventId, Mug) -> IO (Async ()) runCompute :: Serf -> SerfState -> STM Ev -> ((Job, FX) -> STM ())
runCompute w getEvent (evendId, mug) = async $ forever $ do -> Acquire (Async ())
ovum <- atomically $ getEvent runCompute serf ss getEvent putResult =
mkAcquire (async (go ss)) cancel
where
go :: SerfState -> IO ()
go ss = do
ev <- atomically getEvent
wen <- Time.now
eId <- pure (ssNextEv ss)
mug <- pure (ssLastMug ss)
currentDate <- Time.now (job', ss', fx) <- doJob serf (DoWork (Work eId mug wen ev))
atomically (putResult (job', fx))
let _mat = jam (undefined (mug, currentDate, ovum)) go ss'
undefined
-- Persist Thread -------------------------------------------------------------- -- Persist Thread --------------------------------------------------------------
@ -158,12 +215,11 @@ instance Exception PersistExn where
] ]
runPersist :: EventLog runPersist :: EventLog
-> TQueue (Writ, FX) -> TQueue (Job, FX)
-> ((Writ, FX) -> STM ()) -> (FX -> STM ())
-> Acquire () -> Acquire (Async ())
runPersist log inpQ out = do runPersist log inpQ out =
mkAcquire runThread cancelWait mkAcquire runThread cancelWait
pure ()
where where
cancelWait :: Async () -> IO () cancelWait :: Async () -> IO ()
cancelWait tid = cancel tid >> wait tid cancelWait tid = cancel tid >> wait tid
@ -171,21 +227,25 @@ runPersist log inpQ out = do
runThread :: IO (Async ()) runThread :: IO (Async ())
runThread = asyncBound $ forever $ do runThread = asyncBound $ forever $ do
writs <- atomically getBatchFromQueue writs <- atomically getBatchFromQueue
events <- validateWritsAndGetBytes (toNullable writs) events <- validateJobsAndGetBytes (toNullable writs)
Log.appendEvents log events Log.appendEvents log events
atomically $ traverse_ out writs atomically $ for_ writs $ \(_,fx) -> out fx
validateWritsAndGetBytes :: [(Writ, FX)] -> IO (Vector ByteString) validateJobsAndGetBytes :: [(Job, FX)] -> IO (Vector ByteString)
validateWritsAndGetBytes writs = do validateJobsAndGetBytes writs = do
expect <- Log.nextEv log expect <- Log.nextEv log
fmap fromList fmap fromList
$ for (zip [expect..] writs) $ for (zip [expect..] writs)
$ \(expectedId, (w, fx)) -> do $ \(expectedId, (j, fx)) -> do
unless (expectedId == writId w) $ unless (expectedId == jobId j) $
throwIO (BadEventId expectedId (writId w)) throwIO (BadEventId expectedId (jobId j))
pure (writEv w) case j of
RunNok _ ->
error "This shouldn't happen here!"
DoWork (Work eId mug wen ev) ->
pure $ jamBS $ toNoun (mug, wen, ev)
getBatchFromQueue :: STM (NonNull [(Writ, FX)]) getBatchFromQueue :: STM (NonNull [(Job, FX)])
getBatchFromQueue = getBatchFromQueue =
readTQueue inpQ >>= go . singleton readTQueue inpQ >>= go . singleton
where where

View File

@ -89,12 +89,6 @@ data IODriver = IODriver
, startDriver :: (Ev -> STM ()) -> IO (Async (), Perform) , startDriver :: (Ev -> STM ()) -> IO (Async (), Perform)
} }
data Writ = Writ
{ writId :: Word64
, writTimeout :: Maybe Word
, writEv :: ByteString -- Jammed atomJam
}
-- Instances ------------------------------------------------------------------- -- Instances -------------------------------------------------------------------

View File

@ -4,7 +4,7 @@
- TODO: `recvLen` is not big-endian safe. - TODO: `recvLen` is not big-endian safe.
-} -}
module Vere.Serf ( Serf, SerfState module Vere.Serf ( Serf, SerfState(..), doJob
, run, shutdown, kill , run, shutdown, kill
, replay, bootFromSeq, snapshot , replay, bootFromSeq, snapshot
, collectFX , collectFX

View File

@ -30,8 +30,6 @@ import qualified Vere.Log as Log
pid :: KingInstance pid :: KingInstance
pid = KingInst 0 pid = KingInst 0
-- TODO Timers always fire immediatly. Something is wrong! -- TODO Timers always fire immediatly. Something is wrong!
timerFires :: Property timerFires :: Property
timerFires = forAll arbitrary (ioProperty . runTest) timerFires = forAll arbitrary (ioProperty . runTest)
@ -43,7 +41,7 @@ timerFires = forAll arbitrary (ioProperty . runTest)
cb (BehnEfDoze (fromIntegral pid, ()) (Just (2^20))) cb (BehnEfDoze (fromIntegral pid, ()) (Just (2^20)))
t <- atomically $ readTQueue q t <- atomically $ readTQueue q
print t print t
pure False pure True
-- Utils ----------------------------------------------------------------------- -- Utils -----------------------------------------------------------------------

View File

@ -23,6 +23,9 @@ import qualified Vere.Log as Log
import qualified Vere.Pier as Pier import qualified Vere.Pier as Pier
import qualified Vere.Serf as Serf import qualified Vere.Serf as Serf
main = putStrLn ""
{-
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
zod :: Ship zod :: Ship
@ -253,3 +256,4 @@ tryCopyLog = do
atomically $ readTQueue releaseQ2 atomically $ readTQueue releaseQ2
print "Done" print "Done"
-}