FX cleanup, collect FX, Vere.Log works in ByteStrings instead of Atoms.

This commit is contained in:
Benjamin Summers 2019-07-21 12:56:18 -07:00
parent 4685ab3ce6
commit dd27be941a
10 changed files with 357 additions and 536 deletions

View File

@ -4,6 +4,7 @@ module Noun.Conversions
, Cord(..), Knot(..), Term(..), Tape(..), Tour(..)
, Tank(..), Tang, Plum(..)
, Mug(..), Path(..), Ship(..)
, Lenient(..)
) where
import ClassyPrelude hiding (hash)
@ -113,6 +114,22 @@ instance (FromNoun a, FromNoun c) => FromNoun (AtomCell a c) where
Cell _ _ -> ACCell <$> parseNoun n
-- Lenient ---------------------------------------------------------------------
data Lenient a
= FailParse Noun
| GoodParse a
deriving (Eq, Ord, Show)
instance FromNoun a => FromNoun (Lenient a) where
parseNoun n =
(GoodParse <$> parseNoun n) <|> pure (FailParse n)
instance ToNoun a => ToNoun (Lenient a) where
toNoun (FailParse n) = n
toNoun (GoodParse x) = toNoun x
-- Nullable --------------------------------------------------------------------
{-|

View File

@ -1,4 +1,4 @@
module Noun.Cue (cue, cueExn, cueBS, DecodeErr) where
module Noun.Cue (cue, cueExn, cueBS, cueBSExn, DecodeErr) where
import ClassyPrelude
@ -24,14 +24,17 @@ import qualified Data.Vector.Primitive as VP
cueBS :: ByteString -> Either DecodeErr Noun
cueBS = doGet dNoun
cueBSExn :: ByteString -> IO Noun
cueBSExn bs =
cueBS bs & \case
Left e -> throwIO e
Right x -> pure x
cue :: Atom -> Either DecodeErr Noun
cue = cueBS . view atomBytes
cueExn :: Atom -> IO Noun
cueExn atm =
cueBS (atm ^. atomBytes) & \case
Left e -> throwIO e
Right x -> pure x
cueExn atm = cueBSExn (atm ^. atomBytes)
-- Debugging -------------------------------------------------------------------

View File

@ -0,0 +1,67 @@
module Vere.FX(FX, Eff(..), Blit(..), Varience(..), PutDel(..)) where
import UrbitPrelude hiding (Term)
import Urbit.Time
import Vere.Ovum
import qualified Vere.Ames as Ames
import qualified Vere.Http.Client as Client
import qualified Vere.Http.Server as Server
--------------------------------------------------------------------------------
data PutDel = PDPut | PDDel
deriving (Eq, Ord, Show)
type FX = [(Path, Lenient Eff)]
data Eff
= EHttpServer Server.Eff
| EHttpClient Client.Eff
| EAmes Ames.Eff
| EBbye Noun
| EBehn Noun
| EBlit [Blit]
| EBoat Noun
| EClay Noun
| ECrud Noun
| EDirk Noun
| EDoze (Maybe Wen)
| EErgo Noun
| EExit Noun
| EFlog Noun
| EForm Noun
| EHill [Term]
| EInit
| ELogo Noun
| EMass Noun
| ENewt Noun
| EOgre Noun
| ESend [Blit]
| ESync Noun
| ETerm Noun
| EThou Noun
| ETurf (Maybe (PutDel, [Text])) -- TODO Unsure
| EVega Noun
| EWest Noun
| EWoot Noun
deriving (Eq, Ord, Show)
data Blit
= Bel
| Clr
| Hop Word64
| Lin [Char]
| Mor
| Sag Path Noun
| Sav Path Atom
| Url Text
deriving (Eq, Ord, Show)
data Varience = Gold | Iron | Lead
deriveNoun ''Blit
deriveNoun ''Eff
deriveNoun ''PutDel
deriveNoun ''Varience

View File

@ -48,7 +48,7 @@ lastEv = readIORef . numEvents
data EventLogExn
= NoLogIdentity
| MissingEvent EventId
| BadNounInLogIdentity Atom
| BadNounInLogIdentity ByteString DecodeErr ByteString
| BadKeyInEventLog
| BadWriteLogIdentity LogIdentity
| BadWriteEvent EventId
@ -196,7 +196,7 @@ clearEvents env eventsTbl =
loop
loop
appendEvents :: EventLog -> Vector Atom -> IO ()
appendEvents :: EventLog -> Vector ByteString -> IO ()
appendEvents log !events = do
numEvs <- readIORef (numEvents log)
next <- nextEv log
@ -209,15 +209,12 @@ appendEvents log !events = do
for_ kvs $ \(k,v) -> do
putEvent flags txn (eventsTbl log) k v >>= \case
True -> pure ()
False -> do traceM "event write failed, trying to cue"
n <- cueExn v
traceM "finished cue"
throwIO (BadWriteEvent k)
False -> throwIO (BadWriteEvent k)
-- Read Events -----------------------------------------------------------------
streamEvents :: EventLog -> Word64 -> ConduitT () Atom IO ()
streamEvents :: EventLog -> Word64 -> ConduitT () ByteString IO ()
streamEvents log first = do
last <- liftIO $ lastEv log
traceM ("streamEvents: " <> show (first, last))
@ -226,7 +223,7 @@ streamEvents log first = do
for_ batch yield
streamEvents log (first + word (length batch))
readBatch :: EventLog -> Word64 -> IO (V.Vector Atom)
readBatch :: EventLog -> Word64 -> IO (V.Vector ByteString)
readBatch log first = start
where
start = do
@ -250,7 +247,7 @@ readBatch log first = start
fetchRows count cur pKey pVal = do
V.generateM count $ \i -> do
key <- peek pKey >>= mdbValToWord64
val <- peek pVal >>= mdbValToAtom
val <- peek pVal >>= mdbValToBytes
idx <- pure (first + word i)
unless (key == idx) $ throwIO $ MissingEvent idx
when (count /= succ i) $ do
@ -270,8 +267,8 @@ assertExn :: Exception e => Bool -> e -> IO ()
assertExn True _ = pure ()
assertExn False e = throwIO e
maybeExn :: Exception e => Maybe a -> e -> IO a
maybeExn mb exn = maybe (throwIO exn) pure mb
eitherExn :: Exception e => Either a b -> (a -> e) -> IO b
eitherExn eat exn = either (throwIO . exn) pure eat
byteStringAsMdbVal :: ByteString -> (MDB_val -> IO a) -> IO a
byteStringAsMdbVal bs k =
@ -298,18 +295,17 @@ withWordPtr w cb = do
getMb :: Txn -> Dbi -> ByteString -> IO (Maybe Noun)
getMb txn db key =
byteStringAsMdbVal key $ \mKey ->
mdb_get txn db mKey >>= traverse mdbValToNoun
mdb_get txn db mKey >>= traverse (mdbValToNoun key)
mdbValToAtom :: MDB_val -> IO Atom
mdbValToAtom (MDB_val sz ptr) = do
bs <- BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz)
pure (bs ^. from atomBytes)
mdbValToBytes :: MDB_val -> IO ByteString
mdbValToBytes (MDB_val sz ptr) = do
BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz)
mdbValToNoun :: MDB_val -> IO Noun
mdbValToNoun (MDB_val sz ptr) = do
mdbValToNoun :: ByteString -> MDB_val -> IO Noun
mdbValToNoun key (MDB_val sz ptr) = do
bs <- BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz)
let res = bs ^? _Cue
maybeExn res (BadNounInLogIdentity (bs ^. from atomBytes))
let res = cueBS bs
eitherExn res (\err -> BadNounInLogIdentity key err bs)
putNoun :: MDB_WriteFlags -> Txn -> Dbi -> ByteString -> Noun -> IO Bool
putNoun flags txn db key val =
@ -317,10 +313,9 @@ putNoun flags txn db key val =
byteStringAsMdbVal (jamBS val) $ \mVal ->
mdb_put flags txn db mKey mVal
putEvent :: MDB_WriteFlags -> Txn -> Dbi -> Word64 -> Atom -> IO Bool
putEvent flags txn db id atom = do
putEvent :: MDB_WriteFlags -> Txn -> Dbi -> Word64 -> ByteString -> IO Bool
putEvent flags txn db id bs = do
withWord64AsMDBval id $ \idVal -> do
let !bs = atom ^. atomBytes
traceM ("putEvent: " <> show (id, length bs))
byteStringAsMdbVal bs $ \mVal -> do
mdb_put flags txn db idVal mVal

View File

@ -15,9 +15,6 @@ newtype FileOcts = FileOcts ByteString
newtype BigTape = BigTape Text
deriving newtype (Eq, Ord, ToNoun, FromNoun)
newtype Nock = Nock Noun
deriving newtype (Eq, Ord, FromNoun, ToNoun)
newtype Desk = Desk Text
deriving newtype (Eq, Ord, Show, ToNoun, FromNoun)
@ -125,7 +122,12 @@ deriveNoun ''Dawn
-- HTTP ------------------------------------------------------------------------
newtype PEM = PEM Cord
deriving newtype (Eq, Ord, Show, ToNoun, FromNoun)
type ServerId = Atom
type Key = PEM
type Cert = PEM
data Address
= AIpv4 Atom -- @if
@ -133,6 +135,14 @@ data Address
| AAmes Atom -- @p
deriving (Eq, Ord, Show)
data ServerConfig = ServerConfig
{ secure :: Maybe (Key, Cert)
, proxy :: Bool
, log :: Bool
, redirect :: Bool
}
deriving (Eq, Ord, Show)
data ResponseHeader = ResponseHeader
{ rhStatus :: Word
, rhHeaders :: [(Text, Text)]
@ -160,11 +170,13 @@ data HttpClient
deriving (Eq, Ord, Show)
data HttpServer
= HttpServerRequest (Atom, Word, Word, ()) ServerId Address HttpRequest
| HttpServerLive (Atom, ()) Text (Maybe Word)
| HttpServerBorn (Atom, ()) ()
= HttpServerRequest (Atom, Word, Word, ()) ServerId Address HttpRequest
| HttpServerLive (Atom, ()) Text (Maybe Word)
| HttpServerBorn (Atom, ()) ()
| HttpServerSetConfig (Atom, ()) ServerConfig
deriving (Eq, Ord, Show)
deriveNoun ''ServerConfig
deriveNoun ''HttpRequest
deriveNoun ''Address
deriveNoun ''ResponseHeader
@ -261,17 +273,6 @@ data Belt
| Txt Tour
deriving (Eq, Ord, Show)
data Blit
= Bel
| Clr
| Hop Word64
| Lin [Char]
| Mor
| Sag Path Noun
| Sav Path Atom
| Url Text
deriving (Eq, Ord, Show)
data Term
= TermBelt (Atom, ()) Belt
| TermBlew (Atom, ()) Word Word
@ -283,7 +284,6 @@ data Term
deriveNoun ''LegacyBootEvent
deriveNoun ''ArrowKey
deriveNoun ''Belt
deriveNoun ''Blit
deriveNoun ''Term

View File

@ -1,11 +1,12 @@
{-# OPTIONS_GHC -Wwarn #-}
module Vere.Pier where
module Vere.Pier (booted, resumed, runPersist, runCompute) where
import Data.Acquire
import UrbitPrelude
import Vere.Ovum
import Vere.FX
import Vere.Pier.Types
import Data.Conduit
import System.Directory (createDirectoryIfMissing)
import System.Posix.Files (ownerModes, setFileMode)
@ -13,34 +14,24 @@ import Vere.Log (EventLog)
import Vere.Serf (Serf, SerfState(..))
import qualified System.Entropy as Ent
import qualified Urbit.Time as Time
import qualified Vere.Log as Log
import qualified Vere.Serf as Serf
--------------------------------------------------------------------------------
ioDrivers = [] :: [IODriver]
_ioDrivers = [] :: [IODriver]
setupPierDirectory :: FilePath -> IO ()
setupPierDirectory shipPath = do
_setupPierDirectory :: FilePath -> IO ()
_setupPierDirectory shipPath = do
for_ ["put", "get", "log", "chk"] $ \seg -> do
let pax = shipPath <> "/.urb/" <> seg
createDirectoryIfMissing True pax
setFileMode pax ownerModes
{-
data Pier = Pier
{ computeQueue :: TQueue Ovum
, persistQueue :: TQueue (Writ [Eff])
, releaseQueue :: TQueue (Writ [Eff])
, log :: EventLog
, driverThreads :: [(Async (), Perform)]
, portingThread :: Async ()
}
-}
--------------------------------------------------------------------------------
-- Load pill into boot sequence. -----------------------------------------------
genEntropy :: IO Word512
genEntropy = fromIntegral . view (from atomBytes) <$> Ent.getEntropy 64
@ -52,56 +43,13 @@ generateBootSeq ship Pill{..} = do
pure $ BootSeq ident pBootFormulas ovums
where
ident = LogIdentity ship True (fromIntegral $ length pBootFormulas)
preKern ent = [ Ovum (Path ["", "term", "1"]) (Boot $ Fake $ who ident)
, Ovum (Path ["", "arvo"]) (Whom ship)
, Ovum (Path ["", "arvo"]) (Wack ent)
preKern ent = [ OvumBlip $ BlipTerm $ TermBoot (1,()) (Fake (who ident))
, OvumBlip $ BlipArvo $ ArvoWhom () ship
, OvumBlip $ BlipArvo $ ArvoWack () ent
]
--------------------------------------------------------------------------------
{-
This is called to make a freshly booted pier. It assigns an identity
to an event log and takes a chill pill.
-}
boot :: FilePath -> FilePath -> Ship
-> (Serf -> EventLog -> SerfState -> IO a)
-> IO a
boot pillPath top ship act = do
let logPath = top <> "/.urb/log"
pill <- loadFile @Pill pillPath >>= \case
Left l -> error (show l)
Right p -> pure p
seq@(BootSeq ident x y) <- generateBootSeq ship pill
with (Log.new logPath ident) $ \log -> do
serf <- Serf.startSerfProcess top
(events, serfSt) <- Serf.bootFromSeq serf seq
Serf.requestSnapshot serf serfSt
traceM "writeJobs"
writeJobs log (fromList events)
act serf log serfSt
{-
What we really want to do is write the log identity and then do
normal startup, but writeIdent requires a full log state
including input/output queues.
-}
resume :: FilePath -> (Serf -> EventLog -> SerfState -> IO a) -> IO a
resume top act = do
with (Log.existing (top <> "/.urb/log")) $ \log -> do
traceM "But why?"
serf <- Serf.startSerfProcess top
traceM "What"
serfSt <- Serf.replay serf log
traceM "is"
Serf.requestSnapshot serf serfSt
traceM "happening"
act serf log serfSt
-- Write a batch of jobs into the event log ------------------------------------
writeJobs :: EventLog -> Vector Job -> IO ()
writeJobs log !jobs = do
@ -109,23 +57,56 @@ writeJobs log !jobs = do
events <- fmap fromList $ traverse fromJob (zip [expect..] $ toList jobs)
Log.appendEvents log events
where
fromJob :: (EventId, Job) -> IO Atom
fromJob :: (EventId, Job) -> IO ByteString
fromJob (expectedId, job) = do
guard (expectedId == jobId job)
pure $ jam $ jobPayload job
pure $ jamBS $ jobPayload job
jobPayload :: Job -> Noun
jobPayload (RunNok (LifeCyc _ m n)) = toNoun (m, n)
jobPayload (DoWork (Work _ m d o)) = toNoun (m, d, o)
-- Boot a new ship. ------------------------------------------------------------
booted :: FilePath -> FilePath -> Ship -> Acquire (Serf, EventLog, SerfState)
booted pillPath top ship = do
pill <- liftIO $ loadFile @Pill pillPath >>= \case
Left l -> error (show l)
Right p -> pure p
seq@(BootSeq ident x y) <- liftIO $ generateBootSeq ship pill
log <- Log.new (top <> "/.urb/log") ident
serf <- Serf.run top
liftIO $ do
(events, serfSt) <- Serf.bootFromSeq serf seq
Serf.snapshot serf serfSt
writeJobs log (fromList events)
pure (serf, log, serfSt)
-- Resume an existing ship. ----------------------------------------------------
resumed :: FilePath -> Acquire (Serf, EventLog, SerfState)
resumed top = do
log <- Log.existing (top <> "/.urb/log")
serf <- Serf.run top
serfSt <- liftIO (Serf.replay serf log)
liftIO (Serf.snapshot serf serfSt)
pure (serf, log, serfSt)
-- Run Pier --------------------------------------------------------------------
{-
performCommonPierStartup :: Serf.Serf
-> TQueue Ovum
-> TQueue (Writ [Eff])
-> TQueue (Writ [Eff])
-> TQueue (Writ, FX)
-> TQueue (Writ, FX)
-> LogState
-> IO Pier
performCommonPierStartup serf computeQ persistQ releaseQ logState = do
@ -150,6 +131,19 @@ performCommonPierStartup serf computeQ persistQ releaseQ logState = do
-}
-- Compute Thread --------------------------------------------------------------
runCompute :: Serf -> STM Ovum -> (EventId, Mug) -> IO (Async ())
runCompute w getEvent (evendId, mug) = async $ forever $ do
ovum <- atomically $ getEvent
currentDate <- Time.now
let _mat = jam (undefined (mug, currentDate, ovum))
undefined
-- Persist Thread --------------------------------------------------------------
data PersistExn = BadEventId EventId EventId
@ -162,8 +156,8 @@ instance Exception PersistExn where
]
runPersist :: EventLog
-> TQueue (Writ [Eff])
-> (Writ [Eff] -> STM ())
-> TQueue (Writ, FX)
-> ((Writ, FX) -> STM ())
-> Acquire ()
runPersist log inpQ out = do
mkAcquire runThread cancelWait
@ -174,22 +168,22 @@ runPersist log inpQ out = do
runThread :: IO (Async ())
runThread = asyncBound $ forever $ do
writs <- atomically (toNullable <$> getBatchFromQueue)
events <- validateWritsAndGetAtom writs
writs <- atomically getBatchFromQueue
events <- validateWritsAndGetBytes (toNullable writs)
Log.appendEvents log events
atomically $ traverse_ out writs
validateWritsAndGetAtom :: [Writ [Eff]] -> IO (Vector Atom)
validateWritsAndGetAtom writs = do
validateWritsAndGetBytes :: [(Writ, FX)] -> IO (Vector ByteString)
validateWritsAndGetBytes writs = do
expect <- Log.nextEv log
fmap fromList
$ for (zip [expect..] writs)
$ \(expectedId, Writ{..}) -> do
unless (expectedId == eventId) $
throwIO (BadEventId expectedId eventId)
pure (unJam event)
$ \(expectedId, (w, fx)) -> do
unless (expectedId == writId w) $
throwIO (BadEventId expectedId (writId w))
pure (writEv w)
getBatchFromQueue :: STM (NonNull [Writ [Eff]])
getBatchFromQueue :: STM (NonNull [(Writ, FX)])
getBatchFromQueue =
readTQueue inpQ >>= go . singleton
where

View File

@ -1,64 +1,28 @@
{-# LANGUAGE UndecidableInstances #-}
module Vere.Pier.Types where
import UrbitPrelude
import UrbitPrelude hiding (Term)
import Urbit.Time
import Vere.Ovum
import Vere.FX
import qualified Vere.Ames as Ames
import qualified Vere.Http.Client as Client
import qualified Vere.Http.Server as Server
--------------------------------------------------------------------------------
newtype ShipId = ShipId (Ship, Bool)
deriving newtype (Eq, Ord, Show, ToNoun, FromNoun)
newtype Octs = Octs ByteString
deriving newtype (Eq, Ord, Show, FromNoun, ToNoun)
newtype FileOcts = FileOcts ByteString
deriving newtype (Eq, Ord, ToNoun, FromNoun)
newtype BigTape = BigTape Text
deriving newtype (Eq, Ord, ToNoun, FromNoun)
type Life = Noun
type Pass = Noun
type Turf = Noun
type PUrl = Todo Noun
type Seed = Todo Noun
type Czar = Todo Noun -- Map Ship (Life, Pass)
type Bloq = Todo Atom -- @ud
newtype Todo a = Todo a
deriving newtype (Eq, Ord, ToNoun, FromNoun)
instance Show (Todo a) where
show (Todo _) = "TODO"
data Dawn = MkDawn
{ dSeed :: Seed
, dShip :: Ship
, dCzar :: Czar
, dTurf :: [Turf]
, dBloq :: Bloq
, dNode :: PUrl
}
deriving (Eq, Ord, Show)
data LegacyBootEvent
= Fake Ship
| Dawn Dawn
deriving (Eq, Ord, Show)
-- Don't show Nock values. -----------------------------------------------------
newtype Nock = Nock Noun
deriving newtype (Eq, Ord, FromNoun, ToNoun)
instance Show Nock where
show _ = "Nock"
--------------------------------------------------------------------------------
type EventId = Word64
data Pill = Pill
{ pBootFormulas :: [Nock]
, pKernelOvums :: [RawOvum]
, pUserspaceOvums :: [RawOvum]
, pKernelOvums :: [Ovum]
, pUserspaceOvums :: [Ovum]
}
deriving (Eq, Ord)
@ -68,21 +32,19 @@ data LogIdentity = LogIdentity
, lifecycleLen :: Word
} deriving (Eq, Ord, Show)
data BootSeq = BootSeq LogIdentity [Nock] [RawOvum]
data BootSeq = BootSeq LogIdentity [Nock] [Ovum]
deriving (Eq, Ord, Show)
newtype Desk = Desk Text
deriving newtype (Eq, Ord, Show, ToNoun, FromNoun)
data Mime = Mime Path FileOcts
deriving (Eq, Ord, Show)
type EventId = Word64
deriveNoun ''LogIdentity
deriveNoun ''Pill
-- Jobs ------------------------------------------------------------------------
data Work = Work EventId Mug Wen RawOvum
data Work = Work EventId Mug Wen Ovum
deriving (Eq, Ord, Show)
data LifeCyc = LifeCyc EventId Mug Nock
@ -111,261 +73,22 @@ data Order
| OWork Job
deriving (Eq, Ord, Show)
data ResponseHeader = ResponseHeader
{ rhStatus :: Word
, rhHeaders :: [(Text, Text)]
}
deriving (Eq, Ord, Show)
deriveToNoun ''Order
data HttpEvent
= Start ResponseHeader (Maybe Octs) Bool
| Continue (Maybe Octs) Bool
| Cancel
deriving (Eq, Ord, Show)
data Lane
= If Wen Atom Atom -- {$if p/@da q/@ud r/@if}
| Is Atom (Maybe Lane) Atom -- {$is p/@ud q/(unit lane) r/@is}
| Ix Wen Atom Atom -- {$ix p/@da q/@ud r/@if}
deriving (Eq, Ord, Show)
data ArrowKey = D | L | R | U
deriving (Eq, Ord, Show)
data Address
= AIpv4 Atom -- @if
| AIpv6 Atom -- @is
| AAmes Atom -- @p
deriving (Eq, Ord, Show)
instance ToNoun Address where
toNoun = \case
AIpv4 x -> toNoun (Cord "ipv4", x)
AIpv6 x -> toNoun (Cord "ipv6", x)
AAmes x -> toNoun (Cord "ames", x)
instance FromNoun Address where
parseNoun n = do
parseNoun n >>= \case
(Cord "ipv4", at) -> pure (AIpv4 at)
(Cord "ipv6", at) -> pure (AIpv6 at)
(Cord "ames", at) -> pure (AAmes at)
_ -> fail "Address must be either %ipv4, %ipv6, or %ames"
data Belt
= Aro ArrowKey
| Bac
| Ctl Char
| Del
| Met Char
| Ret
| Txt Tour
deriving (Eq, Ord, Show)
type ServerId = Atom
type JSON = Todo Noun
data RequestParams
= List [JSON]
| Object [(Text, JSON)]
deriving (Eq, Ord, Show)
data HttpRequest = HttpRequest
{ reqId :: Text
, reqUrl :: Text
, reqHeaders :: [(Text, Text)]
, reqFinished :: Maybe Octs
}
deriving (Eq, Ord, Show)
data Event
= Veer Cord Path BigTape
| Into Desk Bool [(Path, Maybe Mime)]
| Whom Ship
| Boot LegacyBootEvent
| Wack Word512
| Boat
| Barn
| Born
| Blew Word Word
| Hail
| Wake
| Receive ServerId HttpEvent
| Request ServerId Address HttpRequest
| Live Text Bool Word
| Hear Lane Atom
| Belt Belt
| Crud Text [Tank]
deriving (Eq, Ord, Show)
data PutDel = PDPut | PDDel
deriving (Eq, Ord, Show)
instance ToNoun PutDel where
toNoun = \case PDPut -> toNoun (Cord "put")
PDDel -> toNoun (Cord "del")
instance FromNoun PutDel where
parseNoun n = do
parseNoun n >>= \case
Cord "put" -> pure PDPut
Cord "del" -> pure PDDel
_ -> fail "PutDel must be either %put or %del"
data RecEx = RE Word Word
deriving (Eq, Ord, Show)
data NewtEx = NE Word
deriving (Eq, Ord, Show)
data Eff
= EHttpServer Server.Eff
| EHttpClient Client.Eff
| EAmes Ames.Eff
| EBbye Noun
| EBehn Noun
| EBlit [Blit]
| EBoat Noun
| EClay Noun
| ECrud Noun
| EDirk Noun
| EDoze (Maybe Wen)
| EErgo Noun
| EExit Noun
| EFlog Noun
| EForm Noun
| EHill [Term]
| EInit
| ELogo Noun
| EMass Noun
| ENewt Noun
| EOgre Noun
| ESend [Blit]
| ESync Noun
| ETerm Noun
| EThou Noun
| ETurf (Maybe (PutDel, [Text])) -- TODO Unsure
| EVega Noun
| EWest Noun
| EWoot Noun
deriving (Eq, Ord, Show)
data Blit
= Bel
| Clr
| Hop Word64
| Lin [Char]
| Mor
| Sag Path Noun
| Sav Path Atom
| Url Text
deriving (Eq, Ord, Show)
data Varience = Gold | Iron | Lead
--------------------------------------------------------------------------------
type Perform = Eff -> IO ()
data RawOvum = Ovum Path Event
deriving (Eq, Ord, Show)
--------------------------------------------------------------------------------
{-
This parses an ovum in a slightly complicated way.
The Ovum structure is not setup to be easily parsed into typed data,
since the type of the event depends on the head of the path, and
the shape of the rest of the path depends on the shape of the event.
To make parsing easier (indeed, to allow use to use `deriveEvent` to
generate parsers for this) we first re-arrange the data in the ovum.
And ovum is `[path event]`, but the first two fields of the path
are used for routing, the event is always a head-tagged structure,
and the rest of the path is basically data that's a part of the event.
So, we take something with this struture:
[[fst snd rest] [tag val]]
Then restructure it into *this* shape:
[fst [snd [tag rest val]]]
And then proceed with parsing as usual.
-}
data OvalOvum
= OOBlip BlipOvum
| OOVane VaneOvum
instance FromNoun OvalOvum where
parseNoun n = named "Ovum" $ do
(path::Path, tag::Cord, v::Noun) <- parseNoun n
case path of
Path ("" : m : p) -> OOBlip <$> parseNoun (toNoun (m, tag, p, v))
Path ("vane" : m : p) -> OOVane <$> parseNoun (toNoun (m, tag, p, v))
Path (_:_:_) -> fail "path must start with %$ or %vane"
Path (_:_) -> fail "path too short"
Path _ -> fail "empty path"
instance ToNoun OvalOvum where
toNoun oo =
fromNounErr noun & \case
Left err -> error (show err)
Right (pathSnd::Knot, tag::Cord, Path path, val::Noun) ->
toNoun (Path (pathHead:pathSnd:path), (tag, val))
where
(pathHead, noun) =
case oo of OOBlip bo -> ("", toNoun bo)
OOVane vo -> ("vane", toNoun vo)
--------------------------------------------------------------------------------
type AmesOvum = Void
type ArvoOvum = Void
type BehnOvum = Void
type BoatOvum = Void
type HttpClientOvum = Void
type HttpServerOvum = Void
type NewtOvum = Void
type SyncOvum = Void
type TermOvum = Void
data BlipOvum
= BOAmes AmesOvum
| BOArvo ArvoOvum
| BOBehn BehnOvum
| BOBoat BoatOvum
| BOHttpClient HttpClientOvum
| BOHttpServer HttpServerOvum
| BONewt NewtOvum
| BOSync SyncOvum
| BOTerm TermOvum
data KernelModule
= Ames | Behn | Clay | Dill | Eyre | Ford | Gall | Iris | Jael
data VaneOvum
= VOVane (KernelModule, ()) Void
| VOZuse () Void
--------------------------------------------------------------------------------
newtype Jam = Jam { unJam :: Atom }
deriving newtype (Eq, Ord, Show, ToNoun, FromNoun)
data IODriver = IODriver
{ bornEvent :: IO RawOvum
, startDriver :: (RawOvum -> STM ()) -> IO (Async (), Perform)
{ bornEvent :: IO Ovum
, startDriver :: (Ovum -> STM ()) -> IO (Async (), Perform)
}
data Writ a = Writ
{ eventId :: Word64
, timeout :: Maybe Word
, event :: Jam -- mat
, payload :: a
data Writ = Writ
{ writId :: Word64
, writTimeout :: Maybe Word
, writEv :: ByteString -- Jammed atomJam
}
@ -392,38 +115,5 @@ instance ToNoun Job where
toNoun (DoWork w) = toNoun w
toNoun (RunNok l) = toNoun l
instance Show FileOcts where
show (FileOcts bs) = show (take 32 bs <> "...")
instance Show BigTape where
show (BigTape t) = show (take 32 t <> "...")
instance Show Nock where
show _ = "Nock"
instance Show Pill where
show (Pill x y z) = show (length x, length y, length z)
deriveToNoun ''Order
deriveNoun ''ArrowKey
deriveNoun ''Belt
deriveNoun ''BlipOvum
deriveNoun ''Blit
deriveNoun ''Dawn
deriveNoun ''Eff
deriveNoun ''Event
deriveNoun ''HttpEvent
deriveNoun ''HttpRequest
deriveNoun ''KernelModule
deriveNoun ''Lane
deriveNoun ''LegacyBootEvent
deriveNoun ''LogIdentity
deriveNoun ''Mime
deriveNoun ''NewtEx
deriveNoun ''Pill
deriveNoun ''RawOvum
deriveNoun ''RecEx
deriveNoun ''RequestParams
deriveNoun ''ResponseHeader
deriveNoun ''VaneOvum

View File

@ -1,19 +1,14 @@
{-# OPTIONS_GHC -Wwarn #-}
{-
- TODO: `Serf` type should have something like:
```
getInput :: STM (Writ ())
onComputed :: Writ [Effect] -> STM ()
onExit :: Serf -> IO ()
task :: Async ()
```
- TODO: `recvLen` is not big-endian safe.
-}
{-# OPTIONS_GHC -Wwarn #-}
module Vere.Serf where
module Vere.Serf ( Serf, SerfState
, run, shutdown, kill
, replay, bootFromSeq, snapshot
, collectFX
) where
import UrbitPrelude hiding (fail)
import Data.Conduit
@ -23,6 +18,8 @@ import Data.Void
import Noun
import System.Process
import Vere.Pier.Types
import Vere.Ovum
import Vere.FX
import Control.Concurrent (threadDelay)
import Data.ByteString (hGet)
@ -30,6 +27,7 @@ import Data.ByteString.Unsafe (unsafeUseAsCString)
import Foreign.Marshal.Alloc (alloca)
import Foreign.Ptr (castPtr)
import Foreign.Storable (peek, poke)
import System.Directory (createDirectoryIfMissing)
import System.Exit (ExitCode)
import qualified Data.ByteString.Unsafe as BS
@ -53,12 +51,15 @@ data Serf = Serf
, sState :: MVar SerfState
}
data ShipId = ShipId Ship Bool
deriving (Eq, Ord, Show)
type Play = Maybe (EventId, Mug, ShipId)
data Plea
= PPlay Play
| PWork Work
| PDone EventId Mug [(Path, Eff)]
| PDone EventId Mug FX
| PStdr EventId Cord
| PSlog EventId Word32 Tank
deriving (Eq, Show)
@ -66,8 +67,7 @@ data Plea
type GetJobs = EventId -> Word64 -> IO (Vector Job)
type ReplacementEv = Job
type Fx = [(Path, Eff)]
type WorkResult = (SerfState, Fx)
type WorkResult = (SerfState, FX)
type SerfResp = Either ReplacementEv WorkResult
data SerfExn
@ -78,7 +78,7 @@ data SerfExn
| BadPleaNoun Noun [Text] Text
| ReplacedEventDuringReplay EventId ReplacementEv
| ReplacedEventDuringBoot EventId ReplacementEv
| EffectsDuringBoot EventId [(Path, Eff)]
| EffectsDuringBoot EventId FX
| SerfConnectionClosed
| UnexpectedPleaOnNewShip Plea
| InvalidInitialPlea Plea
@ -89,6 +89,7 @@ data SerfExn
instance Exception SerfExn
deriveNoun ''ShipId
deriveNoun ''Plea
@ -114,18 +115,18 @@ fromRightExn (Right x) _ = pure x
-- Process Management ----------------------------------------------------------
{-
TODO Think about how to handle process exit
TODO Tear down subprocess on exit? (terminiteProcess)
TODO `config` is a stub, fill it in.
-}
startSerfProcess :: FilePath -> IO Serf
startSerfProcess pier =
do
run :: FilePath -> Acquire Serf
run pierPath = mkAcquire (startUp pierPath) tearDown
startUp :: FilePath -> IO Serf
startUp pierPath = do
(Just i, Just o, _, p) <- createProcess pSpec
ss <- newEmptyMVar
pure (Serf i o p ss)
where
chkDir = traceShowId pier
chkDir = traceShowId pierPath
diskKey = ""
config = "0"
args = [chkDir, diskKey, config]
@ -134,12 +135,22 @@ startSerfProcess pier =
, std_out = CreatePipe
}
tearDown :: Serf -> IO ()
tearDown serf = do
race_ (threadDelay 1000000 >> terminateProcess (process serf))
(shutdownAndWait serf 0)
waitForExit :: Serf -> IO ExitCode
waitForExit serf = waitForProcess (process serf)
kill :: Serf -> IO ExitCode
kill serf = terminateProcess (process serf) >> waitForExit serf
shutdownAndWait :: Serf -> Word8 -> IO ExitCode
shutdownAndWait serf code = do
shutdown serf code
waitForExit serf
-- Basic Send and Receive Operations -------------------------------------------
@ -200,16 +211,11 @@ cordString (Cord bs) = unpack $ T.strip $ decodeUtf8 bs
--------------------------------------------------------------------------------
requestSnapshot :: Serf -> SerfState -> IO ()
requestSnapshot serf SerfState{..} = sendOrder serf (OSave $ ssNextEv - 1)
snapshot :: Serf -> SerfState -> IO ()
snapshot serf SerfState{..} = sendOrder serf (OSave $ ssNextEv - 1)
requestShutdown :: Serf -> Word8 -> IO ()
requestShutdown serf code = sendOrder serf (OExit code)
shutdownAndWait :: Serf -> Word8 -> IO ExitCode
shutdownAndWait serf code = do
requestShutdown serf code
waitForExit serf
shutdown :: Serf -> Word8 -> IO ()
shutdown serf code = sendOrder serf (OExit code)
{-
TODO Find a cleaner way to handle `PStdr` Pleas.
@ -272,7 +278,7 @@ sendWork w job =
--------------------------------------------------------------------------------
doJob :: Serf -> Job -> IO (Job, SerfState, Fx)
doJob :: Serf -> Job -> IO (Job, SerfState, FX)
doJob serf job = do
sendWork serf job >>= \case
Left replaced -> doJob serf replaced
@ -287,7 +293,7 @@ bootJob serf job = do
replayJob :: Serf -> Job -> IO SerfState
replayJob serf job = do
sendWork serf job >>= \case
Left replace -> throwIO (ReplacedEventDuringReplay (jobId job) replace)
Left replace -> throwIO (ReplacedEventDuringReplay (jobId job) replace)
Right (ss, _) -> pure ss
@ -319,7 +325,6 @@ bootFromSeq serf (BootSeq ident nocks ovums) = do
where
muckNock nok eId mug _ = RunNok $ LifeCyc eId mug nok
muckOvum ov eId mug wen = DoWork $ Work eId mug wen ov
{-
The ship is booted, but it is behind. shove events to the worker
until it is caught up.
@ -337,7 +342,7 @@ replay serf log = do
.| toJobs (Log.identity log) (ssNextEv ss)
.| replayJobs serf ss
toJobs :: LogIdentity -> EventId -> ConduitT Atom Job IO ()
toJobs :: LogIdentity -> EventId -> ConduitT ByteString Job IO ()
toJobs ident eId =
await >>= \case
Nothing -> traceM "no more jobs" >> pure ()
@ -345,27 +350,51 @@ toJobs ident eId =
traceM (show eId)
toJobs ident (eId+1)
where
isNock = eId > fromIntegral (lifecycleLen ident)
isNock = trace (show (eId, lifecycleLen ident))
$ eId <= fromIntegral (lifecycleLen ident)
fromAtom :: Atom -> IO Job
fromAtom at | isNock = do
noun <- cueExn at
fromAtom :: ByteString -> IO Job
fromAtom bs | isNock = do
noun <- cueBSExn bs
(mug, nok) <- fromNounExn noun
pure $ RunNok (LifeCyc eId mug nok)
fromAtom at | isNock = do
noun <- cueExn at
fromAtom bs = do
noun <- cueBSExn bs
(mug, wen, ovm) <- fromNounExn noun
pure $ DoWork (Work eId mug wen ovm)
-- Compute Thread --------------------------------------------------------------
-- Collect Effects for Parsing -------------------------------------------------
startComputeThread :: Serf -> STM RawOvum -> (EventId, Mug) -> IO (Async ())
startComputeThread w getEvent (evendId, mug) = async $ forever $ do
ovum <- atomically $ getEvent
collectFX :: Serf -> Log.EventLog -> IO ()
collectFX serf log = do
ss <- handshake serf (Log.identity log)
currentDate <- Time.now
let pax = "/home/benjamin/testnet-zod-fx"
let _mat = jam (undefined (mug, currentDate, ovum))
createDirectoryIfMissing True pax
runConduit $ Log.streamEvents log (ssNextEv ss)
.| toJobs (Log.identity log) (ssNextEv ss)
.| doCollectFX serf ss
.| persistFX pax
persistFX :: FilePath -> ConduitT (EventId, FX) Void IO ()
persistFX pax = await >>= \case
Nothing -> pure ()
Just (eId, fx) -> do
writeFile (pax <> "/" <> show eId) (jamBS $ toNoun fx)
persistFX pax
doCollectFX :: Serf -> SerfState -> ConduitT Job (EventId, FX) IO ()
doCollectFX serf = go
where
go :: SerfState -> ConduitT Job (EventId, FX) IO ()
go ss = await >>= \case
Nothing -> pure ()
Just jb -> do
(_, ss, fx) <- liftIO (doJob serf jb)
liftIO $ print (jobId jb)
yield (jobId jb, fx)
go ss
undefined

View File

@ -36,10 +36,10 @@ assertEqual x y = do
-- Database Operations ---------------------------------------------------------
data Db = Db LogIdentity [Atom]
data Db = Db LogIdentity [ByteString]
deriving (Eq, Ord, Show)
addEvents :: Db -> [Atom] -> Db
addEvents :: Db -> [ByteString] -> Db
addEvents (Db id evs) new = Db id (evs <> new)
readDb :: EventLog -> IO Db
@ -89,7 +89,7 @@ tryReadDatabase = forAll arbitrary (ioProperty . runTest)
tryAppend :: Property
tryAppend = forAll arbitrary (ioProperty . runTest)
where
runTest :: ([Atom], Db) -> IO Bool
runTest :: ([ByteString], Db) -> IO Bool
runTest (extra, db) = do
runInBoundThread $
withTestDir $ \dir -> do
@ -102,16 +102,13 @@ tryAppend = forAll arbitrary (ioProperty . runTest)
readDb log >>= assertEqual db'
pure True
readAtom :: FilePath -> IO Atom
readAtom path = view (from atomBytes) <$> readFile path
tryAppendHuge :: Property
tryAppendHuge = forAll arbitrary (ioProperty . runTest)
where
runTest :: ([Atom], Db) -> IO Bool
runTest :: ([ByteString], Db) -> IO Bool
runTest (extra, db) = do
runInBoundThread $ do
extra <- do b <- readAtom "/home/benjamin/r/urbit/bin/brass.pill"
extra <- do b <- readFile "/home/benjamin/r/urbit/bin/brass.pill"
pure (extra <> [b] <> extra)
withTestDir $ \dir -> do
db' <- pure (addEvents db extra)
@ -147,8 +144,8 @@ tests =
arb :: Arbitrary a => Gen a
arb = arbitrary
instance Arbitrary Natural where
arbitrary = fromInteger . abs <$> arbitrary
instance Arbitrary ByteString where
arbitrary = pack <$> arbitrary
instance (Arbitrary a, Arbitrary b) => Arbitrary (LargeKey a b) where
arbitrary = LargeKey <$> arb <*> arb

View File

@ -19,6 +19,13 @@ import Urbit.Time (Wen)
import qualified Vere.Log as Log
import qualified Vere.Ovum as Ovum
import qualified Vere.Pier as Pier
import qualified Vere.Serf as Serf
--------------------------------------------------------------------------------
zod :: Ship
zod = 0
--------------------------------------------------------------------------------
@ -28,27 +35,34 @@ removeFileIfExists pax = do
when exists $ do
removeFile pax
catchAny :: IO a -> (SomeException -> IO a) -> IO a
catchAny = Control.Exception.catch
--------------------------------------------------------------------------------
wipeSnapshot :: FilePath -> IO ()
wipeSnapshot shipPath = do
removeFileIfExists (shipPath <> ".urb/chk/north.bin")
removeFileIfExists (shipPath <> ".urb/chk/south.bin")
removeFileIfExists (shipPath <> "/.urb/chk/north.bin")
removeFileIfExists (shipPath <> "/.urb/chk/south.bin")
print (shipPath <> "/.urb/chk/north.bin")
print (shipPath <> "/.urb/chk/south.bin")
tryBootFromPill :: FilePath -> FilePath -> Ship -> IO ()
tryBootFromPill pillPath shipPath ship = do
wipeSnapshot shipPath
Pier.boot pillPath shipPath ship $ \s l ss -> do
with (Pier.booted pillPath shipPath ship) $ \(serf, log, ss) -> do
print "lul"
print ss
threadDelay 500000
shutdownAndWait s 0 >>= print
shutdown serf 0 >>= print
putStrLn "Booted!"
tryResume :: FilePath -> IO ()
tryResume shipPath = do
Pier.resume shipPath $ \s l ss -> do
with (Pier.resumed shipPath) $ \(serf, log, ss) -> do
print ss
threadDelay 500000
shutdownAndWait s 0 >>= print
shutdown serf 0 >>= print
putStrLn "Resumed!"
tryFullReplay :: FilePath -> IO ()
@ -56,11 +70,7 @@ tryFullReplay shipPath = do
wipeSnapshot shipPath
tryResume shipPath
zod :: Ship
zod = 0
catchAny :: IO a -> (SomeException -> IO a) -> IO a
catchAny = Control.Exception.catch
--------------------------------------------------------------------------------
tryParseEvents :: FilePath -> EventId -> IO ()
tryParseEvents dir first = do
@ -73,12 +83,13 @@ tryParseEvents dir first = do
paths <- sort . ordNub <$> readIORef vPax
for_ paths print
where
showEvents :: IORef [Path] -> EventId -> EventId -> ConduitT Atom Void IO ()
showEvents :: IORef [Path] -> EventId -> EventId
-> ConduitT ByteString Void IO ()
showEvents vPax eId cycle = await >>= \case
Nothing -> print "Done!"
Just at -> do
Just bs -> do
-- print ("got event", eId)
n <- liftIO $ cueExn at
n <- liftIO $ cueBSExn bs
-- print ("done cue", eId)
when (eId <= cycle) $ do
putStrLn ("lifecycle nock: " <> tshow eId)
@ -105,14 +116,32 @@ tryParseEvents dir first = do
unpackJob :: Noun -> IO (Mug, Wen, Noun)
unpackJob n = fromNounExn n
--------------------------------------------------------------------------------
collectedFX :: FilePath -> Acquire ()
collectedFX top = do
log <- Log.existing (top <> "/.urb/log")
serf <- Serf.run top
liftIO (Serf.collectFX serf log)
collectAllFx :: FilePath -> IO ()
collectAllFx top = do
wipeSnapshot top
with (collectedFX top) $ \() ->
putStrLn "Done collecting effects!"
--------------------------------------------------------------------------------
main :: IO ()
main = do
let pillPath = "/home/benjamin/r/urbit/bin/brass.pill"
shipPath = "/home/benjamin/r/urbit/zod/"
ship = zod
tryParseEvents "/home/benjamin/r/urbit/zod/.urb/log" 1
tryParseEvents "/home/benjamin/r/urbit/testnet-zod/.urb/log" 1
collectAllFx "/home/benjamin/r/urbit/testnet-zod"
-- tryParseEvents "/home/benjamin/r/urbit/zod/.urb/log" 1
-- tryParseEvents "/home/benjamin/r/urbit/testnet-zod/.urb/log" 1
-- tryBootFromPill pillPath shipPath ship
-- tryResume shipPath
@ -152,7 +181,7 @@ tryCopyLog = do
})
$ \log2 -> do
let writs = zip [1..] events <&> \(id, a) ->
Writ id Nothing (Jam a) []
(Writ id Nothing a, [])
print "About to write"