TODO Effects storage logic is messy.
module Vere.Log ( EventLog, identity, nextEv, lastEv
, new, existing
, streamEvents, appendEvents, trimEvents
, streamEffectsRows, writeEffectsRow
) where
import UrbitPrelude hiding (init)
import Data.RAcquire
import Data.Conduit
import Database.LMDB.Raw
import Foreign.Marshal.Alloc
import Foreign.Ptr
import Vere.Pier.Types
import Foreign.Storable (peek, poke, sizeOf)
import qualified Data.ByteString.Unsafe as BU
import qualified Data.Vector as V
-- Types -----------------------------------------------------------------------
type Env = MDB_env
type Val = MDB_val
type Txn = MDB_txn
type Dbi = MDB_dbi
type Cur = MDB_cursor
data EventLog = EventLog
{ env :: Env
, _metaTbl :: Dbi
, eventsTbl :: Dbi
, effectsTbl :: Dbi
, identity :: LogIdentity
, numEvents :: IORef EventId
nextEv :: EventLog -> RIO e EventId
nextEv = fmap succ . readIORef . numEvents
2019-08-29 03:26:59 +03:00
lastEv :: EventLog -> RIO e EventId
lastEv = readIORef . numEvents
data EventLogExn
= NoLogIdentity
| MissingEvent EventId
| BadNounInLogIdentity ByteString DecodeErr ByteString
| BadKeyInEventLog
| BadWriteLogIdentity LogIdentity
| BadWriteEvent EventId
| BadWriteEffect EventId
deriving Show
-- Instances -------------------------------------------------------------------
instance Exception EventLogExn where
-- Open/Close an Event Log -----------------------------------------------------
2019-08-29 03:26:59 +03:00
rawOpen :: MonadIO m => FilePath -> m Env
rawOpen dir = io $ do
env <- mdb_env_create
mdb_env_set_maxdbs env 3
mdb_env_set_mapsize env (40 * 1024 * 1024 * 1024)
mdb_env_open env dir []
pure env
create :: HasLogFunc e => FilePath -> LogIdentity -> RIO e EventLog
create dir id = do
logDebug $ display (pack @Text $ "Creating LMDB database: " <> dir)
logDebug $ display (pack @Text $ "Log Identity: " <> show id)
env <- rawOpen dir
(m, e, f) <- createTables env
clearEvents env e
writeIdent env m id
EventLog env m e f id <$> newIORef 0
createTables env =
2019-08-29 03:26:59 +03:00
rwith (writeTxn env) $ \txn -> io $
2019-08-21 02:32:46 +03:00
(,,) <$> mdb_dbi_open txn (Just "META") [MDB_CREATE]
<*> mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY]
<*> mdb_dbi_open txn (Just "EFFECTS") [MDB_CREATE, MDB_INTEGERKEY]
open :: HasLogFunc e => FilePath -> RIO e EventLog
open dir = do
2019-08-29 03:26:59 +03:00
logDebug $ display (pack @Text $ "Opening LMDB database: " <> dir)
2019-08-21 02:32:46 +03:00
env <- rawOpen dir
(m, e, f) <- openTables env
id <- getIdent env m
logDebug $ display (pack @Text $ "Log Identity: " <> show id)
2019-08-21 02:32:46 +03:00
numEvs <- getNumEvents env e
EventLog env m e f id <$> newIORef numEvs
openTables env =
2019-08-29 03:26:59 +03:00
rwith (writeTxn env) $ \txn -> io $
2019-08-21 02:32:46 +03:00
(,,) <$> mdb_dbi_open txn (Just "META") []
<*> mdb_dbi_open txn (Just "EVENTS") [MDB_INTEGERKEY]
<*> mdb_dbi_open txn (Just "EFFECTS") [MDB_CREATE, MDB_INTEGERKEY]
close :: HasLogFunc e => FilePath -> EventLog -> RIO e ()
close dir (EventLog env meta events effects _ _) = do
logDebug $ display (pack @Text $ "Closing LMDB database: " <> dir)
io $ do mdb_dbi_close env meta
mdb_dbi_close env events
mdb_dbi_close env effects
mdb_env_sync_flush env
mdb_env_close env
-- Create a new event log or open an existing one. -----------------------------
2019-08-29 03:26:59 +03:00
existing :: HasLogFunc e => FilePath -> RAcquire e EventLog
existing dir = mkRAcquire (open dir) (close dir)
2019-08-29 03:26:59 +03:00
new :: HasLogFunc e => FilePath -> LogIdentity -> RAcquire e EventLog
new dir id = mkRAcquire (create dir id) (close dir)
-- Read/Write Log Identity -----------------------------------------------------
2019-05-30 02:43:51 +03:00
A read-only transaction that commits at the end.
Use this when opening database handles.
2019-08-29 03:26:59 +03:00
_openTxn :: Env -> RAcquire e Txn
_openTxn env = mkRAcquire begin commit
2019-08-29 03:26:59 +03:00
begin = io $ mdb_txn_begin env Nothing True
commit = io . mdb_txn_commit
A read-only transaction that aborts at the end.
Use this when reading data from already-opened databases.
2019-08-29 03:26:59 +03:00
readTxn :: Env -> RAcquire e Txn
readTxn env = mkRAcquire begin abort
2019-08-29 03:26:59 +03:00
begin = io $ mdb_txn_begin env Nothing True
abort = io . mdb_txn_abort
A read-write transaction that commits upon sucessful completion and
aborts on exception.
Use this when reading data from already-opened databases.
2019-08-29 03:26:59 +03:00
writeTxn :: Env -> RAcquire e Txn
writeTxn env = mkRAcquireType begin finalize
2019-08-29 03:26:59 +03:00
begin = io $ mdb_txn_begin env Nothing False
finalize txn = io . \case
ReleaseNormal -> mdb_txn_commit txn
ReleaseEarly -> mdb_txn_commit txn
ReleaseException -> mdb_txn_abort txn
cursor :: Txn -> Dbi -> RAcquire e Cur
cursor txn dbi = mkRAcquire open close
2019-08-29 03:26:59 +03:00
open = io $ mdb_cursor_open txn dbi
close = io . mdb_cursor_close
getIdent :: HasLogFunc e => Env -> Dbi -> RIO e LogIdentity
getIdent env dbi = do
logDebug "Reading log identity"
getTbl env >>= traverse decodeIdent >>= \case
Nothing -> throwIO NoLogIdentity
2019-08-29 03:26:59 +03:00
decodeIdent :: (Noun, Noun, Noun) -> RIO e LogIdentity
decodeIdent = fromNounExn . toNoun
2019-08-29 03:26:59 +03:00
getTbl :: Env -> RIO e (Maybe (Noun, Noun, Noun))
getTbl env = do
rwith (readTxn env) $ \txn -> do
who <- getMb txn dbi "who"
fake <- getMb txn dbi "is-fake"
life <- getMb txn dbi "life"
pure $ (,,) <$> who <*> fake <*> life
2019-08-29 03:26:59 +03:00
writeIdent env metaTbl ident@LogIdentity{..} = do
2019-08-29 03:26:59 +03:00
logDebug "Writing log identity"
let flags = compileWriteFlags []
2019-08-29 03:26:59 +03:00
rwith (writeTxn env) $ \txn -> do
x <- putNoun flags txn metaTbl "who" (toNoun who)
y <- putNoun flags txn metaTbl "is-fake" (toNoun isFake)
z <- putNoun flags txn metaTbl "life" (toNoun lifecycleLen)
unless (x && y && z) $ do
throwIO (BadWriteLogIdentity ident)
-- Latest Event Number ---------------------------------------------------------
2019-08-29 03:26:59 +03:00
getNumEvents :: Env -> Dbi -> RIO e Word64
getNumEvents env eventsTbl =
rwith (readTxn env) $ \txn ->
rwith (cursor txn eventsTbl) $ \cur ->
withKVPtrs' nullVal nullVal $ \pKey pVal ->
io $ mdb_cursor_get MDB_LAST cur pKey pVal >>= \case
False -> pure 0
True -> peek pKey >>= mdbValToWord64
-- Write Events ----------------------------------------------------------------
2019-08-29 03:26:59 +03:00
clearEvents :: Env -> Dbi -> RIO e ()
clearEvents env eventsTbl =
2019-08-29 03:26:59 +03:00
rwith (cursor txn eventsTbl) $ \cur ->
withKVPtrs' nullVal nullVal $ \pKey pVal -> do
let loop = io (mdb_cursor_get MDB_LAST cur pKey pVal) >>= \case
False -> pure ()
2019-08-29 03:26:59 +03:00
True -> do io $ mdb_cursor_del (compileWriteFlags []) cur
2019-08-29 03:26:59 +03:00
appendEvents log !events = do
numEvs <- readIORef (numEvents log)
next <- pure (numEvs + 1)
doAppend $ zip [next..] $ toList events
writeIORef (numEvents log) (numEvs + word (length events))
flags = compileWriteFlags [MDB_NOOVERWRITE]
doAppend = \kvs ->
2019-08-29 03:26:59 +03:00
for_ kvs $ \(k,v) -> do
2019-08-21 02:32:46 +03:00
putBytes flags txn (eventsTbl log) k v >>= \case
True -> pure ()
False -> throwIO (BadWriteEvent k)
2019-05-30 02:19:07 +03:00
writeEffectsRow :: EventLog -> EventId -> ByteString -> RIO e ()
2019-08-21 02:32:46 +03:00
writeEffectsRow log k v = do
2019-08-29 03:26:59 +03:00
rwith (writeTxn $ env log) $ \txn ->
2019-08-21 02:32:46 +03:00
putBytes flags txn (effectsTbl log) k v >>= \case
True -> pure ()
False -> throwIO (BadWriteEffect k)
flags = compileWriteFlags []
2019-08-21 02:32:46 +03:00
-- Read Events -----------------------------------------------------------------
2019-05-30 02:19:07 +03:00
trimEvents :: HasLogFunc e => EventLog -> Word64 -> RIO e ()
trimEvents log start = do
rwith (writeTxn $ env log) $ \txn -> do
logError "(trimEvents): Not implemented."
pure ()
writeIORef (numEvents log) (pred start)
streamEvents :: HasLogFunc e
=> EventLog -> Word64
2019-08-29 03:26:59 +03:00
-> ConduitT () ByteString (RIO e) ()
streamEvents log first = do
2019-08-29 03:26:59 +03:00
batch <- lift $ readBatch log first
unless (null batch) $ do
for_ batch yield
streamEvents log (first + word (length batch))
streamEffectsRows :: e. HasLogFunc e
=> EventLog -> EventId
2019-08-29 03:26:59 +03:00
-> ConduitT () (Word64, ByteString) (RIO e) ()
2019-08-21 02:32:46 +03:00
streamEffectsRows log = go
2019-08-29 03:26:59 +03:00
2019-08-21 02:32:46 +03:00
go next = do
2019-08-29 03:26:59 +03:00
batch <- lift $ readRowsBatch (env log) (effectsTbl log) next
2019-08-21 02:32:46 +03:00
unless (null batch) $ do
for_ batch yield
go (next + fromIntegral (length batch))
Read 1000 rows from the events table, starting from event `first`.
Throws `MissingEvent` if an event was missing from the log.
readBatch :: EventLog -> Word64 -> RIO e (V.Vector ByteString)
readBatch log first = start
start = do
last <- lastEv log
if (first > last)
then pure mempty
else readRows $ fromIntegral $ min 1000 $ ((last+1) - first)
2019-08-29 03:26:59 +03:00
assertFound id found = do
unless found $ throwIO $ MissingEvent id
readRows count =
withWordPtr first $ \pIdx ->
2019-08-29 03:26:59 +03:00
rwith (readTxn $ env log) $ \txn ->
rwith (cursor txn $ eventsTbl log) $ \cur -> do
assertFound first =<< io (mdb_cursor_get MDB_SET_KEY cur pKey pVal)
fetchRows count cur pKey pVal
fetchRows count cur pKey pVal = do
2019-08-29 03:26:59 +03:00
env <- ask
key <- io $ peek pKey >>= mdbValToWord64
val <- io $ peek pVal >>= mdbValToBytes
idx <- pure (first + word i)
unless (key == idx) $ throwIO $ MissingEvent idx
when (count /= succ i) $ do
2019-08-29 03:26:59 +03:00
pure val
2019-05-30 02:19:07 +03:00
Read 1000 rows from the database, starting from key `first`.
readRowsBatch :: e. HasLogFunc e
=> Env -> Dbi -> Word64 -> RIO e (V.Vector (Word64, ByteString))
2019-08-21 02:32:46 +03:00
readRowsBatch env dbi first = readRows
readRows = do
logDebug $ display ("(readRowsBatch) From: " <> tshow first)
2019-08-21 02:32:46 +03:00
withWordPtr first $ \pIdx ->
2019-08-29 03:26:59 +03:00
rwith (readTxn env) $ \txn ->
rwith (cursor txn dbi) $ \cur ->
io (mdb_cursor_get MDB_SET_RANGE cur pKey pVal) >>= \case
False -> pure mempty
True -> V.unfoldrM (fetchBatch cur pKey pVal) 1000
2019-08-29 03:26:59 +03:00
2019-08-29 03:26:59 +03:00
-> RIO e (Maybe ((Word64, ByteString), Word))
fetchBatch cur pKey pVal 0 = pure Nothing
fetchBatch cur pKey pVal n = do
2019-08-29 03:26:59 +03:00
key <- io $ peek pKey >>= mdbValToWord64
val <- io $ peek pVal >>= mdbValToBytes
io $ mdb_cursor_get MDB_NEXT cur pKey pVal >>= \case
2019-08-21 02:32:46 +03:00
False -> pure $ Just ((key, val), 0)
True -> pure $ Just ((key, val), pred n)
-- Utils -----------------------------------------------------------------------
withKVPtrs' :: (MonadIO m, MonadUnliftIO m)
=> Val -> Val -> (Ptr Val -> Ptr Val -> m a) -> m a
withKVPtrs' k v cb =
withRunInIO $ \run ->
withKVPtrs k v $ \x y -> run (cb x y)
nullVal :: MDB_val
nullVal = MDB_val 0 nullPtr
word :: Int -> Word64
word = fromIntegral
2019-05-30 02:19:07 +03:00
assertExn :: Exception e => Bool -> e -> IO ()
assertExn True _ = pure ()
assertExn False e = throwIO e
2019-05-30 02:19:07 +03:00
eitherExn :: Exception e => Either a b -> (a -> e) -> IO b
eitherExn eat exn = either (throwIO . exn) pure eat
2019-05-30 02:19:07 +03:00
byteStringAsMdbVal :: ByteString -> (MDB_val -> IO a) -> IO a
byteStringAsMdbVal bs k =
BU.unsafeUseAsCStringLen bs $ \(ptr,sz) ->
k (MDB_val (fromIntegral sz) (castPtr ptr))
2019-05-30 02:19:07 +03:00
mdbValToWord64 :: MDB_val -> IO Word64
mdbValToWord64 (MDB_val sz ptr) = do
assertExn (sz == 8) BadKeyInEventLog
peek (castPtr ptr)
withWord64AsMDBval :: (MonadIO m, MonadUnliftIO m)
=> Word64 -> (MDB_val -> m a) -> m a
withWord64AsMDBval w cb = do
withWordPtr w $ \p ->
cb (MDB_val (fromIntegral (sizeOf w)) (castPtr p))
withWordPtr :: (MonadIO m, MonadUnliftIO m)
=> Word64 -> (Ptr Word64 -> m a) -> m a
withWordPtr w cb =
withRunInIO $ \run ->
allocaBytes (sizeOf w) (\p -> poke p w >> run (cb p))
-- Lower-Level Operations ------------------------------------------------------
getMb :: MonadIO m => Txn -> Dbi -> ByteString -> m (Maybe Noun)
getMb txn db key =
io $
2019-07-12 22:24:44 +03:00
byteStringAsMdbVal key $ \mKey ->
mdb_get txn db mKey >>= traverse (mdbValToNoun key)
mdbValToBytes :: MDB_val -> IO ByteString
mdbValToBytes (MDB_val sz ptr) = do
BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz)
mdbValToNoun :: ByteString -> MDB_val -> IO Noun
mdbValToNoun key (MDB_val sz ptr) = do
bs <- BU.unsafePackCStringLen (castPtr ptr, fromIntegral sz)
let res = cueBS bs
eitherExn res (\err -> BadNounInLogIdentity key err bs)
putNoun :: MonadIO m
=> MDB_WriteFlags -> Txn -> Dbi -> ByteString -> Noun -> m Bool
putNoun flags txn db key val =
io $
byteStringAsMdbVal key $ \mKey ->
byteStringAsMdbVal (jamBS val) $ \mVal ->
mdb_put flags txn db mKey mVal
putBytes :: MonadIO m
=> MDB_WriteFlags -> Txn -> Dbi -> Word64 -> ByteString -> m Bool
putBytes flags txn db id bs =
io $
withWord64AsMDBval id $ \idVal ->
byteStringAsMdbVal bs $ \mVal ->
mdb_put flags txn db idVal mVal