mirror of
https://github.com/ilyakooo0/urbit.git
synced 2024-12-15 18:12:47 +03:00
Merge remote-tracking branch 'origin/bs/uterm' into eg/uterm
This commit is contained in:
commit
25450d5ac8
@ -49,6 +49,9 @@ data Bug
|
||||
| CollectAllFX
|
||||
{ bPierPath :: FilePath
|
||||
}
|
||||
| EventBrowser
|
||||
{ bPierPath :: FilePath
|
||||
}
|
||||
| ValidateEvents
|
||||
{ bPierPath :: FilePath
|
||||
, bFirstEvt :: Word64
|
||||
@ -117,10 +120,10 @@ new = do
|
||||
$ metavar "SHIP"
|
||||
<> help "Ship address"
|
||||
|
||||
nPierPath <- argument auto
|
||||
nPierPath <- optional
|
||||
$ strArgument
|
||||
$ metavar "PIER"
|
||||
<> help "Path to pier"
|
||||
<> value Nothing
|
||||
|
||||
nPillPath <- strOption
|
||||
$ short 'B'
|
||||
@ -240,6 +243,9 @@ checkEvs = ValidateEvents <$> pierPath <*> firstEv <*> lastEv
|
||||
checkFx :: Parser Bug
|
||||
checkFx = ValidateFX <$> pierPath <*> firstEv <*> lastEv
|
||||
|
||||
browseEvs :: Parser Bug
|
||||
browseEvs = EventBrowser <$> pierPath
|
||||
|
||||
bugCmd :: Parser Cmd
|
||||
bugCmd = fmap CmdBug
|
||||
$ subparser
|
||||
@ -255,6 +261,10 @@ bugCmd = fmap CmdBug
|
||||
( info (checkEvs <**> helper)
|
||||
$ progDesc "Parse all data in event log"
|
||||
)
|
||||
<> command "event-browser"
|
||||
( info (browseEvs <**> helper)
|
||||
$ progDesc "Interactively view (and prune) event log"
|
||||
)
|
||||
<> command "validate-effects"
|
||||
( info (checkFx <**> helper)
|
||||
$ progDesc "Parse all data in event log"
|
||||
|
@ -1,8 +1,6 @@
|
||||
{-
|
||||
# Booting a Ship
|
||||
|
||||
- TODO Correctly setup the Pier directory.
|
||||
- TODO Hook up CLI command.
|
||||
- TODO Don't just boot, also run the ship (unless `-x` is set).
|
||||
- TODO Figure out why ships booted by us don't work.
|
||||
|
||||
@ -40,38 +38,17 @@
|
||||
to the event log.
|
||||
|
||||
|
||||
# Proper Logging
|
||||
|
||||
- TODO Consider using RIO's logging infrastructure.
|
||||
- TODO If that's too heavy, figure out what the best way to do
|
||||
logging is now.
|
||||
- TODO Convert all existing logging to the chosen logging system.
|
||||
- TODO Add more logging to all the places. Logging is super useful.
|
||||
|
||||
|
||||
# Implement subcommands to test event and effect parsing.
|
||||
|
||||
- `king * --collect-fx`: All effects that come from the serf get
|
||||
written into the `effects` LMDB database.
|
||||
|
||||
- `king parse-events PIER`: Run through the event log, and parse all
|
||||
events, print failures.
|
||||
|
||||
- `king parse-effects PIER`: Run through the event log, and parse all
|
||||
effects, print any failures.
|
||||
|
||||
- `king clear-fx PIER`: Deletes all collected effects.
|
||||
|
||||
- `king full-replay PIER`: Replays the whole event log events, print
|
||||
any failures. On success, replace the snapshot.
|
||||
|
||||
|
||||
# Validate Pill Files
|
||||
|
||||
- `king validate-pill PILL`: Parse a pill file. Print an error on
|
||||
exit, and print a description of the pill on success.
|
||||
|
||||
|
||||
# Full Replay -- An Integration Test
|
||||
|
||||
- Copy the event log:
|
||||
@ -132,6 +109,7 @@ import qualified Data.Set as Set
|
||||
import qualified Vere.Log as Log
|
||||
import qualified Vere.Pier as Pier
|
||||
import qualified Vere.Serf as Serf
|
||||
import qualified EventBrowser
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
@ -171,19 +149,6 @@ runApp inner = do
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
example :: IO ()
|
||||
example = runApp sayHello
|
||||
|
||||
sayHello :: RIO App ()
|
||||
sayHello = do
|
||||
name <- view appName
|
||||
logDebug $ "Hello, " <> name
|
||||
logInfo $ "Hello, " <> name
|
||||
logWarn $ "Hello, " <> name
|
||||
logError $ "Hello, " <> name
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
zod :: Ship
|
||||
zod = 0
|
||||
|
||||
@ -380,11 +345,19 @@ newShip CLI.New{..} _ = do
|
||||
runShip :: HasLogFunc e => CLI.Run -> CLI.Opts -> RIO e ()
|
||||
runShip (CLI.Run pierPath) _ = tryPlayShip pierPath
|
||||
|
||||
startBrowser :: HasLogFunc e => FilePath -> RIO e ()
|
||||
startBrowser pierPath =
|
||||
rwith (Log.existing logPath) $ \log ->
|
||||
EventBrowser.run log
|
||||
where
|
||||
logPath = pierPath <> "/.urb/log"
|
||||
|
||||
main :: IO ()
|
||||
main = CLI.parseArgs >>= runApp . \case
|
||||
CLI.CmdRun r o -> runShip r o
|
||||
CLI.CmdNew n o -> newShip n o
|
||||
CLI.CmdBug (CLI.CollectAllFX pax) -> collectAllFx pax
|
||||
CLI.CmdBug (CLI.EventBrowser pax) -> startBrowser pax
|
||||
CLI.CmdBug (CLI.ValidatePill pax pil seq) -> testPill pax pil seq
|
||||
CLI.CmdBug (CLI.ValidateEvents pax f l) -> checkEvs pax f l
|
||||
CLI.CmdBug (CLI.ValidateFX pax f l) -> checkFx pax f l
|
||||
|
@ -128,6 +128,7 @@ data HttpServerEv
|
||||
| HttpServerEvRequestLocal (ServId, UD, UD, ()) HttpServerReq
|
||||
| HttpServerEvLive (ServId, ()) Port (Maybe Port)
|
||||
| HttpServerEvBorn (KingId, ()) ()
|
||||
| HttpServerEvCrud Path Cord Tang
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
deriveNoun ''Address
|
||||
@ -155,6 +156,7 @@ data ArvoEv
|
||||
= ArvoEvWhom () Ship
|
||||
| ArvoEvWack () Word512
|
||||
| ArvoEvWarn Path Noun
|
||||
| ArvoEvCrud Path Cord Tang
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
deriveNoun ''ArvoEv
|
||||
@ -163,8 +165,8 @@ deriveNoun ''ArvoEv
|
||||
-- Boat Events -----------------------------------------------------------------
|
||||
|
||||
data BoatEv
|
||||
= BoatEvBoat () ()
|
||||
| BoatEvVoid Void
|
||||
= BoatEvBoat () ()
|
||||
| BoatEvCrud Path Cord Tang
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
deriveNoun ''BoatEv
|
||||
@ -173,8 +175,9 @@ deriveNoun ''BoatEv
|
||||
-- Timer Events ----------------------------------------------------------------
|
||||
|
||||
data BehnEv
|
||||
= BehnEvWake () ()
|
||||
= BehnEvWake () ()
|
||||
| BehnEvBorn (KingId, ()) ()
|
||||
| BehnEvCrud Path Cord Tang
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
deriveNoun ''BehnEv
|
||||
@ -184,7 +187,7 @@ deriveNoun ''BehnEv
|
||||
|
||||
data NewtEv
|
||||
= NewtEvBarn (Atom, ()) ()
|
||||
| NewtEvBorn Void
|
||||
| NewtEvCrud Path Cord Tang
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
deriveNoun ''NewtEv
|
||||
@ -225,7 +228,7 @@ data TermEv
|
||||
| TermEvBlew (UD, ()) Word Word
|
||||
| TermEvBoot (UD, ()) LegacyBootEvent
|
||||
| TermEvHail (UD, ()) ()
|
||||
| TermEvBorn Void
|
||||
| TermEvCrud Path Cord Tang
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
deriveNoun ''LegacyBootEvent
|
||||
|
191
pkg/king/lib/EventBrowser.hs
Normal file
191
pkg/king/lib/EventBrowser.hs
Normal file
@ -0,0 +1,191 @@
|
||||
{-
|
||||
TODO Handle CTRL-D
|
||||
-}
|
||||
|
||||
module EventBrowser (run) where
|
||||
|
||||
import UrbitPrelude
|
||||
|
||||
import Arvo
|
||||
import Data.Conduit
|
||||
import Urbit.Time
|
||||
import Vere.Pier.Types
|
||||
|
||||
import Control.Monad.Trans.Maybe (MaybeT(..))
|
||||
|
||||
import Vere.Log (EventLog)
|
||||
|
||||
import qualified Data.Conduit.Combinators as C
|
||||
import qualified Vere.Log as Log
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
data Event = Event
|
||||
{ num :: Word64
|
||||
, mug :: Mug
|
||||
, wen :: Wen
|
||||
, ova :: Ev
|
||||
}
|
||||
deriving Show
|
||||
|
||||
data Input = Next | Prev | Quit | Trim | Effs | Init | Last
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
run :: HasLogFunc e => EventLog -> RIO e ()
|
||||
run log = do
|
||||
hSetBuffering stdin NoBuffering
|
||||
hSetEcho stdin False
|
||||
logInfo $ displayShow (Log.identity log)
|
||||
let cycle = fromIntegral $ lifecycleLen $ Log.identity log
|
||||
las <- Log.lastEv log
|
||||
loop cycle las las
|
||||
where
|
||||
failRead cur =
|
||||
putStrLn ("ERROR: Failed to read event: " <> tshow cur)
|
||||
|
||||
input cyc las cur mFx = do
|
||||
getInput las cur >>= \case
|
||||
Next -> loop cyc las (succ cur)
|
||||
Prev -> loop cyc las (pred cur)
|
||||
Init -> loop cyc las 1
|
||||
Last -> loop cyc las las
|
||||
Quit -> pure ()
|
||||
Trim -> trim cyc las cur mFx
|
||||
Effs -> showEffects mFx >> input cyc las cur mFx
|
||||
|
||||
trim cyc las cur mFx = do
|
||||
deleteFrom log las cur >>= \case
|
||||
True -> loop cyc (pred cur) (pred cur)
|
||||
False -> input cyc las cur mFx
|
||||
|
||||
loop cyc las 0 = loop cyc las 1
|
||||
loop cyc las cur | cur > las = loop cyc las las
|
||||
loop cyc las cur | cyc >= cur = do
|
||||
putStrLn ""
|
||||
putStrLn " [EVENT]"
|
||||
putStrLn ""
|
||||
putStrLn " Lifecycle Nock"
|
||||
putStrLn ""
|
||||
input cyc las cur (Just [])
|
||||
|
||||
loop cyc las cur = do
|
||||
mEv <- peekEvent log cur
|
||||
mFx <- peekEffect log cur
|
||||
|
||||
case mEv of
|
||||
Nothing -> failRead cur
|
||||
Just ev -> showEvent ev >> showEffectsTeaser mFx
|
||||
|
||||
input cyc las cur mFx
|
||||
|
||||
deleteFrom :: HasLogFunc e => EventLog -> Word64 -> Word64 -> RIO e Bool
|
||||
deleteFrom log las cur = do
|
||||
sure <- areYouSure
|
||||
if sure then doDelete else abortDelete
|
||||
pure sure
|
||||
where
|
||||
abortDelete = do
|
||||
putStrLn "\n\n [ABORTED]\n"
|
||||
putStrLn " Aborted delete, no events pruned.\n"
|
||||
|
||||
doDelete = do
|
||||
Log.trimEvents log cur
|
||||
putStrLn "\n\n [DELETED]\n"
|
||||
putStrLn " It's gone forever!\n"
|
||||
|
||||
question =
|
||||
if las == cur
|
||||
then mconcat [ " This will permanently delete the last event (#"
|
||||
, tshow las
|
||||
, ")\n" ]
|
||||
else mconcat [ " This will permanently delete all events in (#"
|
||||
, tshow cur
|
||||
, " - #"
|
||||
, tshow las
|
||||
, ")\n" ]
|
||||
|
||||
areYouSure = do
|
||||
putStrLn "\n\n ARE YOU SURE????"
|
||||
putStrLn ""
|
||||
putStrLn question
|
||||
putStr "(y|n) "
|
||||
hFlush stdout
|
||||
getChar <&> \case
|
||||
'y' -> True
|
||||
_ -> False
|
||||
|
||||
getInput :: Word64 -> Word64 -> RIO e Input
|
||||
getInput las cur = do
|
||||
putStr ("(" <> tshow cur <> "/" <> tshow las <> ") ")
|
||||
hFlush stdout
|
||||
getChar >>= \case
|
||||
'j' -> pure Next
|
||||
'k' -> pure Prev
|
||||
'q' -> pure Quit
|
||||
'f' -> pure Effs
|
||||
'x' -> pure Trim
|
||||
'0' -> pure Init
|
||||
'G' -> pure Last
|
||||
_ -> do putStrLn "\n"
|
||||
putStrLn help
|
||||
getInput las cur
|
||||
where
|
||||
help = unlines
|
||||
[ " [HELP]"
|
||||
, ""
|
||||
, " k View the previous event"
|
||||
, " j View the next event"
|
||||
, " 0 View the first event"
|
||||
, " G View the last event"
|
||||
, " q Quit"
|
||||
, " x Delete (only the last event)"
|
||||
, " ? Show this help"
|
||||
]
|
||||
|
||||
showEffectsTeaser :: Maybe FX -> RIO e ()
|
||||
showEffectsTeaser Nothing = putStrLn " [No collected effects]\n"
|
||||
showEffectsTeaser (Just []) = putStrLn " [No effects for this event]\n"
|
||||
showEffectsTeaser (Just fx) = putStrLn $ mconcat
|
||||
[ " ["
|
||||
, tshow (length fx)
|
||||
, " collected effects. Press 'f' to view]\n"
|
||||
]
|
||||
|
||||
showEffects :: Maybe FX -> RIO e ()
|
||||
showEffects Nothing = putStrLn " [No collected effects]\n"
|
||||
showEffects (Just []) = putStrLn " [No effects for this event]\n"
|
||||
showEffects (Just fx) = do
|
||||
putStrLn "\n"
|
||||
putStrLn " [EFFECTS]"
|
||||
for_ fx $ \ef -> do
|
||||
putStrLn ""
|
||||
showEffect ef
|
||||
putStrLn ""
|
||||
|
||||
showEffect :: Lenient Ef -> RIO e ()
|
||||
showEffect (GoodParse ef) =
|
||||
putStrLn $ unlines $ fmap (" " <>) $ lines $ pack $ ppShow ef
|
||||
showEffect (FailParse n) =
|
||||
putStrLn $ unlines $ fmap (" " <>) $ lines $ pack $ ppShow n
|
||||
|
||||
showEvent :: Event -> RIO e ()
|
||||
showEvent ev = do
|
||||
putStrLn "\n"
|
||||
putStrLn " [EVENT]"
|
||||
putStrLn ""
|
||||
putStrLn $ unlines $ fmap (" " <>) $ lines $ pack $ ppShow (ova ev)
|
||||
|
||||
peekEffect :: HasLogFunc e => EventLog -> Word64 -> RIO e (Maybe FX)
|
||||
peekEffect log eId = runMaybeT $ do
|
||||
(id, bs) <- MaybeT $ runConduit (Log.streamEffectsRows log eId .| C.head)
|
||||
guard (id == eId)
|
||||
io $ cueBSExn bs >>= fromNounExn
|
||||
|
||||
peekEvent :: HasLogFunc e => EventLog -> Word64 -> RIO e (Maybe Event)
|
||||
peekEvent log eId = runMaybeT $ do
|
||||
octs <- MaybeT $ runConduit (Log.streamEvents log eId .| C.head)
|
||||
noun <- io $ cueBSExn octs
|
||||
(m,w,e) <- io $ fromNounExn noun
|
||||
ovum <- fromNounExn e
|
||||
pure (Event eId m w ovum)
|
325
pkg/king/lib/Vere/LMDB.hs
Normal file
325
pkg/king/lib/Vere/LMDB.hs
Normal file
@ -0,0 +1,325 @@
|
||||
module Vere.LMDB where
|
||||
|
||||
import UrbitPrelude hiding (init)
|
||||
|
||||
import Data.RAcquire
|
||||
-- import Data.Conduit
|
||||
import Database.LMDB.Raw
|
||||
import Foreign.Marshal.Alloc
|
||||
import Foreign.Ptr
|
||||
import Vere.Pier.Types
|
||||
|
||||
import Foreign.Storable (peek, poke, sizeOf)
|
||||
|
||||
import qualified Data.ByteString.Unsafe as BU
|
||||
-- import qualified Data.Vector as V
|
||||
|
||||
|
||||
-- Types -----------------------------------------------------------------------
|
||||
|
||||
type Env = MDB_env
|
||||
type Val = MDB_val
|
||||
type Txn = MDB_txn
|
||||
type Dbi = MDB_dbi
|
||||
type Cur = MDB_cursor
|
||||
|
||||
data VereLMDBExn
|
||||
= NoLogIdentity
|
||||
| MissingEvent EventId
|
||||
| BadNounInLogIdentity ByteString DecodeErr ByteString
|
||||
| BadKeyInEventLog
|
||||
| BadWriteLogIdentity LogIdentity
|
||||
| BadWriteEvent EventId
|
||||
| BadWriteEffect EventId
|
||||
deriving Show
|
||||
|
||||
instance Exception VereLMDBExn where
|
||||
|
||||
|
||||
-- Transactions ----------------------------------------------------------------
|
||||
|
||||
{-
|
||||
A read-only transaction that commits at the end.
|
||||
|
||||
Use this when opening database handles.
|
||||
-}
|
||||
openTxn :: Env -> RAcquire e Txn
|
||||
openTxn env = mkRAcquire begin commit
|
||||
where
|
||||
begin = io $ mdb_txn_begin env Nothing True
|
||||
commit = io . mdb_txn_commit
|
||||
|
||||
{-
|
||||
A read-only transaction that aborts at the end.
|
||||
|
||||
Use this when reading data from already-opened databases.
|
||||
-}
|
||||
readTxn :: Env -> RAcquire e Txn
|
||||
readTxn env = mkRAcquire begin abort
|
||||
where
|
||||
begin = io $ mdb_txn_begin env Nothing True
|
||||
abort = io . mdb_txn_abort
|
||||
|
||||
{-
|
||||
A read-write transaction that commits upon sucessful completion and
|
||||
aborts on exception.
|
||||
|
||||
Use this when reading data from already-opened databases.
|
||||
-}
|
||||
writeTxn :: Env -> RAcquire e Txn
|
||||
writeTxn env = mkRAcquireType begin finalize
|
||||
where
|
||||
begin = io $ mdb_txn_begin env Nothing False
|
||||
finalize txn = io . \case
|
||||
ReleaseNormal -> mdb_txn_commit txn
|
||||
ReleaseEarly -> mdb_txn_commit txn
|
||||
ReleaseException -> mdb_txn_abort txn
|
||||
|
||||
|
||||
-- Cursors ---------------------------------------------------------------------
|
||||
|
||||
cursor :: Txn -> Dbi -> RAcquire e Cur
|
||||
cursor txn dbi = mkRAcquire open close
|
||||
where
|
||||
open = io $ mdb_cursor_open txn dbi
|
||||
close = io . mdb_cursor_close
|
||||
|
||||
|
||||
-- Last Key In Dbi -------------------------------------------------------------
|
||||
|
||||
lastKeyWord64 :: Env -> Dbi -> Txn -> RIO e Word64
|
||||
lastKeyWord64 env dbi txn =
|
||||
rwith (cursor txn dbi) $ \cur ->
|
||||
withKVPtrs' nullVal nullVal $ \pKey pVal ->
|
||||
io $ mdb_cursor_get MDB_LAST cur pKey pVal >>= \case
|
||||
False -> pure 0
|
||||
True -> peek pKey >>= mdbValToWord64
|
||||
|
||||
|
||||
-- Delete Rows -----------------------------------------------------------------
|
||||
|
||||
deleteAllRows :: Env -> Dbi -> RIO e ()
|
||||
deleteAllRows env dbi =
|
||||
rwith (writeTxn env) $ \txn ->
|
||||
rwith (cursor txn dbi) $ \cur ->
|
||||
withKVPtrs' nullVal nullVal $ \pKey pVal -> do
|
||||
let loop = io (mdb_cursor_get MDB_LAST cur pKey pVal) >>= \case
|
||||
False -> pure ()
|
||||
True -> do io $ mdb_cursor_del (compileWriteFlags []) cur
|
||||
loop
|
||||
loop
|
||||
|
||||
deleteRowsFrom :: HasLogFunc e => Env -> Dbi -> Word64 -> RIO e ()
|
||||
deleteRowsFrom env dbi start = do
|
||||
rwith (writeTxn env) $ \txn -> do
|
||||
last <- lastKeyWord64 env dbi txn
|
||||
for_ [start..last] $ \eId -> do
|
||||
withWordPtr eId $ \pKey -> do
|
||||
let key = MDB_val 8 (castPtr pKey)
|
||||
found <- io $ mdb_del txn dbi key Nothing
|
||||
unless found $
|
||||
throwIO (MissingEvent eId)
|
||||
|
||||
|
||||
-- Append Rows to Sequence -----------------------------------------------------
|
||||
|
||||
{-
|
||||
appendToSequence :: Env -> Dbi -> Vector ByteString -> RIO e ()
|
||||
appendToSequence env dbi events = do
|
||||
numEvs <- readIORef (numEvents log)
|
||||
next <- pure (numEvs + 1)
|
||||
doAppend $ zip [next..] $ toList events
|
||||
writeIORef (numEvents log) (numEvs + word (length events))
|
||||
where
|
||||
flags = compileWriteFlags [MDB_NOOVERWRITE]
|
||||
doAppend = \kvs ->
|
||||
rwith (writeTxn env) $ \txn ->
|
||||
for_ kvs $ \(k,v) -> do
|
||||
putBytes flags txn dbi k v >>= \case
|
||||
True -> pure ()
|
||||
False -> throwIO (BadWriteEvent k)
|
||||
-}
|
||||
|
||||
|
||||
-- Insert ----------------------------------------------------------------------
|
||||
|
||||
insertWord64 :: Env -> Dbi -> Word64 -> ByteString -> RIO e ()
|
||||
insertWord64 env dbi k v = do
|
||||
rwith (writeTxn env) $ \txn ->
|
||||
putBytes flags txn dbi k v >>= \case
|
||||
True -> pure ()
|
||||
False -> throwIO (BadWriteEffect k)
|
||||
where
|
||||
flags = compileWriteFlags []
|
||||
|
||||
|
||||
{-
|
||||
--------------------------------------------------------------------------------
|
||||
-- Read Events -----------------------------------------------------------------
|
||||
|
||||
streamEvents :: HasLogFunc e
|
||||
=> EventLog -> Word64
|
||||
-> ConduitT () ByteString (RIO e) ()
|
||||
streamEvents log first = do
|
||||
last <- lift $ lastEv log
|
||||
batch <- lift $ readBatch log first
|
||||
unless (null batch) $ do
|
||||
for_ batch yield
|
||||
streamEvents log (first + word (length batch))
|
||||
|
||||
streamEffectsRows :: ∀e. HasLogFunc e
|
||||
=> EventLog -> EventId
|
||||
-> ConduitT () (Word64, ByteString) (RIO e) ()
|
||||
streamEffectsRows log = go
|
||||
where
|
||||
go :: EventId -> ConduitT () (Word64, ByteString) (RIO e) ()
|
||||
go next = do
|
||||
batch <- lift $ readRowsBatch (env log) (effectsTbl log) next
|
||||
unless (null batch) $ do
|
||||
for_ batch yield
|
||||
go (next + fromIntegral (length batch))
|
||||
|
||||
{-
|
||||
Read 1000 rows from the events table, starting from event `first`.
|
||||
|
||||
Throws `MissingEvent` if an event was missing from the log.
|
||||
-}
|
||||
readBatch :: EventLog -> Word64 -> RIO e (V.Vector ByteString)
|
||||
readBatch log first = start
|
||||
where
|
||||
start = do
|
||||
last <- lastEv log
|
||||
if (first > last)
|
||||
then pure mempty
|
||||
else readRows $ fromIntegral $ min 1000 $ ((last+1) - first)
|
||||
|
||||
assertFound :: EventId -> Bool -> RIO e ()
|
||||
assertFound id found = do
|
||||
unless found $ throwIO $ MissingEvent id
|
||||
|
||||
readRows count =
|
||||
withWordPtr first $ \pIdx ->
|
||||
withKVPtrs' (MDB_val 8 (castPtr pIdx)) nullVal $ \pKey pVal ->
|
||||
rwith (readTxn $ env log) $ \txn ->
|
||||
rwith (cursor txn $ eventsTbl log) $ \cur -> do
|
||||
assertFound first =<< io (mdb_cursor_get MDB_SET_KEY cur pKey pVal)
|
||||
fetchRows count cur pKey pVal
|
||||
|
||||
fetchRows count cur pKey pVal = do
|
||||
env <- ask
|
||||
V.generateM count $ \i -> runRIO env $ do
|
||||
key <- io $ peek pKey >>= mdbValToWord64
|
||||
val <- io $ peek pVal >>= mdbValToBytes
|
||||
idx <- pure (first + word i)
|
||||
unless (key == idx) $ throwIO $ MissingEvent idx
|
||||
when (count /= succ i) $ do
|
||||
assertFound idx =<< io (mdb_cursor_get MDB_NEXT cur pKey pVal)
|
||||
pure val
|
||||
|
||||
{-
|
||||
Read 1000 rows from the database, starting from key `first`.
|
||||
-}
|
||||
readRowsBatch :: ∀e. HasLogFunc e
|
||||
=> Env -> Dbi -> Word64 -> RIO e (V.Vector (Word64, ByteString))
|
||||
readRowsBatch env dbi first = readRows
|
||||
where
|
||||
readRows = do
|
||||
logDebug $ display ("(readRowsBatch) From: " <> tshow first)
|
||||
withWordPtr first $ \pIdx ->
|
||||
withKVPtrs' (MDB_val 8 (castPtr pIdx)) nullVal $ \pKey pVal ->
|
||||
rwith (readTxn env) $ \txn ->
|
||||
rwith (cursor txn dbi) $ \cur ->
|
||||
io (mdb_cursor_get MDB_SET_RANGE cur pKey pVal) >>= \case
|
||||
False -> pure mempty
|
||||
True -> V.unfoldrM (fetchBatch cur pKey pVal) 1000
|
||||
|
||||
fetchBatch :: Cur -> Ptr Val -> Ptr Val -> Word
|
||||
-> RIO e (Maybe ((Word64, ByteString), Word))
|
||||
fetchBatch cur pKey pVal 0 = pure Nothing
|
||||
fetchBatch cur pKey pVal n = do
|
||||
key <- io $ peek pKey >>= mdbValToWord64
|
||||
val <- io $ peek pVal >>= mdbValToBytes
|
||||
io $ mdb_cursor_get MDB_NEXT cur pKey pVal >>= \case
|
||||
False -> pure $ Just ((key, val), 0)
|
||||
True -> pure $ Just ((key, val), pred n)
|
||||
|
||||
-}
|
||||
|
||||
-- Utils -----------------------------------------------------------------------
|
||||
|
||||
withKVPtrs' :: (MonadIO m, MonadUnliftIO m)
|
||||
=> Val -> Val -> (Ptr Val -> Ptr Val -> m a) -> m a
|
||||
withKVPtrs' k v cb =
|
||||
withRunInIO $ \run ->
|
||||
withKVPtrs k v $ \x y -> run (cb x y)
|
||||
|
||||
nullVal :: MDB_val
|
||||
nullVal = MDB_val 0 nullPtr
|
||||
|
||||
word :: Int -> Word64
|
||||
word = fromIntegral
|
||||
|
||||
assertExn :: Exception e => Bool -> e -> IO ()
|
||||
assertExn True _ = pure ()
|
||||
assertExn False e = throwIO e
|
||||
|
||||
eitherExn :: Exception e => Either a b -> (a -> e) -> IO b
|
||||
eitherExn eat exn = either (throwIO . exn) pure eat
|
||||
|
||||
byteStringAsMdbVal :: ByteString -> (MDB_val -> IO a) -> IO a
|
||||
byteStringAsMdbVal bs k =
|
||||
BU.unsafeUseAsCStringLen bs $ \(ptr,sz) ->
|
||||
k (MDB_val (fromIntegral sz) (castPtr ptr))
|
||||
|
||||
mdbValToWord64 :: MDB_val -> IO Word64
|
||||
mdbValToWord64 (MDB_val sz ptr) = do
|
||||
assertExn (sz == 8) BadKeyInEventLog
|
||||
peek (castPtr ptr)
|
||||
|
||||
withWord64AsMDBval :: (MonadIO m, MonadUnliftIO m)
|
||||
=> Word64 -> (MDB_val -> m a) -> m a
|
||||
withWord64AsMDBval w cb = do
|
||||
withWordPtr w $ \p ->
|
||||
cb (MDB_val (fromIntegral (sizeOf w)) (castPtr p))
|
||||
|
||||
withWordPtr :: (MonadIO m, MonadUnliftIO m)
|
||||
=> Word64 -> (Ptr Word64 -> m a) -> m a
|
||||
withWordPtr w cb =
|
||||
withRunInIO $ \run ->
|
||||
allocaBytes (sizeOf w) (\p -> poke p w >> run (cb p))
|
||||
|
||||
|
||||
-- Lower-Level Operations ------------------------------------------------------
|
||||
|
||||
getMb :: MonadIO m => Txn -> Dbi -> ByteString -> m (Maybe Noun)
|
||||
getMb txn db key =
|
||||
io $
|
||||
byteStringAsMdbVal key $ \mKey ->
|
||||
mdb_get txn db mKey >>= traverse (mdbValToNoun key)
|
||||
|
||||
mdbValToBytes :: MDB_val -> IO ByteString
|
||||
mdbValToBytes (MDB_val sz ptr) = do
|
||||
BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz)
|
||||
|
||||
mdbValToNoun :: ByteString -> MDB_val -> IO Noun
|
||||
mdbValToNoun key (MDB_val sz ptr) = do
|
||||
bs <- BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz)
|
||||
let res = cueBS bs
|
||||
eitherExn res (\err -> BadNounInLogIdentity key err bs)
|
||||
|
||||
putNoun :: MonadIO m
|
||||
=> MDB_WriteFlags -> Txn -> Dbi -> ByteString -> Noun -> m Bool
|
||||
putNoun flags txn db key val =
|
||||
io $
|
||||
byteStringAsMdbVal key $ \mKey ->
|
||||
byteStringAsMdbVal (jamBS val) $ \mVal ->
|
||||
mdb_put flags txn db mKey mVal
|
||||
|
||||
|
||||
putBytes :: MonadIO m
|
||||
=> MDB_WriteFlags -> Txn -> Dbi -> Word64 -> ByteString -> m Bool
|
||||
putBytes flags txn db id bs =
|
||||
io $
|
||||
withWord64AsMDBval id $ \idVal ->
|
||||
byteStringAsMdbVal bs $ \mVal ->
|
||||
mdb_put flags txn db idVal mVal
|
@ -2,9 +2,9 @@
|
||||
TODO Effects storage logic is messy.
|
||||
-}
|
||||
|
||||
module Vere.Log ( EventLog, identity, nextEv
|
||||
module Vere.Log ( EventLog, identity, nextEv, lastEv
|
||||
, new, existing
|
||||
, streamEvents, appendEvents
|
||||
, streamEvents, appendEvents, trimEvents
|
||||
, streamEffectsRows, writeEffectsRow
|
||||
) where
|
||||
|
||||
@ -251,6 +251,18 @@ writeEffectsRow log k v = do
|
||||
--------------------------------------------------------------------------------
|
||||
-- Read Events -----------------------------------------------------------------
|
||||
|
||||
trimEvents :: HasLogFunc e => EventLog -> Word64 -> RIO e ()
|
||||
trimEvents log start = do
|
||||
last <- lastEv log
|
||||
rwith (writeTxn $ env log) $ \txn ->
|
||||
for_ [start..last] $ \eId ->
|
||||
withWordPtr eId $ \pKey -> do
|
||||
let key = MDB_val 8 (castPtr pKey)
|
||||
found <- io $ mdb_del txn (eventsTbl log) key Nothing
|
||||
unless found $
|
||||
throwIO (MissingEvent eId)
|
||||
writeIORef (numEvents log) (pred start)
|
||||
|
||||
streamEvents :: HasLogFunc e
|
||||
=> EventLog -> Word64
|
||||
-> ConduitT () ByteString (RIO e) ()
|
||||
@ -318,22 +330,21 @@ readRowsBatch :: ∀e. HasLogFunc e
|
||||
readRowsBatch env dbi first = readRows
|
||||
where
|
||||
readRows = do
|
||||
logDebug $ displayShow ("readRows", first)
|
||||
logDebug $ display ("(readRowsBatch) From: " <> tshow first)
|
||||
withWordPtr first $ \pIdx ->
|
||||
withKVPtrs' (MDB_val 8 (castPtr pIdx)) nullVal $ \pKey pVal ->
|
||||
rwith (readTxn env) $ \txn ->
|
||||
rwith (cursor txn dbi) $ \cur ->
|
||||
io (mdb_cursor_get MDB_SET_RANGE cur pKey pVal) >>= \case
|
||||
False -> pure mempty
|
||||
True -> V.unfoldrM (fetchRows cur pKey pVal) 1000
|
||||
True -> V.unfoldrM (fetchBatch cur pKey pVal) 1000
|
||||
|
||||
fetchRows :: Cur -> Ptr Val -> Ptr Val -> Word
|
||||
fetchBatch :: Cur -> Ptr Val -> Ptr Val -> Word
|
||||
-> RIO e (Maybe ((Word64, ByteString), Word))
|
||||
fetchRows cur pKey pVal 0 = pure Nothing
|
||||
fetchRows cur pKey pVal n = do
|
||||
fetchBatch cur pKey pVal 0 = pure Nothing
|
||||
fetchBatch cur pKey pVal n = do
|
||||
key <- io $ peek pKey >>= mdbValToWord64
|
||||
val <- io $ peek pVal >>= mdbValToBytes
|
||||
logDebug $ displayShow ("fetchRows", n, key, val)
|
||||
io $ mdb_cursor_get MDB_NEXT cur pKey pVal >>= \case
|
||||
False -> pure $ Just ((key, val), 0)
|
||||
True -> pure $ Just ((key, val), pred n)
|
||||
|
166
pkg/king/lib/Vere/NounServ.hs
Normal file
166
pkg/king/lib/Vere/NounServ.hs
Normal file
@ -0,0 +1,166 @@
|
||||
module Vere.NounServ
|
||||
( Conn(..)
|
||||
, Server(..)
|
||||
, Client(..)
|
||||
, wsServer
|
||||
, wsClient
|
||||
, testIt
|
||||
) where
|
||||
|
||||
import UrbitPrelude
|
||||
|
||||
import qualified Network.Wai.Handler.Warp as W
|
||||
import qualified Network.WebSockets as WS
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
data Conn i o = Conn
|
||||
{ cRecv :: STM (Maybe i)
|
||||
, cSend :: o -> STM ()
|
||||
}
|
||||
|
||||
mkConn :: TBMChan i -> TBMChan o -> Conn i o
|
||||
mkConn inp out = Conn (readTBMChan inp) (writeTBMChan out)
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
data Client i o = Client
|
||||
{ cConn :: Conn i o
|
||||
, cAsync :: Async ()
|
||||
}
|
||||
|
||||
data Server i o a = Server
|
||||
{ sAccept :: STM (Maybe (Conn i o))
|
||||
, sAsync :: Async ()
|
||||
, sData :: a
|
||||
}
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
wsConn :: (FromNoun i, ToNoun o, Show o, HasLogFunc e)
|
||||
=> Utf8Builder
|
||||
-> TBMChan i -> TBMChan o
|
||||
-> WS.Connection
|
||||
-> RIO e ()
|
||||
wsConn pre inp out wsc = do
|
||||
env <- ask
|
||||
writer <- io $ async $ runRIO env $ forever $ do
|
||||
logWarn (pre <> "(wsConn) Waiting for data.")
|
||||
byt <- io $ toStrict <$> WS.receiveData wsc
|
||||
logWarn (pre <> "Got data")
|
||||
dat <- cueBSExn byt >>= fromNounExn
|
||||
logWarn (pre <> "(wsConn) Decoded data, writing to chan")
|
||||
atomically $ writeTBMChan inp dat
|
||||
|
||||
reader <- io $ async $ runRIO env $ forever $ do
|
||||
logWarn (pre <> "Waiting for data from chan")
|
||||
atomically (readTBMChan out) >>= \case
|
||||
Nothing -> do
|
||||
logWarn (pre <> "(wsConn) Connection closed")
|
||||
error "dead-conn"
|
||||
Just msg -> do
|
||||
logWarn (pre <> "(wsConn) Got message! " <> displayShow msg)
|
||||
io $ WS.sendBinaryData wsc $ fromStrict $ jamBS $ toNoun msg
|
||||
|
||||
res <- atomically (waitCatchSTM writer <|> waitCatchSTM reader)
|
||||
|
||||
logWarn $ displayShow (res :: Either SomeException ())
|
||||
|
||||
atomically (closeTBMChan inp >> closeTBMChan out)
|
||||
|
||||
cancel writer
|
||||
cancel reader
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
wsClient :: ∀i o e. (Show o, Show i, ToNoun o, FromNoun i, HasLogFunc e)
|
||||
=> W.Port -> RIO e (Client i o)
|
||||
wsClient por = do
|
||||
env <- ask
|
||||
inp <- io $ newTBMChanIO 5
|
||||
out <- io $ newTBMChanIO 5
|
||||
con <- pure (mkConn inp out)
|
||||
|
||||
logDebug "(wsClie) Trying to connect"
|
||||
|
||||
tid <- io $ async
|
||||
$ WS.runClient "127.0.0.1" por "/"
|
||||
$ runRIO env . wsConn "(wsClie) " inp out
|
||||
|
||||
pure $ Client con tid
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
wsServer :: ∀i o e. (Show o, Show i, ToNoun o, FromNoun i, HasLogFunc e)
|
||||
=> RIO e (Server i o W.Port)
|
||||
wsServer = do
|
||||
con <- io $ newTBMChanIO 5
|
||||
|
||||
let app pen = do
|
||||
logError "(wsServer) Got connection! Accepting"
|
||||
wsc <- io $ WS.acceptRequest pen
|
||||
inp <- io $ newTBMChanIO 5
|
||||
out <- io $ newTBMChanIO 5
|
||||
atomically $ writeTBMChan con (mkConn inp out)
|
||||
wsConn "(wsServ) " inp out wsc
|
||||
|
||||
tid <- async $ do
|
||||
env <- ask
|
||||
logError "(wsServer) Starting server"
|
||||
io $ WS.runServer "127.0.0.1" 9999 (runRIO env . app)
|
||||
logError "(wsServer) Server died"
|
||||
atomically $ closeTBMChan con
|
||||
|
||||
pure $ Server (readTBMChan con) tid 9999
|
||||
|
||||
|
||||
-- Hacky Integration Test ------------------------------------------------------
|
||||
|
||||
fromJust :: MonadIO m => Text -> Maybe a -> m a
|
||||
fromJust err Nothing = error (unpack err)
|
||||
fromJust _ (Just x) = pure x
|
||||
|
||||
type Example = Maybe (Word, (), Word)
|
||||
|
||||
example :: Example
|
||||
example = Just (99, (), 44)
|
||||
|
||||
testIt :: HasLogFunc e => RIO e ()
|
||||
testIt = do
|
||||
logTrace "(testIt) Starting Server"
|
||||
Server{..} <- wsServer @Example @Example
|
||||
logTrace "(testIt) Connecting"
|
||||
Client{..} <- wsClient @Example @Example sData
|
||||
|
||||
logTrace "(testIt) Accepting connection"
|
||||
sConn <- fromJust "accept" =<< atomically sAccept
|
||||
|
||||
let
|
||||
clientSend = do
|
||||
logTrace "(testIt) Sending from client"
|
||||
atomically (cSend cConn example)
|
||||
logTrace "(testIt) Waiting for response"
|
||||
res <- atomically (cRecv sConn)
|
||||
print ("clientSend", res, example)
|
||||
unless (res == Just example) $ do
|
||||
error "Bad data"
|
||||
logInfo "(testIt) Success"
|
||||
|
||||
serverSend = do
|
||||
logTrace "(testIt) Sending from server"
|
||||
atomically (cSend sConn example)
|
||||
logTrace "(testIt) Waiting for response"
|
||||
res <- atomically (cRecv cConn)
|
||||
print ("serverSend", res, example)
|
||||
unless (res == Just example) $ do
|
||||
error "Bad data"
|
||||
logInfo "(testIt) Success"
|
||||
|
||||
clientSend
|
||||
clientSend
|
||||
clientSend
|
||||
serverSend
|
||||
serverSend
|
||||
|
||||
cancel sAsync
|
||||
cancel cAsync
|
@ -49,9 +49,9 @@ generateBootSeq ship Pill{..} = do
|
||||
pure $ BootSeq ident pBootFormulas ovums
|
||||
where
|
||||
ident = LogIdentity ship True (fromIntegral $ length pBootFormulas)
|
||||
preKern ent = [ EvBlip $ BlipEvTerm $ TermEvBoot (49,()) (Fake (who ident))
|
||||
, EvBlip $ BlipEvArvo $ ArvoEvWhom () ship
|
||||
preKern ent = [ EvBlip $ BlipEvArvo $ ArvoEvWhom () ship
|
||||
, EvBlip $ BlipEvArvo $ ArvoEvWack () ent
|
||||
, EvBlip $ BlipEvTerm $ TermEvBoot (1,()) (Fake (who ident))
|
||||
]
|
||||
|
||||
|
||||
|
@ -229,18 +229,9 @@ sendLen s i = do
|
||||
|
||||
sendOrder :: HasLogFunc e => Serf e -> Order -> RIO e ()
|
||||
sendOrder w o = do
|
||||
logDebug $ display ("[Serf.sendOrder.toNoun] " <> tshow o)
|
||||
n <- evaluate (toNoun o)
|
||||
|
||||
case o of
|
||||
OWork (DoWork (Work _ _ _ e)) -> do logTrace $ displayShow $ toNoun (e::Ev)
|
||||
_ -> do pure ()
|
||||
|
||||
logDebug "[Serf.sendOrder.jam]"
|
||||
bs <- evaluate (jamBS n)
|
||||
logDebug $ display ("[Serf.sendOrder.send]: " <> tshow (length bs))
|
||||
sendBytes w bs
|
||||
logDebug "[Serf.sendOrder.sent]"
|
||||
logDebug $ display ("(sendOrder) " <> tshow o)
|
||||
sendBytes w $ jamBS $ toNoun o
|
||||
logDebug "(sendOrder) Done"
|
||||
|
||||
sendBytes :: HasLogFunc e => Serf e -> ByteString -> RIO e ()
|
||||
sendBytes s bs = handle ioErr $ do
|
||||
@ -298,9 +289,9 @@ shutdown serf code = sendOrder serf (OExit code)
|
||||
-}
|
||||
recvPlea :: HasLogFunc e => Serf e -> RIO e Plea
|
||||
recvPlea w = do
|
||||
logDebug "[Vere.Serf.recvPlea] waiting"
|
||||
logDebug "(recvPlea) Waiting"
|
||||
a <- recvAtom w
|
||||
logDebug "[Vere.Serf.recvPlea] got atom"
|
||||
logDebug "(recvPlea) Got atom"
|
||||
n <- fromRightExn (cue a) (const $ BadPleaAtom a)
|
||||
p <- fromRightExn (fromNounErr n) (\(p,m) -> BadPleaNoun (traceShowId n) p m)
|
||||
|
||||
|
@ -2,6 +2,12 @@ name: king
|
||||
version: 0.1.0
|
||||
license: AGPL-3.0-only
|
||||
|
||||
flags:
|
||||
Release:
|
||||
description: "Produce statically-linked executables"
|
||||
default: false
|
||||
manual: true
|
||||
|
||||
library:
|
||||
source-dirs: lib
|
||||
ghc-options:
|
||||
@ -87,6 +93,7 @@ dependencies:
|
||||
- wai-conduit
|
||||
- warp
|
||||
- warp-tls
|
||||
- websockets
|
||||
|
||||
default-extensions:
|
||||
- ApplicativeDo
|
||||
@ -138,9 +145,16 @@ executables:
|
||||
main: Main.hs
|
||||
source-dirs: app
|
||||
dependencies: ["king"]
|
||||
when:
|
||||
- condition: flag(Release)
|
||||
then:
|
||||
cc-options: -static
|
||||
ld-options: -static -pthread
|
||||
else: {}
|
||||
ghc-options:
|
||||
- -threaded
|
||||
- -rtsopts
|
||||
- -static
|
||||
- -O2
|
||||
- "-with-rtsopts=-N"
|
||||
- -fwarn-incomplete-patterns
|
||||
- -O0
|
||||
|
@ -18,6 +18,10 @@ nix:
|
||||
ghc-options:
|
||||
king: -fobject-code
|
||||
|
||||
flags:
|
||||
king:
|
||||
Release: false
|
||||
|
||||
# build:
|
||||
# executable-profiling: true
|
||||
# executable-stripping: false
|
||||
|
Loading…
Reference in New Issue
Block a user