More of pier startup factored correctly.

This commit is contained in:
Elliot Glaysher 2019-06-18 15:38:24 -07:00
parent 8a16fdd864
commit 7caadf43bc
5 changed files with 127 additions and 44 deletions

View File

@ -2,17 +2,18 @@ module Vere where
import ClassyPrelude import ClassyPrelude
import Data.Void import Data.Void
import Data.Noun
import qualified Vere.Http.Server as Server import qualified Vere.Http.Server as Server
import qualified Vere.Http.Client as Client import qualified Vere.Http.Client as Client
-- +vere ----------------------------------------------------------------------- -- +vere -----------------------------------------------------------------------
data WTFIsThis data WTFIsThis
= WTFIsThis (Maybe Varience) TheActualFuckingThing = WTFIsThis (Maybe Varience) Eff
data Varience = Gold | Iron | Lead data Varience = Gold | Iron | Lead
data TheActualFuckingThing data Eff
= HttpServer Server.Eff = HttpServer Server.Eff
| HttpClient Client.Eff | HttpClient Client.Eff
| Behn Void | Behn Void
@ -24,3 +25,10 @@ data TheActualFuckingThing
| Init Void | Init Void
| Term Void | Term Void
type Perform = Eff -> IO ()
data IODriver = IODriver
{ bornEvent :: IO Noun
, startDriver :: (Noun -> STM ()) -> IO (Async (), Perform)
}

View File

@ -21,6 +21,7 @@ import Data.Void
import Database.LMDB.Raw import Database.LMDB.Raw
import Foreign.Ptr import Foreign.Ptr
import Foreign.Marshal.Alloc import Foreign.Marshal.Alloc
import Vere
import Vere.Pier.Types import Vere.Pier.Types
import Control.Lens ((^.)) import Control.Lens ((^.))
@ -34,7 +35,7 @@ import qualified Data.Vector.Mutable as MV
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
-- TODO: Handle throws on the async -- TODO: Handle throws on the async
init :: FilePath -> TQueue (Writ [Effect]) -> (Writ [Effect] -> STM ()) init :: FilePath -> TQueue (Writ [Eff]) -> (Writ [Eff] -> STM ())
-> IO LogState -> IO LogState
init dir inp cb = do init dir inp cb = do
env <- mdb_env_create env <- mdb_env_create
@ -130,8 +131,8 @@ withWordPtr w cb = do
-- TODO: We need to be able to send back an exception to the main thread on an -- TODO: We need to be able to send back an exception to the main thread on an
-- exception on the persistence thread. -- exception on the persistence thread.
persistThread :: MDB_env persistThread :: MDB_env
-> TQueue (Writ [Effect]) -> TQueue (Writ [Eff])
-> (Writ [Effect] -> STM ()) -> (Writ [Eff] -> STM ())
-> IO (Async ()) -> IO (Async ())
persistThread env inputQueue onPersist = asyncBound $ persistThread env inputQueue onPersist = asyncBound $
forever do forever do

View File

@ -1,13 +1,21 @@
module Vere.Pier where module Vere.Pier where
import ClassyPrelude import ClassyPrelude
import Data.Noun
import Data.Noun.Pill
import Vere
import Vere.Pier.Types import Vere.Pier.Types
import qualified Vere.Log as Log import qualified Vere.Log as Log
import qualified Vere.Worker as Worker
ioDrivers = [] :: [IODriver]
-- This is ugly and wrong -- This is called to make a freshly booted pier. It assigns an identity to an
newPier :: FilePath -> LogIdentity -> IO Pier -- event log and takes a chill pill.
newPier top id = do newPier :: Pill -> FilePath -> LogIdentity -> IO Pier
newPier pill top id = do
let logPath = top <> "/log" let logPath = top <> "/log"
computeQueue <- newTQueueIO computeQueue <- newTQueueIO
@ -19,22 +27,67 @@ newPier top id = do
-- input/output queues. -- input/output queues.
logState <- Log.init logPath persistQueue (writeTQueue releaseQueue) logState <- Log.init logPath persistQueue (writeTQueue releaseQueue)
-- In first boot, we need to write this!
Log.writeLogIdentity logState id Log.writeLogIdentity logState id
pure (Pier{..}) let logLatestEventNumber = 0
let getEvents = Log.readEvents logState
workerState <- Worker.startWorkerProcess
Worker.bootWorker workerState id pill
performCommonPierStartup workerState computeQueue persistQueue releaseQueue logState
restartPier :: FilePath -> IO Pier -- This reads in a pier
restartPier top = do runPierFromDisk :: FilePath -> IO Pier
runPierFromDisk top = do
let logPath = top <> "/log" let logPath = top <> "/log"
computeQueue <- newTQueueIO computeQueue <- newTQueueIO
persistQueue <- newTQueueIO persistQueue <- newTQueueIO
releaseQueue <- newTQueueIO releaseQueue <- newTQueueIO
-- What we really want to do is write the log identity and then do normal
-- startup, but writeLogIdentity requires a full log state including
-- input/output queues.
logState <- Log.init logPath persistQueue (writeTQueue releaseQueue) logState <- Log.init logPath persistQueue (writeTQueue releaseQueue)
-- When we create a worker, we should take arguments indicating the identity. -- In first boot, we need to write this!
id <- Log.readLogIdentity logState
logLatestEventNumber <- Log.latestEventNumber logState
let getEvents = Log.readEvents logState
workerState <- Worker.startWorkerProcess
Worker.resumeWorker workerState id logLatestEventNumber getEvents
performCommonPierStartup workerState computeQueue persistQueue releaseQueue logState
performCommonPierStartup :: Worker.Worker
-> TQueue Noun
-> TQueue (Writ [Eff])
-> TQueue (Writ [Eff])
-> LogState
-> IO Pier
performCommonPierStartup workerState computeQueue persistQueue releaseQueue logState = do
for ioDrivers $ \x -> do
bootMessage <- bornEvent x
atomically $ writeTQueue computeQueue bootMessage
driverThreads <- for ioDrivers $ \x -> do
startDriver x (writeTQueue computeQueue)
-- TODO: Don't do a bunch of extra work; we send all events to all drivers
portingThread <- async $ do
forever $ do
r <- atomically (readTQueue releaseQueue)
for_ driverThreads $ \(_, k) ->
for_ (payload r) $ \eff ->
k eff
Worker.workerThread workerState
pure (Pier{..}) pure (Pier{..})

View File

@ -7,8 +7,8 @@ import Data.Noun.Atom
import Data.Noun.Poet import Data.Noun.Poet
import Database.LMDB.Raw import Database.LMDB.Raw
import Urbit.Time import Urbit.Time
import Vere
data Effect
newtype Ovum = Ovum Void newtype Ovum = Ovum Void
deriving newtype (Eq, Ord, Show, ToNoun, FromNoun) deriving newtype (Eq, Ord, Show, ToNoun, FromNoun)
@ -25,10 +25,12 @@ data Writ a = Writ
} }
data Pier = Pier data Pier = Pier
{ computeQueue :: TQueue (Writ Word) { computeQueue :: TQueue Noun
, persistQueue :: TQueue (Writ [Effect]) , persistQueue :: TQueue (Writ [Eff])
, releaseQueue :: TQueue (Writ [Effect]) , releaseQueue :: TQueue (Writ [Eff])
, logState :: LogState , logState :: LogState
, driverThreads :: [(Async (), Perform)]
, portingThread :: Async ()
} }
-- TODO: We are uncertain about q's type. There's some serious entanglement -- TODO: We are uncertain about q's type. There's some serious entanglement
@ -36,8 +38,8 @@ data Pier = Pier
-- away with anything less than passing the full u3_writ around. -- away with anything less than passing the full u3_writ around.
data LogState = LogState data LogState = LogState
{ env :: MDB_env { env :: MDB_env
, inputQueue :: TQueue (Writ [Effect]) , inputQueue :: TQueue (Writ [Eff])
, onPersist :: Writ [Effect] -> STM () , onPersist :: Writ [Eff] -> STM ()
, writer :: Async () , writer :: Async ()
} }

View File

@ -26,10 +26,6 @@ data Worker = Worker
, recvHandle :: Handle , recvHandle :: Handle
, process :: ProcessHandle , process :: ProcessHandle
, identity :: LogIdentity
-- TODO: This shouldn't be here.
, wLogState :: LogState
-- , getInput :: STM (Writ ()) -- , getInput :: STM (Writ ())
-- , onComputed :: Writ [Effect] -> STM () -- , onComputed :: Writ [Effect] -> STM ()
@ -43,11 +39,11 @@ data Worker = Worker
-- Think about how to handle process exit -- Think about how to handle process exit
-- Tear down subprocess on exit? (terminiteProcess) -- Tear down subprocess on exit? (terminiteProcess)
start :: LogIdentity -> LogState -> IO Worker startWorkerProcess :: IO Worker
start id s = startWorkerProcess =
do do
(Just i, Just o, _, p) <- createProcess pSpec (Just i, Just o, _, p) <- createProcess pSpec
pure (Worker i o p id s) pure (Worker i o p)
where where
pSpec = pSpec =
(proc "urbit-worker" []) { std_in = CreatePipe (proc "urbit-worker" []) { std_in = CreatePipe
@ -124,7 +120,8 @@ data WorkerExn
| BadPleaNoun Noun | BadPleaNoun Noun
| ReplacedEventDuringReplay EventId ReplacementEv | ReplacedEventDuringReplay EventId ReplacementEv
| WorkerConnectionClosed | WorkerConnectionClosed
| UnexpectedInitialPlea Plea | UnexpectedPleaOnNewShip Plea
| InvalidInitialPlea Plea
deriving (Show) deriving (Show)
instance Exception WorkerExn instance Exception WorkerExn
@ -167,18 +164,21 @@ sendAndRecv w eventId event =
Stdr _ cord -> print cord >> loop Stdr _ cord -> print cord >> loop
Slog _ pri t -> printTank pri t >> loop Slog _ pri t -> printTank pri t >> loop
sendBootEvent :: Worker -> IO () sendBootEvent :: LogIdentity -> Worker -> IO ()
sendBootEvent w = do sendBootEvent id w = do
sendAtom w $ jam $ toNoun (Cord "boot", (identity w)) sendAtom w $ jam $ toNoun (Cord "boot", id)
-- the ship is booted, but it is behind. shove events to the worker until it is -- the ship is booted, but it is behind. shove events to the worker until it is
-- caught up. -- caught up.
replay :: Worker -> WorkerState -> EventId replay :: Worker
-> WorkerState
-> LogIdentity
-> EventId
-> (EventId -> Word64 -> IO (Vector (EventId, Atom))) -> (EventId -> Word64 -> IO (Vector (EventId, Atom)))
-> IO () -> IO ()
replay w (wid, wmug) lastCommitedId getEvents = do replay w (wid, wmug) identity lastCommitedId getEvents = do
when (wid == 1) (sendBootEvent w) when (wid == 1) (sendBootEvent identity w)
loop wid loop wid
where where
@ -195,25 +195,44 @@ replay w (wid, wmug) lastCommitedId getEvents = do
loop (curEvent + toRead) loop (curEvent + toRead)
startPier :: Worker -> IO (EventId)
startPier w = bootWorker :: Worker
-> LogIdentity
-> Pill
-> IO ()
bootWorker w identity pill =
do
recvPlea w >>= \case
Play Nil -> pure ()
x@(Play _) -> throwIO (UnexpectedPleaOnNewShip x)
x -> throwIO (InvalidInitialPlea x)
-- TODO: actually boot the pill
undefined
requestSnapshot w
-- Maybe return the current event id ? But we'll have to figure that out
-- later.
pure ()
resumeWorker :: Worker
-> LogIdentity
-> EventId
-> (EventId -> Word64 -> IO (Vector (EventId, Atom)))
-> IO ()
resumeWorker w identity logLatestEventNumber eventFetcher =
do do
ws@(eventId, mug) <- recvPlea w >>= \case ws@(eventId, mug) <- recvPlea w >>= \case
Play Nil -> pure (1, Mug 0) Play Nil -> pure (1, Mug 0)
Play (NotNil (e, m, _)) -> pure (e, m) Play (NotNil (e, m, _)) -> pure (e, m)
x -> throwIO (UnexpectedInitialPlea x) x -> throwIO (InvalidInitialPlea x)
logLatestEventNumber <- Log.latestEventNumber (wLogState w) replay w ws identity logLatestEventNumber eventFetcher
when (logLatestEventNumber == 0) $ do
-- todo: boot. we need a pill.
undefined
replay w ws logLatestEventNumber (Log.readEvents (wLogState w))
requestSnapshot w requestSnapshot w
pure (logLatestEventNumber) pure ()
workerThread :: Worker -> IO (Async ()) workerThread :: Worker -> IO (Async ())
workerThread w = undefined workerThread w = undefined