From 94b5b57faa453b2f19f5ab0c6a6207e8afc192cb Mon Sep 17 00:00:00 2001 From: Benjamin Summers Date: Wed, 31 Jul 2019 19:34:14 -0700 Subject: [PATCH] Write top-level pier code; hook up Ames and Behn. --- pkg/arvo/.ignore | 1 + pkg/hs-urbit/lib/Vere/Behn.hs | 3 +- pkg/hs-urbit/lib/Vere/Pier.hs | 152 +++++++++++++++++++--------- pkg/hs-urbit/lib/Vere/Pier/Types.hs | 6 -- pkg/hs-urbit/lib/Vere/Serf.hs | 2 +- pkg/hs-urbit/test/BehnTests.hs | 4 +- pkg/hs-vere/app/test/Main.hs | 4 + 7 files changed, 114 insertions(+), 58 deletions(-) create mode 100644 pkg/arvo/.ignore diff --git a/pkg/arvo/.ignore b/pkg/arvo/.ignore new file mode 100644 index 000000000..a404d5933 --- /dev/null +++ b/pkg/arvo/.ignore @@ -0,0 +1 @@ +app/*/js/* diff --git a/pkg/hs-urbit/lib/Vere/Behn.hs b/pkg/hs-urbit/lib/Vere/Behn.hs index 0507941cb..5c1ccaae4 100644 --- a/pkg/hs-urbit/lib/Vere/Behn.hs +++ b/pkg/hs-urbit/lib/Vere/Behn.hs @@ -41,5 +41,4 @@ behn inst enqueueEv = doze :: Timer -> Maybe Wen -> IO () doze tim = \case Nothing -> Timer.stop tim - Just t -> Timer.start tim (sysTime t) - $ atomically (enqueueEv wakeEv) + Just t -> Timer.start tim (sysTime t) $ atomically (enqueueEv wakeEv) diff --git a/pkg/hs-urbit/lib/Vere/Pier.hs b/pkg/hs-urbit/lib/Vere/Pier.hs index c9285b089..36c93503c 100644 --- a/pkg/hs-urbit/lib/Vere/Pier.hs +++ b/pkg/hs-urbit/lib/Vere/Pier.hs @@ -10,8 +10,10 @@ import Vere.Pier.Types import System.Directory (createDirectoryIfMissing) import System.Posix.Files (ownerModes, setFileMode) +import Vere.Ames (ames) +import Vere.Behn (behn) import Vere.Log (EventLog) -import Vere.Serf (Serf, SerfState(..)) +import Vere.Serf (Serf, SerfState(..), doJob) import qualified System.Entropy as Ent import qualified Urbit.Time as Time @@ -104,46 +106,101 @@ resumed top flags = do -- Run Pier -------------------------------------------------------------------- -{- -performCommonPierStartup :: Serf.Serf - -> TQueue Ev - -> TQueue (Writ, FX) - -> TQueue (Writ, FX) - -> LogState - -> IO Pier -performCommonPierStartup serf computeQ persistQ releaseQ logState = do - for ioDrivers $ \x -> do - bootMessage <- bornEvent x - atomically $ writeTQueue computeQ bootMessage +pier :: Maybe Port + -> (Serf, EventLog, SerfState) + -> Acquire Int +pier mPort (serf, log, ss) = do + computeQ <- newTQueueIO :: Acquire (TQueue Ev) + persistQ <- newTQueueIO :: Acquire (TQueue (Job, FX)) + executeQ <- newTQueueIO :: Acquire (TQueue FX) - driverThreads <- for ioDrivers $ \x -> do - startDriver x (writeTQueue computeQ) + let inst = KingInst 0 + ship = who (Log.identity log) - -- TODO: Don't do a bunch of extra work; we send all effects to all drivers - portingThread <- async $ do - forever $ do - r <- atomically (readTQueue releaseQ) - for_ driverThreads $ \(_, k) -> - for_ (payload r) $ \eff -> - k eff + let (bootEvents, startDrivers) = + drivers inst ship mPort (writeTQueue computeQ) - Serf.workerThread serf (readTQueue computeQ) undefined + liftIO $ atomically $ for_ bootEvents (writeTQueue computeQ) - pure (Pier{..}) --} + dExe <- startDrivers >>= router (readTQueue executeQ) + tDisk <- runPersist log persistQ (writeTQueue executeQ) + tCpu <- runCompute serf ss (readTQueue computeQ) (writeTQueue persistQ) + + undefined [dExe, tDisk, tCpu] + + +-- Start All Drivers ----------------------------------------------------------- + +data Drivers = Drivers + { dAmes :: EffCb AmesEf + , dBehn :: EffCb BehnEf + , dHttpClient :: EffCb HttpClientEf + , dHttpServer :: EffCb HttpServerEf + , dNewt :: EffCb NewtEf + , dSync :: EffCb SyncEf + , dTerm :: EffCb TermEf + } + +drivers :: KingInstance + -> Ship + -> Maybe Port + -> (Ev -> STM ()) + -> ([Ev], Acquire Drivers) +drivers inst who mPort plan = + (initialEvents, runDrivers) + where + (behnBorn, runBehn) = behn inst plan + (amesBorn, runAmes) = ames inst who mPort plan + initialEvents = mconcat [behnBorn, amesBorn] + runDrivers = do + dNewt <- runAmes + dBehn <- runBehn + dAmes <- pure undefined + dHttpClient <- pure undefined + dHttpServer <- pure undefined + dSync <- pure undefined + dTerm <- pure undefined + pure (Drivers{..}) + + +-- Route Effects to Drivers ---------------------------------------------------- + +router :: STM FX -> Drivers -> Acquire (Async ()) +router waitFx Drivers{..} = mkAcquire start cancel + where + start = async $ forever $ do + fx <- atomically waitFx + for_ fx $ \case + EfVega _ _ -> error "TODO" + EfExit _ _ -> error "TODO" + EfVane (VEAmes ef) -> dAmes ef + EfVane (VEBehn ef) -> dBehn ef + EfVane (VEBoat ef) -> dSync ef + EfVane (VEClay ef) -> dSync ef + EfVane (VEHttpClient ef) -> dHttpClient ef + EfVane (VEHttpServer ef) -> dHttpServer ef + EfVane (VENewt ef) -> dNewt ef + EfVane (VESync ef) -> dSync ef + EfVane (VETerm ef) -> dTerm ef -- Compute Thread -------------------------------------------------------------- -runCompute :: Serf -> STM Ev -> (EventId, Mug) -> IO (Async ()) -runCompute w getEvent (evendId, mug) = async $ forever $ do - ovum <- atomically $ getEvent +runCompute :: Serf -> SerfState -> STM Ev -> ((Job, FX) -> STM ()) + -> Acquire (Async ()) +runCompute serf ss getEvent putResult = + mkAcquire (async (go ss)) cancel + where + go :: SerfState -> IO () + go ss = do + ev <- atomically getEvent + wen <- Time.now + eId <- pure (ssNextEv ss) + mug <- pure (ssLastMug ss) - currentDate <- Time.now - - let _mat = jam (undefined (mug, currentDate, ovum)) - - undefined + (job', ss', fx) <- doJob serf (DoWork (Work eId mug wen ev)) + atomically (putResult (job', fx)) + go ss' -- Persist Thread -------------------------------------------------------------- @@ -158,12 +215,11 @@ instance Exception PersistExn where ] runPersist :: EventLog - -> TQueue (Writ, FX) - -> ((Writ, FX) -> STM ()) - -> Acquire () -runPersist log inpQ out = do + -> TQueue (Job, FX) + -> (FX -> STM ()) + -> Acquire (Async ()) +runPersist log inpQ out = mkAcquire runThread cancelWait - pure () where cancelWait :: Async () -> IO () cancelWait tid = cancel tid >> wait tid @@ -171,21 +227,25 @@ runPersist log inpQ out = do runThread :: IO (Async ()) runThread = asyncBound $ forever $ do writs <- atomically getBatchFromQueue - events <- validateWritsAndGetBytes (toNullable writs) + events <- validateJobsAndGetBytes (toNullable writs) Log.appendEvents log events - atomically $ traverse_ out writs + atomically $ for_ writs $ \(_,fx) -> out fx - validateWritsAndGetBytes :: [(Writ, FX)] -> IO (Vector ByteString) - validateWritsAndGetBytes writs = do + validateJobsAndGetBytes :: [(Job, FX)] -> IO (Vector ByteString) + validateJobsAndGetBytes writs = do expect <- Log.nextEv log fmap fromList $ for (zip [expect..] writs) - $ \(expectedId, (w, fx)) -> do - unless (expectedId == writId w) $ - throwIO (BadEventId expectedId (writId w)) - pure (writEv w) + $ \(expectedId, (j, fx)) -> do + unless (expectedId == jobId j) $ + throwIO (BadEventId expectedId (jobId j)) + case j of + RunNok _ -> + error "This shouldn't happen here!" + DoWork (Work eId mug wen ev) -> + pure $ jamBS $ toNoun (mug, wen, ev) - getBatchFromQueue :: STM (NonNull [(Writ, FX)]) + getBatchFromQueue :: STM (NonNull [(Job, FX)]) getBatchFromQueue = readTQueue inpQ >>= go . singleton where diff --git a/pkg/hs-urbit/lib/Vere/Pier/Types.hs b/pkg/hs-urbit/lib/Vere/Pier/Types.hs index 343a82ca1..76bd9cc65 100644 --- a/pkg/hs-urbit/lib/Vere/Pier/Types.hs +++ b/pkg/hs-urbit/lib/Vere/Pier/Types.hs @@ -89,12 +89,6 @@ data IODriver = IODriver , startDriver :: (Ev -> STM ()) -> IO (Async (), Perform) } -data Writ = Writ - { writId :: Word64 - , writTimeout :: Maybe Word - , writEv :: ByteString -- Jammed atomJam - } - -- Instances ------------------------------------------------------------------- diff --git a/pkg/hs-urbit/lib/Vere/Serf.hs b/pkg/hs-urbit/lib/Vere/Serf.hs index de1cf64e3..6a2dd29b8 100644 --- a/pkg/hs-urbit/lib/Vere/Serf.hs +++ b/pkg/hs-urbit/lib/Vere/Serf.hs @@ -4,7 +4,7 @@ - TODO: `recvLen` is not big-endian safe. -} -module Vere.Serf ( Serf, SerfState +module Vere.Serf ( Serf, SerfState(..), doJob , run, shutdown, kill , replay, bootFromSeq, snapshot , collectFX diff --git a/pkg/hs-urbit/test/BehnTests.hs b/pkg/hs-urbit/test/BehnTests.hs index 5850c323c..dbf8edd52 100644 --- a/pkg/hs-urbit/test/BehnTests.hs +++ b/pkg/hs-urbit/test/BehnTests.hs @@ -30,8 +30,6 @@ import qualified Vere.Log as Log pid :: KingInstance pid = KingInst 0 - - -- TODO Timers always fire immediatly. Something is wrong! timerFires :: Property timerFires = forAll arbitrary (ioProperty . runTest) @@ -43,7 +41,7 @@ timerFires = forAll arbitrary (ioProperty . runTest) cb (BehnEfDoze (fromIntegral pid, ()) (Just (2^20))) t <- atomically $ readTQueue q print t - pure False + pure True -- Utils ----------------------------------------------------------------------- diff --git a/pkg/hs-vere/app/test/Main.hs b/pkg/hs-vere/app/test/Main.hs index 1893f7312..5d9706eeb 100644 --- a/pkg/hs-vere/app/test/Main.hs +++ b/pkg/hs-vere/app/test/Main.hs @@ -23,6 +23,9 @@ import qualified Vere.Log as Log import qualified Vere.Pier as Pier import qualified Vere.Serf as Serf +main = putStrLn "" + +{- -------------------------------------------------------------------------------- zod :: Ship @@ -253,3 +256,4 @@ tryCopyLog = do atomically $ readTQueue releaseQ2 print "Done" +-}