diff --git a/pkg/hs-urbit/lib/Vere.hs b/pkg/hs-urbit/lib/Vere.hs index 9d5ba5a9f4..9cd0b539d0 100644 --- a/pkg/hs-urbit/lib/Vere.hs +++ b/pkg/hs-urbit/lib/Vere.hs @@ -2,17 +2,18 @@ module Vere where import ClassyPrelude import Data.Void +import Data.Noun import qualified Vere.Http.Server as Server import qualified Vere.Http.Client as Client -- +vere ----------------------------------------------------------------------- data WTFIsThis - = WTFIsThis (Maybe Varience) TheActualFuckingThing + = WTFIsThis (Maybe Varience) Eff data Varience = Gold | Iron | Lead -data TheActualFuckingThing +data Eff = HttpServer Server.Eff | HttpClient Client.Eff | Behn Void @@ -24,3 +25,10 @@ data TheActualFuckingThing | Init Void | Term Void + +type Perform = Eff -> IO () + +data IODriver = IODriver + { bornEvent :: IO Noun + , startDriver :: (Noun -> STM ()) -> IO (Async (), Perform) + } diff --git a/pkg/hs-urbit/lib/Vere/Log.hs b/pkg/hs-urbit/lib/Vere/Log.hs index 719d147e0b..71f0e8719c 100644 --- a/pkg/hs-urbit/lib/Vere/Log.hs +++ b/pkg/hs-urbit/lib/Vere/Log.hs @@ -21,6 +21,7 @@ import Data.Void import Database.LMDB.Raw import Foreign.Ptr import Foreign.Marshal.Alloc +import Vere import Vere.Pier.Types import Control.Lens ((^.)) @@ -34,7 +35,7 @@ import qualified Data.Vector.Mutable as MV -------------------------------------------------------------------------------- -- TODO: Handle throws on the async -init :: FilePath -> TQueue (Writ [Effect]) -> (Writ [Effect] -> STM ()) +init :: FilePath -> TQueue (Writ [Eff]) -> (Writ [Eff] -> STM ()) -> IO LogState init dir inp cb = do env <- mdb_env_create @@ -130,8 +131,8 @@ withWordPtr w cb = do -- TODO: We need to be able to send back an exception to the main thread on an -- exception on the persistence thread. persistThread :: MDB_env - -> TQueue (Writ [Effect]) - -> (Writ [Effect] -> STM ()) + -> TQueue (Writ [Eff]) + -> (Writ [Eff] -> STM ()) -> IO (Async ()) persistThread env inputQueue onPersist = asyncBound $ forever do diff --git a/pkg/hs-urbit/lib/Vere/Pier.hs b/pkg/hs-urbit/lib/Vere/Pier.hs index 185419f0fa..010a24984a 100644 --- a/pkg/hs-urbit/lib/Vere/Pier.hs +++ b/pkg/hs-urbit/lib/Vere/Pier.hs @@ -1,13 +1,21 @@ module Vere.Pier where import ClassyPrelude + +import Data.Noun +import Data.Noun.Pill +import Vere import Vere.Pier.Types + import qualified Vere.Log as Log +import qualified Vere.Worker as Worker +ioDrivers = [] :: [IODriver] --- This is ugly and wrong -newPier :: FilePath -> LogIdentity -> IO Pier -newPier top id = do +-- This is called to make a freshly booted pier. It assigns an identity to an +-- event log and takes a chill pill. +newPier :: Pill -> FilePath -> LogIdentity -> IO Pier +newPier pill top id = do let logPath = top <> "/log" computeQueue <- newTQueueIO @@ -19,22 +27,67 @@ newPier top id = do -- input/output queues. logState <- Log.init logPath persistQueue (writeTQueue releaseQueue) + -- In first boot, we need to write this! Log.writeLogIdentity logState id - pure (Pier{..}) + let logLatestEventNumber = 0 + let getEvents = Log.readEvents logState + + workerState <- Worker.startWorkerProcess + + Worker.bootWorker workerState id pill + + performCommonPierStartup workerState computeQueue persistQueue releaseQueue logState -restartPier :: FilePath -> IO Pier -restartPier top = do +-- This reads in a pier +runPierFromDisk :: FilePath -> IO Pier +runPierFromDisk top = do let logPath = top <> "/log" computeQueue <- newTQueueIO persistQueue <- newTQueueIO releaseQueue <- newTQueueIO + -- What we really want to do is write the log identity and then do normal + -- startup, but writeLogIdentity requires a full log state including + -- input/output queues. logState <- Log.init logPath persistQueue (writeTQueue releaseQueue) - -- When we create a worker, we should take arguments indicating the identity. + -- In first boot, we need to write this! + id <- Log.readLogIdentity logState + logLatestEventNumber <- Log.latestEventNumber logState + + let getEvents = Log.readEvents logState + + workerState <- Worker.startWorkerProcess + Worker.resumeWorker workerState id logLatestEventNumber getEvents + + performCommonPierStartup workerState computeQueue persistQueue releaseQueue logState + + +performCommonPierStartup :: Worker.Worker + -> TQueue Noun + -> TQueue (Writ [Eff]) + -> TQueue (Writ [Eff]) + -> LogState + -> IO Pier +performCommonPierStartup workerState computeQueue persistQueue releaseQueue logState = do + for ioDrivers $ \x -> do + bootMessage <- bornEvent x + atomically $ writeTQueue computeQueue bootMessage + + driverThreads <- for ioDrivers $ \x -> do + startDriver x (writeTQueue computeQueue) + + -- TODO: Don't do a bunch of extra work; we send all events to all drivers + portingThread <- async $ do + forever $ do + r <- atomically (readTQueue releaseQueue) + for_ driverThreads $ \(_, k) -> + for_ (payload r) $ \eff -> + k eff + + Worker.workerThread workerState pure (Pier{..}) - diff --git a/pkg/hs-urbit/lib/Vere/Pier/Types.hs b/pkg/hs-urbit/lib/Vere/Pier/Types.hs index 46b34057b1..e03a8e197c 100644 --- a/pkg/hs-urbit/lib/Vere/Pier/Types.hs +++ b/pkg/hs-urbit/lib/Vere/Pier/Types.hs @@ -7,8 +7,8 @@ import Data.Noun.Atom import Data.Noun.Poet import Database.LMDB.Raw import Urbit.Time +import Vere -data Effect newtype Ovum = Ovum Void deriving newtype (Eq, Ord, Show, ToNoun, FromNoun) @@ -25,10 +25,12 @@ data Writ a = Writ } data Pier = Pier - { computeQueue :: TQueue (Writ Word) - , persistQueue :: TQueue (Writ [Effect]) - , releaseQueue :: TQueue (Writ [Effect]) + { computeQueue :: TQueue Noun + , persistQueue :: TQueue (Writ [Eff]) + , releaseQueue :: TQueue (Writ [Eff]) , logState :: LogState + , driverThreads :: [(Async (), Perform)] + , portingThread :: Async () } -- TODO: We are uncertain about q's type. There's some serious entanglement @@ -36,8 +38,8 @@ data Pier = Pier -- away with anything less than passing the full u3_writ around. data LogState = LogState { env :: MDB_env - , inputQueue :: TQueue (Writ [Effect]) - , onPersist :: Writ [Effect] -> STM () + , inputQueue :: TQueue (Writ [Eff]) + , onPersist :: Writ [Eff] -> STM () , writer :: Async () } diff --git a/pkg/hs-urbit/lib/Vere/Worker.hs b/pkg/hs-urbit/lib/Vere/Worker.hs index 1f8e8251eb..3f9b3cc3f0 100644 --- a/pkg/hs-urbit/lib/Vere/Worker.hs +++ b/pkg/hs-urbit/lib/Vere/Worker.hs @@ -26,10 +26,6 @@ data Worker = Worker , recvHandle :: Handle , process :: ProcessHandle - , identity :: LogIdentity - -- TODO: This shouldn't be here. - , wLogState :: LogState - -- , getInput :: STM (Writ ()) -- , onComputed :: Writ [Effect] -> STM () @@ -43,11 +39,11 @@ data Worker = Worker -- Think about how to handle process exit -- Tear down subprocess on exit? (terminiteProcess) -start :: LogIdentity -> LogState -> IO Worker -start id s = +startWorkerProcess :: IO Worker +startWorkerProcess = do (Just i, Just o, _, p) <- createProcess pSpec - pure (Worker i o p id s) + pure (Worker i o p) where pSpec = (proc "urbit-worker" []) { std_in = CreatePipe @@ -124,7 +120,8 @@ data WorkerExn | BadPleaNoun Noun | ReplacedEventDuringReplay EventId ReplacementEv | WorkerConnectionClosed - | UnexpectedInitialPlea Plea + | UnexpectedPleaOnNewShip Plea + | InvalidInitialPlea Plea deriving (Show) instance Exception WorkerExn @@ -167,18 +164,21 @@ sendAndRecv w eventId event = Stdr _ cord -> print cord >> loop Slog _ pri t -> printTank pri t >> loop -sendBootEvent :: Worker -> IO () -sendBootEvent w = do - sendAtom w $ jam $ toNoun (Cord "boot", (identity w)) +sendBootEvent :: LogIdentity -> Worker -> IO () +sendBootEvent id w = do + sendAtom w $ jam $ toNoun (Cord "boot", id) -- the ship is booted, but it is behind. shove events to the worker until it is -- caught up. -replay :: Worker -> WorkerState -> EventId +replay :: Worker + -> WorkerState + -> LogIdentity + -> EventId -> (EventId -> Word64 -> IO (Vector (EventId, Atom))) -> IO () -replay w (wid, wmug) lastCommitedId getEvents = do - when (wid == 1) (sendBootEvent w) +replay w (wid, wmug) identity lastCommitedId getEvents = do + when (wid == 1) (sendBootEvent identity w) loop wid where @@ -195,25 +195,44 @@ replay w (wid, wmug) lastCommitedId getEvents = do loop (curEvent + toRead) -startPier :: Worker -> IO (EventId) -startPier w = + +bootWorker :: Worker + -> LogIdentity + -> Pill + -> IO () +bootWorker w identity pill = + do + recvPlea w >>= \case + Play Nil -> pure () + x@(Play _) -> throwIO (UnexpectedPleaOnNewShip x) + x -> throwIO (InvalidInitialPlea x) + + -- TODO: actually boot the pill + undefined + + requestSnapshot w + + -- Maybe return the current event id ? But we'll have to figure that out + -- later. + pure () + +resumeWorker :: Worker + -> LogIdentity + -> EventId + -> (EventId -> Word64 -> IO (Vector (EventId, Atom))) + -> IO () +resumeWorker w identity logLatestEventNumber eventFetcher = do ws@(eventId, mug) <- recvPlea w >>= \case Play Nil -> pure (1, Mug 0) Play (NotNil (e, m, _)) -> pure (e, m) - x -> throwIO (UnexpectedInitialPlea x) + x -> throwIO (InvalidInitialPlea x) - logLatestEventNumber <- Log.latestEventNumber (wLogState w) - - when (logLatestEventNumber == 0) $ do - -- todo: boot. we need a pill. - undefined - - replay w ws logLatestEventNumber (Log.readEvents (wLogState w)) + replay w ws identity logLatestEventNumber eventFetcher requestSnapshot w - pure (logLatestEventNumber) + pure () workerThread :: Worker -> IO (Async ()) workerThread w = undefined