From 4f52382a757729f02b48553780bcef798feb4678 Mon Sep 17 00:00:00 2001 From: Elliot Glaysher Date: Tue, 18 Jun 2019 17:04:57 -0700 Subject: [PATCH] Various Fixes and Improvements. --- pkg/hs-urbit/lib/Vere.hs | 28 +--------------- pkg/hs-urbit/lib/Vere/Http.hs | 6 ++++ pkg/hs-urbit/lib/Vere/Http/Client.hs | 1 + pkg/hs-urbit/lib/Vere/Http/Server.hs | 11 ++++-- pkg/hs-urbit/lib/Vere/Pier.hs | 4 +-- pkg/hs-urbit/lib/Vere/Pier/Types.hs | 50 +++++++++++++++++++++++++--- pkg/hs-urbit/lib/Vere/Worker.hs | 44 ++++++++++++++++-------- 7 files changed, 95 insertions(+), 49 deletions(-) diff --git a/pkg/hs-urbit/lib/Vere.hs b/pkg/hs-urbit/lib/Vere.hs index 9cd0b539d0..5a68023aad 100644 --- a/pkg/hs-urbit/lib/Vere.hs +++ b/pkg/hs-urbit/lib/Vere.hs @@ -3,32 +3,6 @@ module Vere where import ClassyPrelude import Data.Void import Data.Noun -import qualified Vere.Http.Server as Server -import qualified Vere.Http.Client as Client +import Vere.Pier.Types -- +vere ----------------------------------------------------------------------- - -data WTFIsThis - = WTFIsThis (Maybe Varience) Eff - -data Varience = Gold | Iron | Lead - -data Eff - = HttpServer Server.Eff - | HttpClient Client.Eff - | Behn Void - | Clay Void - | Boat Void - | Sync Void - | Newt Void - | Ames Void - | Init Void - | Term Void - - -type Perform = Eff -> IO () - -data IODriver = IODriver - { bornEvent :: IO Noun - , startDriver :: (Noun -> STM ()) -> IO (Async (), Perform) - } diff --git a/pkg/hs-urbit/lib/Vere/Http.hs b/pkg/hs-urbit/lib/Vere/Http.hs index 6d034840a6..0b1197f6af 100644 --- a/pkg/hs-urbit/lib/Vere/Http.hs +++ b/pkg/hs-urbit/lib/Vere/Http.hs @@ -10,6 +10,7 @@ import qualified Network.HTTP.Types as HT import qualified Network.HTTP.Types.Method as H data Header = Header Text Text + deriving (Eq, Ord, Show) type Method = H.StdMethod @@ -19,17 +20,22 @@ data Request = Request , headerList :: [Header] , body :: Maybe ByteString } + deriving (Eq, Ord, Show) data ResponseHeader = ResponseHeader { statusCode :: Int , headers :: [Header] } + deriving (Eq, Ord, Show) + data Event = Started ResponseHeader -- [%start hdr (unit octs) ?] | Received ByteString -- [%continue [~ octs] %.n] | Done -- [%continue ~ %.y] | Canceled -- %cancel | Failed Text -- %cancel + deriving (Eq, Ord, Show) + convertHeaders :: [HT.Header] -> [Header] convertHeaders = fmap f diff --git a/pkg/hs-urbit/lib/Vere/Http/Client.hs b/pkg/hs-urbit/lib/Vere/Http/Client.hs index 2261c1ab4b..247b416706 100644 --- a/pkg/hs-urbit/lib/Vere/Http/Client.hs +++ b/pkg/hs-urbit/lib/Vere/Http/Client.hs @@ -21,6 +21,7 @@ data Ev = Receive ReqId Event -- [%receive @ todo] data Eff = NewReq ReqId Request -- [%request @ todo] | CancelReq ReqId -- [%cancel-request @] + deriving (Eq, Ord, Show) data State = State { sManager :: H.Manager diff --git a/pkg/hs-urbit/lib/Vere/Http/Server.hs b/pkg/hs-urbit/lib/Vere/Http/Server.hs index 2e1586a29e..bc1ccb82b7 100644 --- a/pkg/hs-urbit/lib/Vere/Http/Server.hs +++ b/pkg/hs-urbit/lib/Vere/Http/Server.hs @@ -21,11 +21,13 @@ type ConnectionId = Word type RequestId = Word data Eff = Eff ServerId ConnectionId RequestId ServerRequest + deriving (Eq, Ord, Show) -- | An http server effect is configuration, or it sends an outbound response data ServerRequest = SetConfig Config | Response Event + deriving (Eq, Ord, Show) data Config = Config { secure :: Maybe (Key, Cert) @@ -33,14 +35,17 @@ data Config = Config , log :: Bool , redirect :: Bool } + deriving (Eq, Ord, Show) + -- Note: We need to parse PEM-encoded RSA private keys and cert or cert chain -- from Wain -newtype Key = Key PEM -newtype Cert = Cert PEM +type Key = PEM +type Cert = PEM data Wain = Wain [Text] newtype PEM = PEM ByteString + deriving newtype (Eq, Ord, Show) data ClientResponse = Progress ResponseHeader Int (Maybe Int) (Maybe ByteString) @@ -80,7 +85,7 @@ startServer :: State -> Config -> IO () startServer s c = do tls <- case (secure c) of Nothing -> error "no wai" - Just (Key (PEM key), Cert (PEM cert)) -> + Just (PEM key, PEM cert) -> pure (W.tlsSettingsMemory cert key) -- we need to do the dance where we do the socket checking dance. or shove a diff --git a/pkg/hs-urbit/lib/Vere/Pier.hs b/pkg/hs-urbit/lib/Vere/Pier.hs index 010a24984a..3a9ae2b968 100644 --- a/pkg/hs-urbit/lib/Vere/Pier.hs +++ b/pkg/hs-urbit/lib/Vere/Pier.hs @@ -67,7 +67,7 @@ runPierFromDisk top = do performCommonPierStartup :: Worker.Worker - -> TQueue Noun + -> TQueue Ovum -> TQueue (Writ [Eff]) -> TQueue (Writ [Eff]) -> LogState @@ -88,6 +88,6 @@ performCommonPierStartup workerState computeQueue persistQueue releaseQueue logS for_ (payload r) $ \eff -> k eff - Worker.workerThread workerState + Worker.workerThread workerState (readTQueue computeQueue) undefined pure (Pier{..}) diff --git a/pkg/hs-urbit/lib/Vere/Pier/Types.hs b/pkg/hs-urbit/lib/Vere/Pier/Types.hs index e03a8e197c..20128806d6 100644 --- a/pkg/hs-urbit/lib/Vere/Pier/Types.hs +++ b/pkg/hs-urbit/lib/Vere/Pier/Types.hs @@ -7,16 +7,58 @@ import Data.Noun.Atom import Data.Noun.Poet import Database.LMDB.Raw import Urbit.Time -import Vere -newtype Ovum = Ovum Void - deriving newtype (Eq, Ord, Show, ToNoun, FromNoun) +import qualified Vere.Http.Server as Server +import qualified Vere.Http.Client as Client + +data WTFIsThis + = WTFIsThis (Maybe Varience) Eff + +data Event + = BehnBorn + | HttpBorn + | CttpBorn + deriving (Eq, Ord, Show) + +data Eff + = HttpServer Server.Eff + | HttpClient Client.Eff + | Behn Void + | Clay Void + | Boat Void + | Sync Void + | Newt Void + | Ames Void + | Init Void + | Term Void + deriving (Eq, Ord, Show) + +instance ToNoun Eff where + +instance FromNoun Eff where + + +data Varience = Gold | Iron | Lead + +type Perform = Eff -> IO () + +newtype Path = Path [Text] + deriving (Eq, Ord, Show) + +data Ovum = Ovum Path Event + deriving (Eq, Ord, Show, ToNoun, FromNoun) newtype Mug = Mug Word32 deriving newtype (Eq, Ord, Show, ToNoun, FromNoun) newtype Jam = Jam Atom +data IODriver = IODriver + { bornEvent :: IO Ovum + , startDriver :: (Ovum -> STM ()) -> IO (Async (), Perform) + } + + data Writ a = Writ { eventId :: Word64 , timeout :: Maybe Word @@ -25,7 +67,7 @@ data Writ a = Writ } data Pier = Pier - { computeQueue :: TQueue Noun + { computeQueue :: TQueue Ovum , persistQueue :: TQueue (Writ [Eff]) , releaseQueue :: TQueue (Writ [Eff]) , logState :: LogState diff --git a/pkg/hs-urbit/lib/Vere/Worker.hs b/pkg/hs-urbit/lib/Vere/Worker.hs index 3f9b3cc3f0..a9354194ab 100644 --- a/pkg/hs-urbit/lib/Vere/Worker.hs +++ b/pkg/hs-urbit/lib/Vere/Worker.hs @@ -14,6 +14,8 @@ import Data.Noun.Pill import Vere.Pier.Types import System.Process +import qualified Urbit.Time as Time + import Data.ByteString (hGet) import Data.ByteString.Unsafe (unsafeUseAsCString) import Foreign.Ptr (castPtr) @@ -76,7 +78,7 @@ type Play = Nullable (EventId, Mug, ShipId) data Plea = Play Play | Work EventId Mug Job - | Done EventId Mug [Ovum] + | Done EventId Mug [Eff] | Stdr EventId Cord | Slog EventId Word32 Tank deriving (Eq, Show) @@ -107,7 +109,7 @@ type NextEventId = Word64 type WorkerState = (EventId, Mug) type ReplacementEv = (EventId, Mug, Job) -type WorkResult = (EventId, Mug, [Ovum]) +type WorkResult = (EventId, Mug, [Eff]) type WorkerResp = (Either ReplacementEv WorkResult) -- Exceptions ------------------------------------------------------------------ @@ -176,24 +178,26 @@ replay :: Worker -> LogIdentity -> EventId -> (EventId -> Word64 -> IO (Vector (EventId, Atom))) - -> IO () + -> IO (EventId, Mug) replay w (wid, wmug) identity lastCommitedId getEvents = do when (wid == 1) (sendBootEvent identity w) - loop wid + vLast <- newIORef (wid, wmug) + loop vLast wid + readIORef vLast where -- Replay events in batches of 1000. - loop curEvent = do + loop vLast curEvent = do let toRead = min 1000 (1 + lastCommitedId - curEvent) when (toRead > 0) do events <- getEvents curEvent toRead for_ events $ \(eventId, event) -> do sendAndRecv w eventId event >>= \case - (Left ev) -> throwIO (ReplacedEventDuringReplay eventId ev) - (Right _) -> pure () + Left ev -> throwIO (ReplacedEventDuringReplay eventId ev) + Right (id, mug, _) -> writeIORef vLast (id, mug) - loop (curEvent + toRead) + loop vLast (curEvent + toRead) bootWorker :: Worker @@ -220,7 +224,7 @@ resumeWorker :: Worker -> LogIdentity -> EventId -> (EventId -> Word64 -> IO (Vector (EventId, Atom))) - -> IO () + -> IO (EventId, Mug) resumeWorker w identity logLatestEventNumber eventFetcher = do ws@(eventId, mug) <- recvPlea w >>= \case @@ -228,14 +232,28 @@ resumeWorker w identity logLatestEventNumber eventFetcher = Play (NotNil (e, m, _)) -> pure (e, m) x -> throwIO (InvalidInitialPlea x) - replay w ws identity logLatestEventNumber eventFetcher + r <- replay w ws identity logLatestEventNumber eventFetcher requestSnapshot w - pure () + pure r -workerThread :: Worker -> IO (Async ()) -workerThread w = undefined +workerThread :: Worker -> STM Ovum -> (EventId, Mug) -> IO (Async ()) +workerThread w getEvent (evendId, mug) = async $ forever do + ovum <- atomically $ getEvent + + currentDate <- Time.now + + let mat = jam (undefined (mug, currentDate, ovum)) + + undefined + + -- Writ (eventId + 1) Nothing mat + -- -- assign a new event id. + -- -- assign a date + -- -- get current mug state + -- -- (jam [mug event]) + -- sendAndRecv requestSnapshot :: Worker -> IO () requestSnapshot w = undefined