We can read data from an lmdb event log.

(And maybe write, but we didn't test that.)
This commit is contained in:
Elliot Glaysher 2019-05-30 13:19:26 -07:00
parent 7cedae3f70
commit 3514439fe1
5 changed files with 182 additions and 61 deletions

View File

@ -1,7 +1,16 @@
-- TODO: Make sure transaction closed in all error cases -- TODO: Make sure transaction closed in all error cases
module Vere.Log where -- TODO: Don't allow writing non-contiguous events
module Vere.Log (
init,
shutdown,
-- we don't export write; you use the queue
readEvents,
latestEventNumber,
readLogIdentity,
writeLogIdentity
) where
import ClassyPrelude import ClassyPrelude hiding (init)
import Data.Noun import Data.Noun
import Data.Noun.Atom import Data.Noun.Atom
import Data.Noun.Jam import Data.Noun.Jam
@ -10,6 +19,7 @@ import Data.Void
import Database.LMDB.Raw import Database.LMDB.Raw
import Foreign.Ptr import Foreign.Ptr
import Foreign.Marshal.Alloc import Foreign.Marshal.Alloc
import Vere.Pier.Types
import Foreign.Storable (peek, poke, sizeOf) import Foreign.Storable (peek, poke, sizeOf)
@ -20,33 +30,26 @@ import qualified Data.Vector.Mutable as MV
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
-- TODO: We are uncertain about q's type. There's some serious entanglement -- TODO: Handle throws on the async
-- with u3_pier in this logic in the C code, and you might not be able to get init :: FilePath -> TQueue (Writ [Effect]) -> (Writ [Effect] -> STM ())
-- away with anything less than passing the full u3_writ around. -> IO LogState
data State = State init dir inp cb = do
{ env :: MDB_env
, q :: TQueue (Word64,Atom,Noun)
}
data LogIdentity = LogIdentity
{ who :: Noun
, is_fake :: Noun
, life :: Noun
}
--------------------------------------------------------------------------------
init :: FilePath -> IO State
init dir = do
env <- mdb_env_create env <- mdb_env_create
mdb_env_set_maxdbs env 3 mdb_env_set_maxdbs env 3
mdb_env_set_mapsize env (40 * 1024 * 1024 * 1024) mdb_env_set_mapsize env (40 * 1024 * 1024 * 1024)
mdb_env_open env dir [] mdb_env_open env dir []
tq <- newTQueueIO writer <- persistThread env inp cb
pure (State env tq) pure (LogState env inp cb writer)
shutdown :: State -> IO () -- TODO: properly handle shutdowns during write
shutdown s = mdb_env_close (env s) shutdown :: LogState -> IO ()
shutdown s = do
void $ waitCancel (writer s)
mdb_env_close (env s)
waitCancel :: Async a -> IO (Either SomeException a)
waitCancel async = cancel async >> waitCatch async
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
@ -76,19 +79,23 @@ mdbValToNoun (MDB_val sz ptr) = do
bs <- BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz) bs <- BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz)
maybe (error "mdb bad cue") pure (cue (packAtom bs)) maybe (error "mdb bad cue") pure (cue (packAtom bs))
putRaw :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> ByteString -> ByteString putRaw :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> MDB_val -> MDB_val -> IO ()
-> IO Bool
putRaw flags txn db key val = putRaw flags txn db key val =
byteStringAsMdbVal key \mKey -> mdb_put flags txn db key val >>= \case
byteStringAsMdbVal val \mVal ->
mdb_put flags txn db mKey mVal
put :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> ByteString -> Noun -> IO ()
put flags txn db bsKey val =
putRaw flags txn db bsKey bsVal >>= \case
True -> pure () True -> pure ()
False -> error "mdb bad put" False -> error "mdb bad put"
where bsVal = nounToBs val
putNoun :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> ByteString -> Noun -> IO ()
putNoun flags txn db key val =
byteStringAsMdbVal key $ \mKey ->
byteStringAsMdbVal (nounToBs val) $ \mVal ->
putRaw flags txn db mKey mVal
putJam :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> Word64 -> Jam -> IO ()
putJam flags txn db id (Jam atom) =
withWord64AsMDBval id $ \idVal ->
byteStringAsMdbVal (unpackAtom atom) $ \mVal ->
putRaw flags txn db idVal mVal
get :: MDB_txn -> MDB_dbi -> ByteString -> IO Noun get :: MDB_txn -> MDB_dbi -> ByteString -> IO Noun
get txn db key = get txn db key =
@ -100,24 +107,48 @@ mdbValToWord64 (MDB_val sz ptr) = do
assertErr (sz == 8) "wrong size in mdbValToWord64" assertErr (sz == 8) "wrong size in mdbValToWord64"
peek (castPtr ptr) peek (castPtr ptr)
withWord64AsMDBval :: Word64 -> (MDB_val -> IO a) -> IO a
withWord64AsMDBval w cb = do
withWordPtr w $ \p ->
cb (MDB_val (fromIntegral (sizeOf w)) (castPtr p))
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
withWordPtr :: Word64 -> (Ptr Word64 -> IO a) -> IO a withWordPtr :: Word64 -> (Ptr Word64 -> IO a) -> IO a
withWordPtr w cb = do withWordPtr w cb = do
allocaBytes (sizeOf w) (\p -> poke p w >> cb p) allocaBytes (sizeOf w) (\p -> poke p w >> cb p)
-- TODO: We need to be able to send back an exception to the main thread on an
-- exception on the persistence thread.
persistThread :: MDB_env
-> TQueue (Writ [Effect])
-> (Writ [Effect] -> STM ())
-> IO (Async ())
persistThread env inputQueue onPersist = async $
do
writs <- atomically $ readQueue inputQueue
writeEvents writs
atomically $ traverse_ onPersist writs
where
writeEvents writs = do
txn <- mdb_txn_begin env Nothing False
db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY]
writeEvent :: State -> Word64 -> Atom -> Noun -> IO () let flags = compileWriteFlags [MDB_NOOVERWRITE]
writeEvent s id event effect = atomically $
writeTQueue (q s) (id, event, effect)
for_ writs $ \w ->
putJam flags txn db (eventId w) (event w)
mdb_txn_commit txn
deriving instance Show MDB_val
-- TODO: This will read len items and will error if there are less than that -- TODO: This will read len items and will error if there are less than that
-- available. This differs from the current pier.c's expectations. -- available. This differs from the current pier.c's expectations.
readEvents :: MDB_env -> Word64 -> Word64 -> IO (V.Vector (Word64,Atom)) readEvents :: LogState -> Word64 -> Word64 -> IO (V.Vector (Word64,Atom))
readEvents env first len = readEvents (LogState env _ _ _) first len =
withWordPtr first $ \pIdx -> withWordPtr first $ \pIdx ->
withKVPtrs (MDB_val 64 (castPtr pIdx)) (MDB_val 0 nullPtr) $ \pKey pVal -> withKVPtrs (MDB_val 8 (castPtr pIdx)) (MDB_val 0 nullPtr) $ \pKey pVal ->
do do
txn <- mdb_txn_begin env Nothing True txn <- mdb_txn_begin env Nothing True
db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY] db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY]
@ -132,10 +163,12 @@ readEvents env first len =
let idx = first + (fromIntegral i) let idx = first + (fromIntegral i)
assertErr (key /= idx) "missing event in database" assertErr (key == idx) ("missing event in database " <> (show idx))
when (i + 1 /= (int len)) do
found <- mdb_cursor_get MDB_NEXT cur pKey pVal
assertErr found "lmdb: next event not found"
found <- mdb_cursor_get MDB_NEXT cur pKey pVal
assertErr found "lmdb: next event not found"
pure (idx, val) pure (idx, val)
mdb_cursor_close cur mdb_cursor_close cur
@ -143,7 +176,6 @@ readEvents env first len =
pure vec pure vec
int :: Word64 -> Int int :: Word64 -> Int
int = fromIntegral int = fromIntegral
@ -151,10 +183,10 @@ assertErr :: Bool -> String -> IO ()
assertErr True _ = pure () assertErr True _ = pure ()
assertErr False m = error m assertErr False m = error m
latestEventNumber :: MDB_env -> IO Word64 latestEventNumber :: LogState -> IO Word64
latestEventNumber env = latestEventNumber (LogState env _ _ _) =
do do
txn <- mdb_txn_begin env Nothing False txn <- mdb_txn_begin env Nothing True
db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY] db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY]
cur <- mdb_cursor_open txn db cur <- mdb_cursor_open txn db
res <- fetch txn db cur res <- fetch txn db cur
@ -170,22 +202,10 @@ latestEventNumber env =
False -> pure 0 False -> pure 0
True -> peek pKey >>= mdbValToWord64 True -> peek pKey >>= mdbValToWord64
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
writeLogIdentity :: MDB_env -> LogIdentity -> IO () readLogIdentity :: LogState -> IO LogIdentity
writeLogIdentity env LogIdentity{..} = do readLogIdentity (LogState env _ _ _) = do
txn <- mdb_txn_begin env Nothing False
db <- mdb_dbi_open txn (Just "META") []
let flags = compileWriteFlags []
put flags txn db "who" who
put flags txn db "is-fake" is_fake
put flags txn db "life" life
mdb_txn_commit txn
pure ()
readLogIdentity :: MDB_env -> IO LogIdentity
readLogIdentity env = do
txn <- mdb_txn_begin env Nothing True txn <- mdb_txn_begin env Nothing True
db <- mdb_dbi_open txn (Just "META") [] db <- mdb_dbi_open txn (Just "META") []
who <- get txn db "who" who <- get txn db "who"
@ -193,3 +213,14 @@ readLogIdentity env = do
life <- get txn db "life" life <- get txn db "life"
mdb_txn_abort txn mdb_txn_abort txn
pure (LogIdentity who is_fake life) pure (LogIdentity who is_fake life)
writeLogIdentity :: LogState -> LogIdentity -> IO ()
writeLogIdentity (LogState env _ _ _) LogIdentity{..} = do
txn <- mdb_txn_begin env Nothing False
db <- mdb_dbi_open txn (Just "META") []
let flags = compileWriteFlags []
putNoun flags txn db "who" who
putNoun flags txn db "is-fake" is_fake
putNoun flags txn db "life" life
mdb_txn_commit txn
pure ()

17
pkg/hair/lib/Vere/Pier.hs Normal file
View File

@ -0,0 +1,17 @@
module Vere.Pier where
import ClassyPrelude
import Vere.Pier.Types
import qualified Vere.Log as Log
initPier :: FilePath -> IO Pier
initPier top = do
let logPath = top <> "/log"
computeQueue <- newTQueueIO
persistQueue <- newTQueueIO
releaseQueue <- newTQueueIO
logState <- Log.init logPath persistQueue (writeTQueue releaseQueue)
pure (Pier{..})

View File

@ -0,0 +1,45 @@
module Vere.Pier.Types where
import ClassyPrelude
import Data.Noun
import Data.Noun.Atom
import Database.LMDB.Raw
import Urbit.Time
data Effect
data Ovum
newtype Mug = Mug Word32
newtype Jam = Jam Atom
data Writ a = Writ
{ eventId :: Word64
, job :: (Wen, Ovum) -- (pair date ovum)
, timeout :: Maybe Word
, mug :: Mug
, event :: Jam -- mat
, payload :: a
}
data Pier = Pier
{ computeQueue :: TQueue (Writ Word)
, persistQueue :: TQueue (Writ [Effect])
, releaseQueue :: TQueue (Writ [Effect])
, logState :: LogState
}
-- TODO: We are uncertain about q's type. There's some serious entanglement
-- with u3_pier in this logic in the C code, and you might not be able to get
-- away with anything less than passing the full u3_writ around.
data LogState = LogState
{ env :: MDB_env
, inputQueue :: TQueue (Writ [Effect])
, onPersist :: Writ [Effect] -> STM ()
, writer :: Async ()
}
data LogIdentity = LogIdentity
{ who :: Noun
, is_fake :: Noun
, life :: Noun
} deriving Show

View File

@ -0,0 +1,27 @@
module Vere.TestReadPier where
import ClassyPrelude
import Data.Noun.Jam
import qualified Vere.Log as Log
main :: IO ()
main = do
let logPath = "/Users/erg/src/urbit/zod/.urb/log/"
-- junk
persistQueue <- newTQueueIO
releaseQueue <- newTQueueIO
logState <- Log.init logPath persistQueue (writeTQueue releaseQueue)
--
log <- Log.readLogIdentity logState
print log
--
latestEvent <- Log.latestEventNumber logState
print latestEvent
--
events <- Log.readEvents logState 1000 1
print $ cue . snd <$> events

View File

@ -102,6 +102,7 @@ default-extensions:
- Rank2Types - Rank2Types
- RankNTypes - RankNTypes
- RecordWildCards - RecordWildCards
- StandaloneDeriving
- ScopedTypeVariables - ScopedTypeVariables
- TemplateHaskell - TemplateHaskell
- TupleSections - TupleSections