First minor integration with lmdb.

This stores the ship's identity information in a separate META table in
the lmdb datastore.
This commit is contained in:
Elliot Glaysher 2019-04-18 14:01:44 -07:00
parent afb9911636
commit 2bef1a30da
11 changed files with 388 additions and 10 deletions

View File

@ -10,7 +10,7 @@ let
libs =
with pkgs;
[ openssl zlib curl gmp scrypt libsigsegv ncurses openssl zlib ];
[ openssl zlib curl gmp scrypt libsigsegv ncurses openssl zlib lmdb ];
osx =
with pkgs;

View File

@ -9,7 +9,7 @@ let
deps =
with pkgs;
[ curl gmp libsigsegv ncurses openssl zlib ];
[ curl gmp libsigsegv ncurses openssl zlib lmdb ];
vendor =
[ argon2 softfloat3 ed25519 ent h2o scrypt uv murmur3 secp256k1 sni ];

View File

@ -6,7 +6,7 @@ let
crossdeps =
with env;
[ curl libgmp libsigsegv ncurses openssl zlib ];
[ curl libgmp libsigsegv ncurses openssl zlib lmdb ];
vendor =
with deps;

View File

@ -9,6 +9,7 @@ include/config.h
*.o
/urbit
/urbit-worker
/urbit-tests
/tags
#
# Editor Bullshit

View File

@ -5,6 +5,7 @@ noun = $(wildcard noun/*.c)
vere = $(wildcard vere/*.c)
daemon = $(wildcard daemon/*.c)
worker = $(wildcard worker/*.c)
test = tests/test_lmdb.c
common = $(jets) $(noun) $(vere)
headers = $(shell find include -type f)
@ -12,6 +13,7 @@ headers = $(shell find include -type f)
common_objs = $(shell echo $(common) | sed 's/\.c/.o/g')
daemon_objs = $(shell echo $(daemon) | sed 's/\.c/.o/g')
worker_objs = $(shell echo $(worker) | sed 's/\.c/.o/g')
test_objs = $(shell echo $(test) | sed 's/\.c/.o/g')
all_objs = $(common_objs) $(daemon_objs) $(worker_objs)
all_srcs = $(common) $(daemon) $(worker)
@ -40,6 +42,10 @@ urbit-worker: $(common_objs) $(worker_objs)
@echo CC -o $@
@$(CC) $^ $(LDFLAGS) -o $@
urbit-tests: $(common_objs) $(test_objs)
@echo CC -o $@
@$(CC) $^ $(LDFLAGS) -o $@
%.o: %.c $(headers)
@echo CC $<
@$(CC) -I./include $(CFLAGS) -c $< -o $@

2
pkg/urbit/configure vendored
View File

@ -6,7 +6,7 @@ URBIT_VERSION=0.8.0
deps=" \
curl gmp sigsegv argon2 ed25519 ent h2o scrypt sni uv murmur3 secp256k1 \
softfloat3 ncurses ssl crypto z \
softfloat3 ncurses ssl crypto z lmdb \
"
echo '#pragma once' >include/config.h

View File

@ -0,0 +1,17 @@
#include <lmdb.h>
MDB_env* u3m_lmdb_init(const char* log_path);
void u3m_lmdb_shutdown(MDB_env* env);
void u3m_lmdb_write_identity(MDB_env* environment,
u3_noun who,
u3_noun is_fake,
u3_noun life);
void u3m_lmdb_read_identity(MDB_env* environment,
u3_noun* who,
u3_noun* is_fake,
u3_noun* life);

View File

@ -4,6 +4,7 @@
*/
#include "h2o.h"
#include "lmdb.h"
/** Quasi-tunable parameters.
**/
@ -649,6 +650,7 @@
u3_foil* fol_u; // logfile
c3_o liv_o; // live
c3_d end_d; // byte end of file
MDB_env* db_u; // lmdb environment. replaces fol_u.
c3_d moc_d; // commit requested
c3_d com_d; // committed
struct _u3_pier* pir_u; // pier backpointer

View File

@ -0,0 +1,41 @@
#define C3_GLOBAL
#include "all.h"
#include "vere/lmdb.h"
// A simple memory tester.
//
int
main(int argc, char *argv[])
{
u3m_init();
u3m_pave(c3y, c3n);
{
MDB_env* env = u3m_lmdb_init("./test-env");
u3_noun who = u3i_string("~littel-ponnys");
u3_noun is_fake = u3i_string("is-fake");
u3_noun life = u3i_string("life");
u3m_lmdb_write_identity(env, who, is_fake, life);
u3m_lmdb_shutdown(env);
}
{
MDB_env* env = u3m_lmdb_init("./test-env");
u3_noun who;
u3_noun is_fake;
u3_noun life;
u3m_lmdb_read_identity(env, &who, &is_fake, &life);
u3m_lmdb_shutdown(env);
u3m_p("who", who);
u3m_p("is-fake", is_fake);
u3m_p("life", life);
}
}

264
pkg/urbit/vere/lmdb.c Normal file
View File

@ -0,0 +1,264 @@
/* vere/lmdb.c
*/
#include "all.h"
#include <uv.h>
#include <lmdb.h>
// Event log persistence for Urbit
//
// Persistence works by having an lmdb environment opened on the main
// thread. This environment is used to create read-only transactions
// synchronously when needed.
//
// But most of the lmdb operates asynchronously in the uv worker pool. Since
// individual transactions are bound to threads, we perform all blocking
// writing on worker threads. We do this so we can perform event batching on
// the main thread instead of blocking it; the main thread is still a libuv
// loop.
//
// There are several metadata writes which we can perform from whatever thread
// because we they're inherently stop-the-world blocking. Thankfully, these
// should be "cheap" and "rare".
// Opens up a log environment. This can eventually be made
//
// Precondition: log_path points to an already created directory
//
MDB_env* u3m_lmdb_init(const char* log_path)
{
MDB_env* env = 0;
c3_w ret_w = mdb_env_create(&env);
if (ret_w != 0) {
u3l_log("lmdb: init fail: %s\n", mdb_strerror(ret_w));
return 0;
}
// Our databases have up to three tables: META, EVENTS, and GRAINS.
ret_w = mdb_env_set_maxdbs(env, 3);
if (ret_w != 0) {
u3l_log("lmdb: failed to set number of databases: %s\n", mdb_strerror(ret_w));
return 0;
}
ret_w = mdb_env_open(env, log_path, 0, 0664);
if (ret_w != 0) {
u3l_log("lmdb: failed to open event log: %s\n", mdb_strerror(ret_w));
return 0;
}
return env;
}
void u3m_lmdb_shutdown(MDB_env* env)
{
mdb_env_close(env);
}
// Writes a key/value pair to a specific database as part of a transaction.
//
// The raw version doesn't take ownership of either key/value and performs no
// nock calculations, so it is safe to call from any thread.
//
static
void _perform_put_on_databse_raw(MDB_txn* transaction_u,
MDB_dbi database_u,
void* key,
size_t key_len,
void* value,
size_t value_len) {
MDB_val key_val, value_val;
key_val.mv_size = key_len;
key_val.mv_data = key;
value_val.mv_size = value_len;
value_val.mv_data = value;
c3_w ret_w = mdb_put(transaction_u, database_u, &key_val, &value_val, 0);
if (ret_w != 0) {
u3l_log("lmdb: write failed: %s\n", mdb_strerror(ret_w));
u3m_bail(c3__fail);
}
}
static
void _perform_get_on_databse_raw(MDB_txn* transaction_u,
MDB_dbi database_u,
void* key,
size_t key_len,
MDB_val* value) {
MDB_val key_val;
key_val.mv_size = key_len;
key_val.mv_data = key;
c3_w ret_w = mdb_get(transaction_u, database_u, &key_val, value);
if (ret_w != 0) {
u3l_log("lmdb: read failed: %s\n", mdb_strerror(ret_w));
u3m_bail(c3__fail);
}
}
static
void _perform_put_on_databse_noun(MDB_txn* transaction_u,
MDB_dbi database_u,
c3_c* key,
u3_noun noun) {
// jam noun into an atom representation
u3_atom mat = u3ke_jam(noun);
// copy the jammed noun into a byte buffer we can hand to lmdb
c3_w len_w = u3r_met(3, mat);
c3_y* bytes_y = (c3_y*) malloc(len_w);
u3r_bytes(0, len_w, bytes_y, mat);
_perform_put_on_databse_raw(transaction_u,
database_u,
key, strlen(key),
bytes_y, len_w);
free(bytes_y);
u3z(mat);
}
static
void _perform_get_on_databse_noun(MDB_txn* transaction_u,
MDB_dbi database_u,
c3_c* key,
u3_noun* noun) {
MDB_val value_val;
_perform_get_on_databse_raw(transaction_u,
database_u,
key, strlen(key),
&value_val);
// Take the bytes and cue them.
u3_atom raw_atom = u3i_bytes(value_val.mv_size, value_val.mv_data);
*noun = u3qe_cue(raw_atom);
}
// Implementation of u3m_lmdb_write_events() called on a worker thread.
//
static void u3m_lmdb_write_events_cb(uv_work_t* req) {
}
// Implementation of u3m_lmdb_write_events() called on the main loop thread
// after the worker thread event completes.
//
static void u3m_lmdb_write_events_after_cb(uv_work_t* req, int status) {
// Calls the user provided cb
// Cleans up req->data.
}
// u3m_lmdb_write_events(): Asynchronously writes events to the database.
//
// This writes all the passed in events along with log metadata updates to the
// database as a single transaction on a worker thread. Once the transaction
// is completed, it calls the passed in callback on the main loop thread.
//
void u3m_lmdb_write_events(MDB_env* environment,
int first_event_id,
u3_noun* events,
int event_count
/*, TODO: ADD CALLBACK */
)
{
// Packs up all the events for transport.
//
uv_work_t req;
req.data = /* malloc() */ 0;
uv_queue_work(uv_default_loop(),
&req,
u3m_lmdb_write_events_cb,
u3m_lmdb_write_events_after_cb);
}
// Writes the event log identity information.
//
// We have a secondary database (table) in this environment named META where we
// read/write identity information from/to.
//
void u3m_lmdb_write_identity(MDB_env* environment,
u3_noun who,
u3_noun is_fake,
u3_noun life)
{
// Creates the write transaction.
MDB_txn* transaction_u;
c3_w ret_w = mdb_txn_begin(environment,
(MDB_txn *) NULL,
0, /* flags */
&transaction_u);
if (0 != ret_w) {
u3l_log("lmdb: txn_begin fail: %s\n", mdb_strerror(ret_w));
u3m_bail(c3__fail);
}
// Opens the database as part of the transaction.
c3_w flags_w = MDB_CREATE;
MDB_dbi database_u;
ret_w = mdb_dbi_open(transaction_u,
"META",
flags_w,
&database_u);
if (0 != ret_w) {
u3l_log("lmdb: dbi_open fail: %s\n", mdb_strerror(ret_w));
u3m_bail(c3__fail);
}
_perform_put_on_databse_noun(transaction_u, database_u, "who", who);
_perform_put_on_databse_noun(transaction_u, database_u, "is-fake", is_fake);
_perform_put_on_databse_noun(transaction_u, database_u, "life", life);
ret_w = mdb_txn_commit(transaction_u);
if (0 != ret_w) {
u3l_log("lmdb: failed to commit transaction: %s\n", mdb_strerror(ret_w));
u3m_bail(c3__fail);
}
}
// Reads the event log identity information.
//
void u3m_lmdb_read_identity(MDB_env* environment,
u3_noun* who,
u3_noun* is_fake,
u3_noun* life) {
// Creates the write transaction.
MDB_txn* transaction_u;
c3_w ret_w = mdb_txn_begin(environment,
(MDB_txn *) NULL,
0, /* flags */
&transaction_u);
if (0 != ret_w) {
u3l_log("lmdb: txn_begin fail: %s\n", mdb_strerror(ret_w));
u3m_bail(c3__fail);
}
// Opens the database as part of the transaction.
MDB_dbi database_u;
ret_w = mdb_dbi_open(transaction_u,
"META",
0,
&database_u);
if (0 != ret_w) {
u3l_log("lmdb: dbi_open fail: %s\n", mdb_strerror(ret_w));
u3m_bail(c3__fail);
}
_perform_get_on_databse_noun(transaction_u, database_u, "who", who);
_perform_get_on_databse_noun(transaction_u, database_u, "is-fake", is_fake);
_perform_get_on_databse_noun(transaction_u, database_u, "life", life);
// Read-only transactions are aborted since we don't need to record the fact
// that we performed a read.
mdb_txn_abort(transaction_u);
}

View File

@ -85,6 +85,7 @@ _pier_disk_bail(void* vod_p, const c3_c* err_c)
static void
_pier_disk_shutdown(u3_pier* pir_u)
{
u3m_lmdb_shutdown(pir_u->log_u->db_u);
}
/* _pier_disk_commit_complete(): commit complete.
@ -177,6 +178,38 @@ _pier_disk_write_header(u3_pier* pir_u, u3_atom mat)
len_d);
}
static void
_pier_db_write_header(u3_pier* pir_u,
u3_noun who,
u3_noun is_fake,
u3_noun life)
{
u3m_lmdb_write_identity(pir_u->log_u->db_u,
who, is_fake, life);
u3z(who);
u3z(is_fake);
u3z(life);
}
/* _pier_db_read_header(): reads the ships metadata from lmdb
*/
static void
_pier_db_read_header(u3_pier* pir_u)
{
u3_noun who, is_fake, life;
u3m_lmdb_read_identity(pir_u->log_u->db_u,
&who, &is_fake, &life);
_pier_boot_set_ship(pir_u, u3k(who), u3k(is_fake));
pir_u->lif_d = u3r_chub(0, life);
u3z(who);
u3z(is_fake);
u3z(life);
}
/* _pier_disk_read_header_complete():
*/
static void
@ -397,7 +430,11 @@ _pier_disk_init_complete(u3_disk* log_u, c3_d evt_d)
// restore pier identity (XX currently a no-op, see comment)
//
_pier_disk_read_header(log_u);
//_pier_disk_read_header(log_u);
// TODO: We want to restore our identity right here?
//
//_pier_db_read_header(log_u->pir_u);
_pier_boot_ready(log_u->pir_u);
}
@ -504,6 +541,12 @@ _pier_disk_create(u3_pier* pir_u)
c3_free(log_c);
return c3n;
}
if ( 0 == (log_u->db_u = u3m_lmdb_init(log_c)) ) {
c3_free(log_c);
return c3n;
}
c3_free(log_c);
}
@ -637,7 +680,7 @@ _pier_work_bail(void* vod_p,
fprintf(stderr, "pier: work error: %s\r\n", err_c);
}
/* _pier_work_boot(): prepare serf boot.
/* _pier_work_boot(): prepare for boot.
*/
static void
_pier_work_boot(u3_pier* pir_u, c3_o sav_o)
@ -649,8 +692,12 @@ _pier_work_boot(u3_pier* pir_u, c3_o sav_o)
u3_noun who = u3i_chubs(2, pir_u->who_d);
u3_noun len = u3i_chubs(1, &pir_u->lif_d);
u3_noun msg = u3nq(c3__boot, who, pir_u->fak_o, len);
u3_atom mat = u3ke_jam(msg);
if ( c3y == sav_o ) {
_pier_db_write_header(pir_u, u3k(who), u3k(pir_u->fak_o), u3k(len));
}
u3_atom mat = u3we_jam(msg);
if ( c3y == sav_o ) {
_pier_disk_write_header(pir_u, u3k(mat));
}
@ -896,7 +943,7 @@ _pier_work_play(u3_pier* pir_u,
c3_assert( c3n == god_u->liv_o );
god_u->liv_o = c3y;
// all events in the serf are complete
// all events in the worker are complete
//
god_u->rel_d = god_u->dun_d = god_u->sen_d = (lav_d - 1ULL);
@ -1667,7 +1714,7 @@ _pier_boot_ready(u3_pier* pir_u)
_pier_boot_vent(pir_u->bot_u);
_pier_boot_dispose(pir_u->bot_u);
// prepare serf for boot sequence, write log header
// prepare worker for boot sequence, write log header
//
_pier_work_boot(pir_u, c3y);
@ -1696,7 +1743,7 @@ _pier_boot_ready(u3_pier* pir_u)
fprintf(stderr, "pier: replaying events 1 through %" PRIu64 "\r\n",
log_u->com_d);
// prepare serf for replay of boot sequence, don't write log header
// prepare worker for replay of boot sequence, don't write log header
//
_pier_work_boot(pir_u, c3n);
}