From 3514439fe1e26a4732ff2a33e778652c43ccaeb7 Mon Sep 17 00:00:00 2001 From: Elliot Glaysher Date: Thu, 30 May 2019 13:19:26 -0700 Subject: [PATCH] We can read data from an lmdb event log. (And maybe write, but we didn't test that.) --- pkg/hair/lib/Vere/Log.hs | 153 ++++++++++++++++++------------ pkg/hair/lib/Vere/Pier.hs | 17 ++++ pkg/hair/lib/Vere/Pier/Types.hs | 45 +++++++++ pkg/hair/lib/Vere/TestReadPier.hs | 27 ++++++ pkg/hair/package.yaml | 1 + 5 files changed, 182 insertions(+), 61 deletions(-) create mode 100644 pkg/hair/lib/Vere/Pier.hs create mode 100644 pkg/hair/lib/Vere/Pier/Types.hs create mode 100644 pkg/hair/lib/Vere/TestReadPier.hs diff --git a/pkg/hair/lib/Vere/Log.hs b/pkg/hair/lib/Vere/Log.hs index dbb3ca0f2..638cbf678 100644 --- a/pkg/hair/lib/Vere/Log.hs +++ b/pkg/hair/lib/Vere/Log.hs @@ -1,7 +1,16 @@ -- TODO: Make sure transaction closed in all error cases -module Vere.Log where +-- TODO: Don't allow writing non-contiguous events +module Vere.Log ( + init, + shutdown, + -- we don't export write; you use the queue + readEvents, + latestEventNumber, + readLogIdentity, + writeLogIdentity +) where -import ClassyPrelude +import ClassyPrelude hiding (init) import Data.Noun import Data.Noun.Atom import Data.Noun.Jam @@ -10,6 +19,7 @@ import Data.Void import Database.LMDB.Raw import Foreign.Ptr import Foreign.Marshal.Alloc +import Vere.Pier.Types import Foreign.Storable (peek, poke, sizeOf) @@ -20,33 +30,26 @@ import qualified Data.Vector.Mutable as MV -------------------------------------------------------------------------------- --- TODO: We are uncertain about q's type. There's some serious entanglement --- with u3_pier in this logic in the C code, and you might not be able to get --- away with anything less than passing the full u3_writ around. -data State = State - { env :: MDB_env - , q :: TQueue (Word64,Atom,Noun) - } - -data LogIdentity = LogIdentity - { who :: Noun - , is_fake :: Noun - , life :: Noun - } - --------------------------------------------------------------------------------- - -init :: FilePath -> IO State -init dir = do +-- TODO: Handle throws on the async +init :: FilePath -> TQueue (Writ [Effect]) -> (Writ [Effect] -> STM ()) + -> IO LogState +init dir inp cb = do env <- mdb_env_create mdb_env_set_maxdbs env 3 mdb_env_set_mapsize env (40 * 1024 * 1024 * 1024) mdb_env_open env dir [] - tq <- newTQueueIO - pure (State env tq) + writer <- persistThread env inp cb + pure (LogState env inp cb writer) -shutdown :: State -> IO () -shutdown s = mdb_env_close (env s) +-- TODO: properly handle shutdowns during write +shutdown :: LogState -> IO () +shutdown s = do + void $ waitCancel (writer s) + mdb_env_close (env s) + + +waitCancel :: Async a -> IO (Either SomeException a) +waitCancel async = cancel async >> waitCatch async -------------------------------------------------------------------------------- @@ -76,19 +79,23 @@ mdbValToNoun (MDB_val sz ptr) = do bs <- BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz) maybe (error "mdb bad cue") pure (cue (packAtom bs)) -putRaw :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> ByteString -> ByteString - -> IO Bool +putRaw :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> MDB_val -> MDB_val -> IO () putRaw flags txn db key val = - byteStringAsMdbVal key \mKey -> - byteStringAsMdbVal val \mVal -> - mdb_put flags txn db mKey mVal - -put :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> ByteString -> Noun -> IO () -put flags txn db bsKey val = - putRaw flags txn db bsKey bsVal >>= \case + mdb_put flags txn db key val >>= \case True -> pure () False -> error "mdb bad put" - where bsVal = nounToBs val + +putNoun :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> ByteString -> Noun -> IO () +putNoun flags txn db key val = + byteStringAsMdbVal key $ \mKey -> + byteStringAsMdbVal (nounToBs val) $ \mVal -> + putRaw flags txn db mKey mVal + +putJam :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> Word64 -> Jam -> IO () +putJam flags txn db id (Jam atom) = + withWord64AsMDBval id $ \idVal -> + byteStringAsMdbVal (unpackAtom atom) $ \mVal -> + putRaw flags txn db idVal mVal get :: MDB_txn -> MDB_dbi -> ByteString -> IO Noun get txn db key = @@ -100,24 +107,48 @@ mdbValToWord64 (MDB_val sz ptr) = do assertErr (sz == 8) "wrong size in mdbValToWord64" peek (castPtr ptr) +withWord64AsMDBval :: Word64 -> (MDB_val -> IO a) -> IO a +withWord64AsMDBval w cb = do + withWordPtr w $ \p -> + cb (MDB_val (fromIntegral (sizeOf w)) (castPtr p)) + -------------------------------------------------------------------------------- withWordPtr :: Word64 -> (Ptr Word64 -> IO a) -> IO a withWordPtr w cb = do allocaBytes (sizeOf w) (\p -> poke p w >> cb p) +-- TODO: We need to be able to send back an exception to the main thread on an +-- exception on the persistence thread. +persistThread :: MDB_env + -> TQueue (Writ [Effect]) + -> (Writ [Effect] -> STM ()) + -> IO (Async ()) +persistThread env inputQueue onPersist = async $ + do + writs <- atomically $ readQueue inputQueue + writeEvents writs + atomically $ traverse_ onPersist writs + where + writeEvents writs = do + txn <- mdb_txn_begin env Nothing False + db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY] -writeEvent :: State -> Word64 -> Atom -> Noun -> IO () -writeEvent s id event effect = atomically $ - writeTQueue (q s) (id, event, effect) + let flags = compileWriteFlags [MDB_NOOVERWRITE] + for_ writs $ \w -> + putJam flags txn db (eventId w) (event w) + + mdb_txn_commit txn + +deriving instance Show MDB_val -- TODO: This will read len items and will error if there are less than that -- available. This differs from the current pier.c's expectations. -readEvents :: MDB_env -> Word64 -> Word64 -> IO (V.Vector (Word64,Atom)) -readEvents env first len = +readEvents :: LogState -> Word64 -> Word64 -> IO (V.Vector (Word64,Atom)) +readEvents (LogState env _ _ _) first len = withWordPtr first $ \pIdx -> - withKVPtrs (MDB_val 64 (castPtr pIdx)) (MDB_val 0 nullPtr) $ \pKey pVal -> + withKVPtrs (MDB_val 8 (castPtr pIdx)) (MDB_val 0 nullPtr) $ \pKey pVal -> do txn <- mdb_txn_begin env Nothing True db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY] @@ -132,10 +163,12 @@ readEvents env first len = let idx = first + (fromIntegral i) - assertErr (key /= idx) "missing event in database" + assertErr (key == idx) ("missing event in database " <> (show idx)) + + when (i + 1 /= (int len)) do + found <- mdb_cursor_get MDB_NEXT cur pKey pVal + assertErr found "lmdb: next event not found" - found <- mdb_cursor_get MDB_NEXT cur pKey pVal - assertErr found "lmdb: next event not found" pure (idx, val) mdb_cursor_close cur @@ -143,7 +176,6 @@ readEvents env first len = pure vec - int :: Word64 -> Int int = fromIntegral @@ -151,10 +183,10 @@ assertErr :: Bool -> String -> IO () assertErr True _ = pure () assertErr False m = error m -latestEventNumber :: MDB_env -> IO Word64 -latestEventNumber env = +latestEventNumber :: LogState -> IO Word64 +latestEventNumber (LogState env _ _ _) = do - txn <- mdb_txn_begin env Nothing False + txn <- mdb_txn_begin env Nothing True db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY] cur <- mdb_cursor_open txn db res <- fetch txn db cur @@ -170,22 +202,10 @@ latestEventNumber env = False -> pure 0 True -> peek pKey >>= mdbValToWord64 - -------------------------------------------------------------------------------- -writeLogIdentity :: MDB_env -> LogIdentity -> IO () -writeLogIdentity env LogIdentity{..} = do - txn <- mdb_txn_begin env Nothing False - db <- mdb_dbi_open txn (Just "META") [] - let flags = compileWriteFlags [] - put flags txn db "who" who - put flags txn db "is-fake" is_fake - put flags txn db "life" life - mdb_txn_commit txn - pure () - -readLogIdentity :: MDB_env -> IO LogIdentity -readLogIdentity env = do +readLogIdentity :: LogState -> IO LogIdentity +readLogIdentity (LogState env _ _ _) = do txn <- mdb_txn_begin env Nothing True db <- mdb_dbi_open txn (Just "META") [] who <- get txn db "who" @@ -193,3 +213,14 @@ readLogIdentity env = do life <- get txn db "life" mdb_txn_abort txn pure (LogIdentity who is_fake life) + +writeLogIdentity :: LogState -> LogIdentity -> IO () +writeLogIdentity (LogState env _ _ _) LogIdentity{..} = do + txn <- mdb_txn_begin env Nothing False + db <- mdb_dbi_open txn (Just "META") [] + let flags = compileWriteFlags [] + putNoun flags txn db "who" who + putNoun flags txn db "is-fake" is_fake + putNoun flags txn db "life" life + mdb_txn_commit txn + pure () diff --git a/pkg/hair/lib/Vere/Pier.hs b/pkg/hair/lib/Vere/Pier.hs new file mode 100644 index 000000000..d86f544b7 --- /dev/null +++ b/pkg/hair/lib/Vere/Pier.hs @@ -0,0 +1,17 @@ +module Vere.Pier where + +import ClassyPrelude +import Vere.Pier.Types +import qualified Vere.Log as Log + +initPier :: FilePath -> IO Pier +initPier top = do + let logPath = top <> "/log" + + computeQueue <- newTQueueIO + persistQueue <- newTQueueIO + releaseQueue <- newTQueueIO + + logState <- Log.init logPath persistQueue (writeTQueue releaseQueue) + + pure (Pier{..}) diff --git a/pkg/hair/lib/Vere/Pier/Types.hs b/pkg/hair/lib/Vere/Pier/Types.hs new file mode 100644 index 000000000..536cdcb81 --- /dev/null +++ b/pkg/hair/lib/Vere/Pier/Types.hs @@ -0,0 +1,45 @@ +module Vere.Pier.Types where + +import ClassyPrelude +import Data.Noun +import Data.Noun.Atom +import Database.LMDB.Raw +import Urbit.Time + +data Effect +data Ovum +newtype Mug = Mug Word32 + +newtype Jam = Jam Atom + +data Writ a = Writ + { eventId :: Word64 + , job :: (Wen, Ovum) -- (pair date ovum) + , timeout :: Maybe Word + , mug :: Mug + , event :: Jam -- mat + , payload :: a + } + +data Pier = Pier + { computeQueue :: TQueue (Writ Word) + , persistQueue :: TQueue (Writ [Effect]) + , releaseQueue :: TQueue (Writ [Effect]) + , logState :: LogState + } + +-- TODO: We are uncertain about q's type. There's some serious entanglement +-- with u3_pier in this logic in the C code, and you might not be able to get +-- away with anything less than passing the full u3_writ around. +data LogState = LogState + { env :: MDB_env + , inputQueue :: TQueue (Writ [Effect]) + , onPersist :: Writ [Effect] -> STM () + , writer :: Async () + } + +data LogIdentity = LogIdentity + { who :: Noun + , is_fake :: Noun + , life :: Noun + } deriving Show diff --git a/pkg/hair/lib/Vere/TestReadPier.hs b/pkg/hair/lib/Vere/TestReadPier.hs new file mode 100644 index 000000000..d29f8aa14 --- /dev/null +++ b/pkg/hair/lib/Vere/TestReadPier.hs @@ -0,0 +1,27 @@ +module Vere.TestReadPier where + +import ClassyPrelude +import Data.Noun.Jam +import qualified Vere.Log as Log + +main :: IO () +main = do + let logPath = "/Users/erg/src/urbit/zod/.urb/log/" + + -- junk + persistQueue <- newTQueueIO + releaseQueue <- newTQueueIO + logState <- Log.init logPath persistQueue (writeTQueue releaseQueue) + + -- + log <- Log.readLogIdentity logState + print log + + -- + latestEvent <- Log.latestEventNumber logState + print latestEvent + + -- + events <- Log.readEvents logState 1000 1 + print $ cue . snd <$> events + diff --git a/pkg/hair/package.yaml b/pkg/hair/package.yaml index d0493b2e2..64f98297a 100644 --- a/pkg/hair/package.yaml +++ b/pkg/hair/package.yaml @@ -102,6 +102,7 @@ default-extensions: - Rank2Types - RankNTypes - RecordWildCards + - StandaloneDeriving - ScopedTypeVariables - TemplateHaskell - TupleSections