mirror of
https://github.com/urbit/shrub.git
synced 2024-12-21 09:51:36 +03:00
236 lines
7.2 KiB
Haskell
236 lines
7.2 KiB
Haskell
-- TODO: Make sure transaction closed in all error cases
|
|
-- TODO: Don't allow writing non-contiguous events
|
|
module Vere.Log (
|
|
init,
|
|
shutdown,
|
|
-- we don't export write; you use the queue
|
|
readEvents,
|
|
latestEventNumber,
|
|
readLogIdentity,
|
|
writeLogIdentity
|
|
) where
|
|
|
|
import ClassyPrelude hiding (init)
|
|
import Control.Lens hiding ((<|))
|
|
|
|
import Data.Noun
|
|
import Data.Noun.Atom
|
|
import Data.Noun.Jam
|
|
import Data.Noun.Pill
|
|
import Data.Void
|
|
import Database.LMDB.Raw
|
|
import Foreign.Ptr
|
|
import Foreign.Marshal.Alloc
|
|
import Vere.Pier.Types
|
|
|
|
import Control.Lens ((^.))
|
|
import Foreign.Storable (peek, poke, sizeOf)
|
|
|
|
import qualified Data.ByteString.Unsafe as BU
|
|
import qualified Data.ByteString as B
|
|
import qualified Data.Vector as V
|
|
import qualified Data.Vector.Mutable as MV
|
|
|
|
--------------------------------------------------------------------------------
|
|
|
|
-- TODO: Handle throws on the async
|
|
-- | Create and open the LMDB environment in @dir@, then start the persistence
-- thread that consumes writs from @inp@.
--
-- The environment is configured for at most 3 named databases and a 40 GiB
-- map size. @cb@ is passed on to 'persistThread', which calls it for every
-- writ after the batch containing it has been committed.
--
-- If any step after 'mdb_env_create' throws, the environment handle is closed
-- before the exception propagates, so a failed 'init' does not leak it.
init :: FilePath -> TQueue (Writ [Effect]) -> (Writ [Effect] -> STM ())
     -> IO LogState
init dir inp cb = do
  env <- mdb_env_create
  -- LMDB documents that mdb_env_close must be called to discard the handle
  -- when mdb_env_open fails.
  flip onException (mdb_env_close env) $ do
    mdb_env_set_maxdbs env 3
    mdb_env_set_mapsize env (40 * 1024 * 1024 * 1024)
    mdb_env_open env dir []
    writer <- persistThread env inp cb
    pure (LogState env inp cb writer)
|
|
|
|
-- TODO: properly handle shutdowns during write
|
|
-- | Cancel the persistence thread, wait for it to terminate, and then close
-- the LMDB environment. Whatever the thread finished with (result or
-- exception) is discarded.
shutdown :: LogState -> IO ()
shutdown s =
  waitCancel (writer s) *> mdb_env_close (env s)
|
|
|
|
-- | Cancel an async task and then wait for it, returning either the exception
-- it terminated with or its result.
waitCancel :: Async a -> IO (Either SomeException a)
waitCancel task = do
  cancel task
  waitCatch task
|
|
|
|
--------------------------------------------------------------------------------
|
|
|
|
-- | Read one or more items from a 'TQueue'.
--
-- Blocks (retries) only while the queue is empty; once a first item is
-- available, every further item that is immediately present is taken as well.
-- Items are returned in the order they were read from the queue.
readQueue :: TQueue a -> STM (NonNull [a])
readQueue queue = do
  firstItem <- readTQueue queue
  drain (singleton firstItem)
  where
    -- The accumulator is built newest-first, hence the final reverse.
    drain collected = do
      next <- tryReadTQueue queue
      case next of
        Nothing   -> pure (reverse collected)
        Just item -> drain (item <| collected)
|
|
|
|
-- | Run a continuation with an 'MDB_val' whose pointer and length refer
-- directly to the contents of the given 'ByteString'.
--
-- This goes through 'BU.unsafeUseAsCStringLen', so the 'MDB_val' aliases the
-- string's buffer and should only be used inside the continuation.
byteStringAsMdbVal :: ByteString -> (MDB_val -> IO a) -> IO a
byteStringAsMdbVal bytes act =
  BU.unsafeUseAsCStringLen bytes $ \(buf, len) ->
    act (MDB_val (fromIntegral len) (castPtr buf))
|
|
|
|
-- | Convert the bytes referenced by an 'MDB_val' into an 'Atom'.
--
-- The bytes are copied into a fresh 'ByteString' with 'B.packCStringLen'
-- rather than aliased with 'BU.unsafePackCStringLen'. Memory handed out by
-- LMDB is only valid until the end of the transaction (or the next update),
-- while the 'Atom' returned here is built lazily from the 'ByteString' and
-- may be forced after the transaction has ended, as happens with the vector
-- returned by 'readEvents'. Copying makes the result independent of the
-- transaction's lifetime.
mdbValToAtom :: MDB_val -> IO Atom
mdbValToAtom (MDB_val sz ptr) = do
  bs <- B.packCStringLen (castPtr ptr, fromIntegral sz)
  pure (bs ^. from (pill . pillBS))
|
|
|
|
-- | Unwrap a 'Maybe' in 'IO': yields the contained value, or fails by calling
-- 'error' with the supplied message when given 'Nothing'.
maybeErr :: Maybe a -> String -> IO a
maybeErr res msg = maybe (error msg) pure res
|
|
|
|
-- | Decode the bytes referenced by an 'MDB_val' into a 'Noun' through the
-- '_Cue' prism, failing with the error @"mdb bad cue"@ when decoding does not
-- succeed.
--
-- The bytes are copied with 'B.packCStringLen' instead of being aliased with
-- 'BU.unsafePackCStringLen'. LMDB-owned memory is only valid until the
-- transaction ends, and any part of the resulting 'Noun' that is still
-- unevaluated could otherwise read it after that point.
mdbValToNoun :: MDB_val -> IO Noun
mdbValToNoun (MDB_val sz ptr) = do
  bs <- B.packCStringLen (castPtr ptr, fromIntegral sz)
  let res = bs ^? from pillBS . from pill . _Cue
  maybeErr res "mdb bad cue"
|
|
|
|
-- | Store a raw key/value pair with 'mdb_put', failing with the error
-- @"mdb bad put"@ when 'mdb_put' reports 'False'.
putRaw :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> MDB_val -> MDB_val -> IO ()
putRaw flags txn db key val = do
  stored <- mdb_put flags txn db key val
  unless stored (error "mdb bad put")
|
|
|
|
-- | Store a 'Noun' under a 'ByteString' key. The value written is the byte
-- string obtained by reviewing the noun through '_CueBytes'.
putNoun :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> ByteString -> Noun -> IO ()
putNoun flags txn db key val =
  byteStringAsMdbVal key $ \keyVal ->
    byteStringAsMdbVal encoded $ \nounVal ->
      putRaw flags txn db keyVal nounVal
  where
    encoded = val ^. re _CueBytes
|
|
|
|
-- | Store an already-jammed atom under a 'Word64' key. The key is passed to
-- LMDB as the 8 in-memory bytes of the word; the value is the atom's bytes as
-- given by @pill . pillBS@.
putJam :: MDB_WriteFlags -> MDB_txn -> MDB_dbi -> Word64 -> Jam -> IO ()
putJam flags txn db id (Jam atom) =
  withWord64AsMDBval id $ \keyVal ->
    -- Force the serialised bytes before handing them to LMDB.
    let !payload = atom ^. pill . pillBS
    in  byteStringAsMdbVal payload (putRaw flags txn db keyVal)
|
|
|
|
-- | Look up a 'ByteString' key and decode the stored value as a 'Noun'.
-- Fails with the error @"mdb bad get"@ when the key is absent.
get :: MDB_txn -> MDB_dbi -> ByteString -> IO Noun
get txn db key =
  byteStringAsMdbVal key $ \keyVal -> do
    found <- mdb_get txn db keyVal
    case found of
      Nothing  -> error "mdb bad get"
      Just val -> mdbValToNoun val
|
|
|
|
-- | Read a 'Word64' out of an 'MDB_val'. The value must be exactly 8 bytes
-- long; otherwise this fails with @"wrong size in mdbValToWord64"@.
mdbValToWord64 :: MDB_val -> IO Word64
mdbValToWord64 (MDB_val len buf) =
  assertErr (len == 8) "wrong size in mdbValToWord64" >> peek (castPtr buf)
|
|
|
|
-- | Run a continuation with an 'MDB_val' describing a temporary buffer that
-- holds the given 'Word64'. The buffer comes from 'withWordPtr', so the
-- 'MDB_val' is only meaningful inside the continuation.
withWord64AsMDBval :: Word64 -> (MDB_val -> IO a) -> IO a
withWord64AsMDBval word act =
  withWordPtr word $ \wordPtr ->
    act (MDB_val size (castPtr wordPtr))
  where
    size = fromIntegral (sizeOf word)
|
|
|
|
--------------------------------------------------------------------------------
|
|
|
|
-- | Allocate a temporary buffer, store the given 'Word64' in it, and run the
-- continuation with a pointer to that buffer. The buffer is released when the
-- continuation finishes.
withWordPtr :: Word64 -> (Ptr Word64 -> IO a) -> IO a
withWordPtr word act =
  allocaBytes (sizeOf word) $ \buf -> do
    poke buf word
    act buf
|
|
|
|
-- TODO: We need to be able to send back an exception to the main thread on an
|
|
-- exception on the persistence thread.
|
|
-- | Start the persistence thread (via 'asyncBound') and return its handle.
--
-- The thread loops forever: it blocks until at least one writ is queued,
-- takes everything currently in the queue, writes that batch to the
-- @EVENTS@ database in a single LMDB write transaction, and then invokes
-- @onPersist@ for each writ of the batch in one STM transaction.
--
-- Events are written with 'MDB_NOOVERWRITE'; 'putJam' fails (through
-- 'putRaw') when 'mdb_put' reports 'False'.
persistThread :: MDB_env
              -> TQueue (Writ [Effect])
              -> (Writ [Effect] -> STM ())
              -> IO (Async ())
persistThread env inputQueue onPersist = asyncBound $
  forever do
    writs <- atomically $ readQueue inputQueue
    writeEvents writs
    atomically $ traverse_ onPersist writs
  where
    writeEvents writs = do
      txn <- mdb_txn_begin env Nothing False
      -- Abort the transaction if opening the database or any put throws, so
      -- a failed batch does not leave a write transaction open. The commit
      -- stays outside the handler: LMDB frees the transaction handle in
      -- mdb_txn_commit, so it must not be aborted after a commit attempt.
      flip onException (mdb_txn_abort txn) $ do
        db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY]
        let flags = compileWriteFlags [MDB_NOOVERWRITE]
        for_ writs $ \w ->
          putJam flags txn db (eventId w) (event w)
      mdb_txn_commit txn
|
|
|
|
-- Standalone-derived 'Show' instance for 'MDB_val'. No 'MDB_val' declaration
-- is visible in this module, so this looks like an orphan instance for an
-- imported type.
-- NOTE(review): no use of it is visible in this file — confirm it is still
-- needed.
deriving instance Show MDB_val
|
|
|
|
-- TODO: This will read len items and will error if there are less than that
|
|
-- available. This differs from the current pier.c's expectations.
|
|
-- | Read @len@ consecutive events starting at event number @first@ from the
-- @EVENTS@ database, returning each event number paired with its atom.
--
-- Fails (via 'assertErr') when the first event cannot be found, when a
-- stored key differs from the expected event number (a gap in the log), or
-- when the cursor runs out of entries before @len@ events have been read.
--
-- The read transaction and the cursor are both acquired with 'bracket', so
-- they are released on every exit path, including those failures. The cursor
-- is closed before the transaction is aborted.
readEvents :: LogState -> Word64 -> Word64 -> IO (V.Vector (Word64,Atom))
readEvents (LogState env _ _ _) first len =
  withWordPtr first $ \pIdx ->
    withKVPtrs (MDB_val 8 (castPtr pIdx)) (MDB_val 0 nullPtr) $ \pKey pVal ->
      bracket (mdb_txn_begin env Nothing True) mdb_txn_abort $ \txn -> do
        db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY]
        bracket (mdb_cursor_open txn db) mdb_cursor_close $ \cur -> do
          -- Position the cursor on the first requested event.
          found <- mdb_cursor_get MDB_SET_KEY cur pKey pVal
          assertErr found "mdb could not read initial event of sequence"

          V.generateM count $ \i -> do
            key <- peek pKey >>= mdbValToWord64
            val <- peek pVal >>= mdbValToAtom

            let idx = first + fromIntegral i

            assertErr (key == idx) ("missing event in database " <> show idx)

            -- Advance the cursor unless this was the last requested event.
            when (i + 1 /= count) do
              more <- mdb_cursor_get MDB_NEXT cur pKey pVal
              assertErr more "lmdb: next event not found"

            pure (idx, val)
  where
    count = int len
|
|
|
|
-- | Convert a 'Word64' to an 'Int' with 'fromIntegral'.
int :: Word64 -> Int
int n = fromIntegral n
|
|
|
|
-- | Do nothing when the condition holds; otherwise fail by calling 'error'
-- with the supplied message.
assertErr :: Bool -> String -> IO ()
assertErr ok msg = unless ok (error msg)
|
|
|
|
-- | Return the key of the last entry in the @EVENTS@ database, or 0 when the
-- cursor finds no last entry.
--
-- The read transaction and the cursor are acquired with 'bracket', so both
-- are released even if reading or decoding the key throws. The cursor is
-- closed before the transaction is aborted.
latestEventNumber :: LogState -> IO Word64
latestEventNumber (LogState env _ _ _) =
  bracket (mdb_txn_begin env Nothing True) mdb_txn_abort $ \txn -> do
    db <- mdb_dbi_open txn (Just "EVENTS") [MDB_CREATE, MDB_INTEGERKEY]
    bracket (mdb_cursor_open txn db) mdb_cursor_close fetch
  where
    -- Placeholder for the key/value slots that mdb_cursor_get fills in.
    emptyVal = MDB_val 0 nullPtr

    fetch cur =
      withKVPtrs emptyVal emptyVal $ \pKey pVal ->
        mdb_cursor_get MDB_LAST cur pKey pVal >>= \case
          False -> pure 0
          True  -> peek pKey >>= mdbValToWord64
|
|
|
|
--------------------------------------------------------------------------------
|
|
|
|
-- | Read the log's identity from the @META@ database: the nouns stored under
-- the keys @"who"@, @"is-fake"@ and @"life"@.
--
-- 'get' fails when a key is missing. The read transaction is acquired with
-- 'bracket' so that it is aborted in that case too, instead of being left
-- open.
readLogIdentity :: LogState -> IO LogIdentity
readLogIdentity (LogState env _ _ _) =
  bracket (mdb_txn_begin env Nothing True) mdb_txn_abort $ \txn -> do
    db      <- mdb_dbi_open txn (Just "META") []
    who     <- get txn db "who"
    is_fake <- get txn db "is-fake"
    life    <- get txn db "life"
    pure (LogIdentity who is_fake life)
|
|
|
|
-- | Write the log's identity to the @META@ database (created if missing)
-- under the keys @"who"@, @"is-fake"@ and @"life"@, in a single write
-- transaction.
--
-- If opening the database or any put throws, the transaction is aborted
-- before the exception propagates. The commit stays outside the handler
-- because LMDB frees the transaction handle in mdb_txn_commit, so it must
-- not be aborted after a commit attempt.
writeLogIdentity :: LogState -> LogIdentity -> IO ()
writeLogIdentity (LogState env _ _ _) LogIdentity{..} = do
  txn <- mdb_txn_begin env Nothing False
  flip onException (mdb_txn_abort txn) $ do
    db <- mdb_dbi_open txn (Just "META") [MDB_CREATE]
    let flags = compileWriteFlags []
    putNoun flags txn db "who" who
    putNoun flags txn db "is-fake" is_fake
    putNoun flags txn db "life" life
  mdb_txn_commit txn
|