mirror of
https://github.com/ilyakooo0/urbit.git
synced 2024-09-21 07:28:30 +03:00
vere: refactors lmdb.c, separating it from u3 and uv
This commit is contained in:
parent
24b6190a3d
commit
ef83c0160e
@ -2,7 +2,7 @@ include config.mk
|
||||
|
||||
jets = jets/tree.c $(wildcard jets/*/*.c)
|
||||
noun = $(wildcard noun/*.c)
|
||||
vere = $(wildcard vere/*.c)
|
||||
vere = $(wildcard vere/*.c) $(wildcard vere/*/*.c)
|
||||
daemon = $(wildcard daemon/*.c)
|
||||
worker = $(wildcard worker/*.c)
|
||||
tests = $(wildcard tests/*.c)
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include <h2o.h>
|
||||
#include <curl/curl.h>
|
||||
#include <argon2.h>
|
||||
#include <lmdb.h>
|
||||
|
||||
#define U3_GLOBAL
|
||||
#define C3_GLOBAL
|
||||
|
56
pkg/urbit/include/vere/db/lmdb.h
Normal file
56
pkg/urbit/include/vere/db/lmdb.h
Normal file
@ -0,0 +1,56 @@
|
||||
/* include/vere/db/lmdb-impl.h
|
||||
*/
|
||||
|
||||
#include <lmdb.h>
|
||||
|
||||
/* lmdb api wrapper
|
||||
*/
|
||||
|
||||
/* c3_lmdb_init(): open lmdb at [pax_c], mmap up to [siz_i].
|
||||
*/
|
||||
MDB_env*
|
||||
c3_lmdb_init(const c3_c* pax_c, size_t siz_i);
|
||||
|
||||
/* c3_lmdb_exit(): close lmdb.
|
||||
*/
|
||||
void
|
||||
c3_lmdb_exit(MDB_env* env_u);
|
||||
|
||||
/* c3_lmdb_gulf(): read first and last event numbers.
|
||||
*/
|
||||
c3_o
|
||||
c3_lmdb_gulf(MDB_env* env_u, c3_d* low_d, c3_d* hig_d);
|
||||
|
||||
/* c3_lmdb_read(): read [len_d] events starting at [eve_d].
|
||||
*/
|
||||
c3_o
|
||||
c3_lmdb_read(MDB_env* env_u,
|
||||
void* vod_p,
|
||||
c3_d eve_d,
|
||||
c3_d len_d,
|
||||
c3_o (*read_f)(void*, c3_d, size_t , void*));
|
||||
|
||||
/* c3_lmdb_save(): save [len_d] events starting at [eve_d].
|
||||
*/
|
||||
c3_o
|
||||
c3_lmdb_save(MDB_env* env_u,
|
||||
c3_d eve_d,
|
||||
c3_d len_d,
|
||||
void** byt_p,
|
||||
size_t* siz_i);
|
||||
|
||||
/* c3_lmdb_read_meta(): read by string from the META db.
|
||||
*/
|
||||
void
|
||||
c3_lmdb_read_meta(MDB_env* env_u,
|
||||
void* vod_p,
|
||||
const c3_c* key_c,
|
||||
void (*read_f)(void*, size_t, void*));
|
||||
|
||||
/* c3_lmdb_save_meta(): save by string into the META db.
|
||||
*/
|
||||
c3_o
|
||||
c3_lmdb_save_meta(MDB_env* env_u,
|
||||
const c3_c* key_c,
|
||||
size_t val_i,
|
||||
void* val_p);
|
@ -4,7 +4,6 @@
|
||||
*/
|
||||
|
||||
#include "h2o.h"
|
||||
#include <lmdb.h>
|
||||
|
||||
/** Quasi-tunable parameters.
|
||||
**/
|
||||
@ -447,7 +446,7 @@
|
||||
u3_dire* urb_u; // urbit system data
|
||||
u3_dire* com_u; // log directory
|
||||
c3_o liv_o; // live
|
||||
MDB_env* db_u; // lmdb environment.
|
||||
void* mdb_u; // lmdb environment.
|
||||
c3_d sen_d; // commit requested
|
||||
c3_d dun_d; // committed
|
||||
u3_disk_cb cb_u; // callbacks
|
||||
@ -1054,77 +1053,3 @@
|
||||
|
||||
c3_w
|
||||
u3_readdir_r(DIR *dirp, struct dirent *entry, struct dirent **result);
|
||||
|
||||
/* Database
|
||||
*/
|
||||
/* u3_lmdb_init(): Initializes lmdb inside log_path
|
||||
*/
|
||||
MDB_env* u3_lmdb_init(const char* log_path);
|
||||
|
||||
/* u3_lmdb_shutdown(): Shuts down the entire logging system
|
||||
*/
|
||||
void u3_lmdb_shutdown(MDB_env* env);
|
||||
|
||||
/* u3_lmdb_get_latest_event_number(): Gets last event id persisted
|
||||
*/
|
||||
c3_o u3_lmdb_get_latest_event_number(MDB_env* environment,
|
||||
c3_d* event_number);
|
||||
|
||||
/* u3_lmdb_write_request: opaque write request structures
|
||||
*/
|
||||
struct u3_lmdb_write_request;
|
||||
|
||||
/* u3_lmdb_build_write_reuqest(): allocates and builds a write request
|
||||
**
|
||||
** Reads count sequential writs starting with event_u and creates a
|
||||
** single write request for all those writs.
|
||||
*/
|
||||
struct u3_lmdb_write_request*
|
||||
u3_lmdb_build_write_request(u3_writ* event_u, c3_d count);
|
||||
|
||||
/* u3_lmdb_free_write_request(): frees a write requst
|
||||
*/
|
||||
void u3_lmdb_free_write_request(struct u3_lmdb_write_request* request);
|
||||
|
||||
/* u3_lmdb_write_event(): Persists an event to the database
|
||||
*/
|
||||
void u3_lmdb_write_event(MDB_env* environment,
|
||||
u3_pier* pir_u,
|
||||
struct u3_lmdb_write_request* request_u,
|
||||
void (*on_complete)(c3_o success, u3_pier*,
|
||||
c3_d, c3_d));
|
||||
|
||||
/* u3_lmdb_read_events(): Reads events back from the database
|
||||
**
|
||||
** Reads back up to |len_d| events starting with |first_event_d|. For
|
||||
** each event, the event will be passed to |on_event_read| and further
|
||||
** reading will be aborted if the callback returns c3n.
|
||||
**
|
||||
** Returns c3y on complete success; c3n on any error.
|
||||
*/
|
||||
c3_o
|
||||
u3_lmdb_read_events(MDB_env* db_u,
|
||||
c3_d first_event_d,
|
||||
c3_d len_d,
|
||||
void* vod_p,
|
||||
c3_o(*on_event_read)(void*, c3_d, u3_atom));
|
||||
|
||||
/* u3_lmdb_write_identity(): Writes log identity
|
||||
**
|
||||
** Returns c3y on complete success; c3n on any error.
|
||||
*/
|
||||
c3_o u3_lmdb_write_identity(MDB_env* environment,
|
||||
u3_noun who,
|
||||
u3_noun is_fake,
|
||||
u3_noun life);
|
||||
|
||||
/* u3_lmdb_read_identity(): Reads log identity
|
||||
**
|
||||
** Returns c3y on complete success; c3n on any error.
|
||||
*/
|
||||
c3_o u3_lmdb_read_identity(MDB_env* environment,
|
||||
u3_noun* who,
|
||||
u3_noun* is_fake,
|
||||
u3_noun* life);
|
||||
|
||||
|
||||
|
464
pkg/urbit/vere/db/lmdb.c
Normal file
464
pkg/urbit/vere/db/lmdb.c
Normal file
@ -0,0 +1,464 @@
|
||||
/* vere/db/lmdb.c
|
||||
*/
|
||||
|
||||
#include <lmdb.h>
|
||||
|
||||
#include "c/portable.h"
|
||||
#include "c/types.h"
|
||||
#include "c/defs.h"
|
||||
|
||||
#include <vere/db/lmdb.h>
|
||||
|
||||
// lmdb api wrapper
|
||||
//
|
||||
// this module implements a simple persistence api on top of lmdb.
|
||||
// outside of its use of c3 type definitions, this module has no
|
||||
// dependence on anything u3, or on any library besides lmdb itself.
|
||||
//
|
||||
// urbit requires very little from a persist store -- it merely
|
||||
// needs to store variable-length buffers in:
|
||||
//
|
||||
// - a metadata store with c3_c (unsigned char) keys
|
||||
// - an event store with contiguous c3_d (uint64_t) keys
|
||||
//
|
||||
// supported operations are as follows
|
||||
//
|
||||
// - open/close an environment
|
||||
// - read/save metadata
|
||||
// - read the first and last event numbers
|
||||
// - read/save ranges of events
|
||||
//
|
||||
|
||||
/* c3_lmdb_init(): open lmdb at [pax_c], mmap up to [siz_i].
|
||||
*/
|
||||
MDB_env*
|
||||
c3_lmdb_init(const c3_c* pax_c, size_t siz_i)
|
||||
{
|
||||
MDB_env* env_u;
|
||||
c3_w ret_w;
|
||||
|
||||
if ( (ret_w = mdb_env_create(&env_u)) ) {
|
||||
fprintf(stderr, "lmdb: init fail: %s\n", mdb_strerror(ret_w));
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Our databases have two tables: META and EVENTS
|
||||
//
|
||||
if ( (ret_w = mdb_env_set_maxdbs(env_u, 2)) ) {
|
||||
fprintf(stderr, "lmdb: failed to set number of databases: %s\r\n",
|
||||
mdb_strerror(ret_w));
|
||||
// XX dispose env_u
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
if ( (ret_w = mdb_env_set_mapsize(env_u, siz_i)) ) {
|
||||
fprintf(stderr, "lmdb: failed to set database size: %s\r\n",
|
||||
mdb_strerror(ret_w));
|
||||
// XX dispose env_u
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
if ( (ret_w = mdb_env_open(env_u, pax_c, 0, 0664)) ) {
|
||||
fprintf(stderr, "lmdb: failed to open event log: %s\n",
|
||||
mdb_strerror(ret_w));
|
||||
// XX dispose env_u
|
||||
//
|
||||
return 0;
|
||||
}
|
||||
|
||||
return env_u;
|
||||
}
|
||||
|
||||
/* c3_lmdb_exit(): close lmdb.
|
||||
*/
|
||||
void
|
||||
c3_lmdb_exit(MDB_env* env_u)
|
||||
{
|
||||
mdb_env_close(env_u);
|
||||
}
|
||||
|
||||
/* c3_lmdb_gulf(): read first and last event numbers.
|
||||
*/
|
||||
c3_o
|
||||
c3_lmdb_gulf(MDB_env* env_u, c3_d* low_d, c3_d* hig_d)
|
||||
{
|
||||
MDB_txn* txn_u;
|
||||
MDB_dbi mdb_u;
|
||||
c3_w ret_w;
|
||||
|
||||
// create a read-only transaction.
|
||||
//
|
||||
// XX why no MDB_RDONLY?
|
||||
//
|
||||
if ( (ret_w = mdb_txn_begin(env_u, 0, 0, &txn_u)) ) {
|
||||
fprintf(stderr, "lmdb: gulf: txn_begin fail: %s\n", mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// open the database in the transaction
|
||||
//
|
||||
{
|
||||
c3_w ops_w = MDB_CREATE | MDB_INTEGERKEY;
|
||||
|
||||
if ( (ret_w = mdb_dbi_open(txn_u, "EVENTS", ops_w, &mdb_u)) ) {
|
||||
fprintf(stderr, "lmdb: gulf: dbi_open fail: %s\n", mdb_strerror(ret_w));
|
||||
// XX confirm
|
||||
//
|
||||
mdb_txn_abort(txn_u);
|
||||
return c3n;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
MDB_cursor* cur_u;
|
||||
MDB_val key_u;
|
||||
MDB_val val_u;
|
||||
c3_d fir_d, las_d;
|
||||
|
||||
// creates a cursor to point to the last event
|
||||
//
|
||||
if ( (ret_w = mdb_cursor_open(txn_u, mdb_u, &cur_u)) ) {
|
||||
fprintf(stderr, "lmdb: gulf: cursor_open fail: %s\n",
|
||||
mdb_strerror(ret_w));
|
||||
// XX confirm
|
||||
//
|
||||
mdb_txn_abort(txn_u);
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// read with the cursor from the start of the database
|
||||
//
|
||||
ret_w = mdb_cursor_get(cur_u, &key_u, &val_u, MDB_FIRST);
|
||||
|
||||
if ( MDB_NOTFOUND == ret_w ) {
|
||||
*low_d = 0;
|
||||
*hig_d = 0;
|
||||
mdb_cursor_close(cur_u);
|
||||
mdb_txn_abort(txn_u);
|
||||
return c3y;
|
||||
}
|
||||
else if ( ret_w ) {
|
||||
fprintf(stderr, "lmdb: gulf: head fail: %s\n",
|
||||
mdb_strerror(ret_w));
|
||||
mdb_cursor_close(cur_u);
|
||||
mdb_txn_abort(txn_u);
|
||||
return c3n;
|
||||
}
|
||||
else {
|
||||
fir_d = *(c3_d*)key_u.mv_data;
|
||||
}
|
||||
|
||||
// read with the cursor from the end of the database
|
||||
//
|
||||
ret_w = mdb_cursor_get(cur_u, &key_u, &val_u, MDB_LAST);
|
||||
|
||||
if ( !ret_w ) {
|
||||
las_d = *(c3_d*)key_u.mv_data;
|
||||
}
|
||||
|
||||
// clean up unconditionally, we're done
|
||||
//
|
||||
mdb_cursor_close(cur_u);
|
||||
mdb_txn_abort(txn_u);
|
||||
|
||||
if ( ret_w ) {
|
||||
fprintf(stderr, "lmdb: gulf: last fail: %s\r\n", mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
else {
|
||||
*low_d = fir_d;
|
||||
*hig_d = las_d;
|
||||
return c3y;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* c3_lmdb_read(): read [len_d] events starting at [eve_d].
|
||||
*/
|
||||
c3_o
|
||||
c3_lmdb_read(MDB_env* env_u,
|
||||
void* vod_p,
|
||||
c3_d eve_d,
|
||||
c3_d len_d,
|
||||
c3_o (*read_f)(void*, c3_d, size_t, void*))
|
||||
{
|
||||
MDB_txn* txn_u;
|
||||
MDB_dbi mdb_u;
|
||||
c3_w ret_w;
|
||||
|
||||
// create a read-only transaction.
|
||||
//
|
||||
if ( (ret_w = mdb_txn_begin(env_u, 0, MDB_RDONLY, &txn_u)) ) {
|
||||
fprintf(stderr, "lmdb: read txn_begin fail: %s\n", mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// open the database in the transaction
|
||||
//
|
||||
{
|
||||
c3_w ops_w = MDB_CREATE | MDB_INTEGERKEY;
|
||||
|
||||
if ( (ret_w = mdb_dbi_open(txn_u, "EVENTS", ops_w, &mdb_u)) ) {
|
||||
fprintf(stderr, "lmdb: read: dbi_open fail: %s\n", mdb_strerror(ret_w));
|
||||
// XX confirm
|
||||
//
|
||||
mdb_txn_abort(txn_u);
|
||||
return c3n;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
{
|
||||
MDB_cursor* cur_u;
|
||||
MDB_val val_u;
|
||||
// set the initial key to [eve_d]
|
||||
//
|
||||
MDB_val key_u = { .mv_size = sizeof(c3_d), .mv_data = &eve_d };
|
||||
|
||||
// creates a cursor to iterate over keys starting at [eve_d]
|
||||
//
|
||||
if ( (ret_w = mdb_cursor_open(txn_u, mdb_u, &cur_u)) ) {
|
||||
fprintf(stderr, "lmdb: read: cursor_open fail: %s\n",
|
||||
mdb_strerror(ret_w));
|
||||
// XX confirm
|
||||
//
|
||||
mdb_txn_abort(txn_u);
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// set the cursor to the position of [eve_d]
|
||||
//
|
||||
if ( (ret_w = mdb_cursor_get(cur_u, &key_u, &val_u, MDB_SET_KEY)) ) {
|
||||
fprintf(stderr, "lmdb: read: initial cursor_get failed at %" PRIu64 ": %s\r\n",
|
||||
eve_d,
|
||||
mdb_strerror(ret_w));
|
||||
mdb_cursor_close(cur_u);
|
||||
// XX confirm
|
||||
//
|
||||
mdb_txn_abort(txn_u);
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// load up to [len_d] events, iterating forward across the cursor.
|
||||
//
|
||||
{
|
||||
c3_o ret_o = c3y;
|
||||
c3_d i_d;
|
||||
|
||||
for ( i_d = 0; (ret_w != MDB_NOTFOUND) && (i_d < len_d); ++i_d) {
|
||||
c3_d cur_d = (eve_d + i_d);
|
||||
if ( sizeof(c3_d) != key_u.mv_size ) {
|
||||
fprintf(stderr, "lmdb: read: invalid key size\r\n");
|
||||
ret_o = c3n;
|
||||
break;
|
||||
}
|
||||
|
||||
// sanity check: ensure contiguous event numbers
|
||||
//
|
||||
if ( *(c3_d*)key_u.mv_data != cur_d ) {
|
||||
fprintf(stderr, "lmdb: read gap: expected %" PRIu64
|
||||
", received %" PRIu64 "\r\n",
|
||||
cur_d,
|
||||
*(c3_d*)key_u.mv_data);
|
||||
ret_o = c3n;
|
||||
break;
|
||||
}
|
||||
|
||||
// invoke read callback with [val_u]
|
||||
//
|
||||
if ( c3n == read_f(vod_p, cur_d, val_u.mv_size, val_u.mv_data) ) {
|
||||
ret_o = c3n;
|
||||
break;
|
||||
}
|
||||
|
||||
// read the next event from the cursor
|
||||
//
|
||||
if ( (ret_w = mdb_cursor_get(cur_u, &key_u, &val_u, MDB_NEXT))
|
||||
&& (MDB_NOTFOUND != ret_w) )
|
||||
{
|
||||
fprintf(stderr, "lmdb: read: error: %s\r\n",
|
||||
mdb_strerror(ret_w));
|
||||
ret_o = c3n;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
mdb_cursor_close(cur_u);
|
||||
|
||||
// read-only transactions are aborted when complete
|
||||
//
|
||||
mdb_txn_abort(txn_u);
|
||||
|
||||
return ret_o;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* c3_lmdb_save(): save [len_d] events starting at [eve_d].
|
||||
*/
|
||||
c3_o
|
||||
c3_lmdb_save(MDB_env* env_u,
|
||||
c3_d eve_d, // first event
|
||||
c3_d len_d, // number of events
|
||||
void** byt_p, // array of bytes
|
||||
size_t* siz_i) // array of lengths
|
||||
{
|
||||
MDB_txn* txn_u;
|
||||
MDB_dbi mdb_u;
|
||||
c3_w ret_w;
|
||||
|
||||
// create a write transaction
|
||||
//
|
||||
if ( (ret_w = mdb_txn_begin(env_u, 0, 0, &txn_u)) ) {
|
||||
fprintf(stderr, "lmdb: write: txn_begin fail: %s\n", mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// opens the database in the transaction
|
||||
//
|
||||
{
|
||||
c3_w ops_w = MDB_CREATE | MDB_INTEGERKEY;
|
||||
|
||||
if ( (ret_w = mdb_dbi_open(txn_u, "EVENTS", ops_w, &mdb_u)) ) {
|
||||
fprintf(stderr, "lmdb: write: dbi_open fail: %s\n", mdb_strerror(ret_w));
|
||||
mdb_txn_abort(txn_u);
|
||||
return c3n;
|
||||
}
|
||||
}
|
||||
|
||||
// write every event in the batch
|
||||
//
|
||||
{
|
||||
c3_w ops_w = MDB_NOOVERWRITE;
|
||||
c3_d las_d = (eve_d + len_d);
|
||||
c3_d key_d, i_d;
|
||||
|
||||
for ( i_d = 0; i_d < len_d; ++i_d) {
|
||||
key_d = eve_d + i_d;
|
||||
|
||||
{
|
||||
MDB_val key_u = { .mv_size = sizeof(c3_d), .mv_data = &key_d };
|
||||
MDB_val val_u = { .mv_size = siz_i[i_d], .mv_data = byt_p[i_d] };
|
||||
|
||||
if ( (ret_w = mdb_put(txn_u, mdb_u, &key_u, &val_u, ops_w)) ) {
|
||||
fprintf(stderr, "lmdb: write failed on event %" PRIu64 "\n", key_d);
|
||||
mdb_txn_abort(txn_u);
|
||||
return c3n;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// commit transaction
|
||||
//
|
||||
if ( (ret_w = mdb_txn_commit(txn_u)) ) {
|
||||
fprintf(stderr, "lmdb: write failed: %s\n", mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
return c3y;
|
||||
}
|
||||
|
||||
/* c3_lmdb_read_meta(): read by string from the META db.
|
||||
*/
|
||||
void
|
||||
c3_lmdb_read_meta(MDB_env* env_u,
|
||||
void* vod_p,
|
||||
const c3_c* key_c,
|
||||
void (*read_f)(void*, size_t, void*))
|
||||
{
|
||||
MDB_txn* txn_u;
|
||||
MDB_dbi mdb_u;
|
||||
c3_w ret_w;
|
||||
|
||||
// create a read transaction
|
||||
//
|
||||
if ( (ret_w = mdb_txn_begin(env_u, 0, MDB_RDONLY, &txn_u)) ) {
|
||||
fprintf(stderr, "lmdb: meta read: txn_begin fail: %s\n",
|
||||
mdb_strerror(ret_w));
|
||||
return read_f(vod_p, 0, 0);
|
||||
}
|
||||
|
||||
// open the database in the transaction
|
||||
//
|
||||
if ( (ret_w = mdb_dbi_open(txn_u, "META", 0, &mdb_u)) ) {
|
||||
fprintf(stderr, "lmdb: meta read: dbi_open fail: %s\n",
|
||||
mdb_strerror(ret_w));
|
||||
mdb_txn_abort(txn_u);
|
||||
return read_f(vod_p, 0, 0);
|
||||
}
|
||||
|
||||
// read by string key, invoking callback with result
|
||||
{
|
||||
MDB_val key_u = { .mv_size = strlen(key_c), .mv_data = (void*)key_c };
|
||||
MDB_val val_u;
|
||||
|
||||
if ( (ret_w = mdb_get(txn_u, mdb_u, &key_u, &val_u)) ) {
|
||||
fprintf(stderr, "lmdb: read failed: %s\n", mdb_strerror(ret_w));
|
||||
mdb_txn_abort(txn_u);
|
||||
return read_f(vod_p, 0, 0);
|
||||
}
|
||||
else {
|
||||
read_f(vod_p, val_u.mv_size, val_u.mv_data);
|
||||
|
||||
// read-only transactions are aborted when complete
|
||||
//
|
||||
mdb_txn_abort(txn_u);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* c3_lmdb_save_meta(): save by string into the META db.
|
||||
*/
|
||||
c3_o
|
||||
c3_lmdb_save_meta(MDB_env* env_u,
|
||||
const c3_c* key_c,
|
||||
size_t val_i,
|
||||
void* val_p)
|
||||
{
|
||||
MDB_txn* txn_u;
|
||||
MDB_dbi mdb_u;
|
||||
c3_w ret_w;
|
||||
|
||||
// create a write transaction
|
||||
//
|
||||
if ( (ret_w = mdb_txn_begin(env_u, 0, 0, &txn_u)) ) {
|
||||
fprintf(stderr, "lmdb: meta write: txn_begin fail: %s\n",
|
||||
mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// opens the database in the transaction
|
||||
//
|
||||
if ( (ret_w = mdb_dbi_open(txn_u, "META", MDB_CREATE, &mdb_u)) ) {
|
||||
fprintf(stderr, "lmdb: meta write: dbi_open fail: %s\n",
|
||||
mdb_strerror(ret_w));
|
||||
mdb_txn_abort(txn_u);
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// put value by string key
|
||||
//
|
||||
{
|
||||
MDB_val key_u = { .mv_size = strlen(key_c), .mv_data = (void*)key_c };
|
||||
MDB_val val_u = { .mv_size = val_i, .mv_data = val_p };
|
||||
|
||||
if ( (ret_w = mdb_put(txn_u, mdb_u, &key_u, &val_u, 0)) ) {
|
||||
fprintf(stderr, "lmdb: write failed: %s\n", mdb_strerror(ret_w));
|
||||
mdb_txn_abort(txn_u);
|
||||
return c3n;
|
||||
}
|
||||
}
|
||||
|
||||
// commit txn
|
||||
//
|
||||
if ( (ret_w = mdb_txn_commit(txn_u)) ) {
|
||||
fprintf(stderr, "lmdb: meta write: commit failed: %s\n",
|
||||
mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
return c3y;
|
||||
}
|
@ -18,6 +18,7 @@
|
||||
|
||||
#include "all.h"
|
||||
#include "vere/vere.h"
|
||||
#include <vere/db/lmdb.h>
|
||||
|
||||
struct _cd_read {
|
||||
c3_d eve_d;
|
||||
@ -27,12 +28,26 @@ struct _cd_read {
|
||||
struct _u3_disk* log_u;
|
||||
};
|
||||
|
||||
typedef struct _u3_db_batch {
|
||||
c3_d eve_d; // first event
|
||||
c3_d len_d; // number of events
|
||||
void** byt_p; // array of bytes
|
||||
size_t* siz_i; // array of lengths
|
||||
} u3_db_batch;
|
||||
/* u3_db_batch: database write batch
|
||||
*/
|
||||
typedef struct _u3_db_batch {
|
||||
c3_d eve_d; // first event
|
||||
c3_d len_d; // number of events
|
||||
void** byt_p; // array of bytes
|
||||
size_t* siz_i; // array of lengths
|
||||
} u3_db_batch;
|
||||
|
||||
/* _write_request: callback struct for c3_lmdb_write_event()
|
||||
**
|
||||
** Note that [env_u] is thread-safe, but, transactions and handles
|
||||
** opened from it are explicitly not. [dun_f] is called on the main thread
|
||||
**
|
||||
*/
|
||||
struct _cd_save {
|
||||
c3_o ret_o; // result
|
||||
u3_db_batch* bat_u; // write batch
|
||||
struct _u3_disk* log_u;
|
||||
};
|
||||
|
||||
#undef VERBOSE_DISK
|
||||
|
||||
@ -109,11 +124,23 @@ u3_disk_init(c3_c* pax_c, u3_disk_cb cb_u)
|
||||
return 0;
|
||||
}
|
||||
|
||||
if ( 0 == (log_u->db_u = u3_lmdb_init(log_c)) ) {
|
||||
fprintf(stderr, "disk: failed to initialize database");
|
||||
c3_free(log_c);
|
||||
c3_free(log_u);
|
||||
return 0;
|
||||
{
|
||||
// TODO: Start with forty gigabytes on macOS and sixty otherwise for the
|
||||
// maximum event log size. We'll need to do something more sophisticated for
|
||||
// real in the long term, though.
|
||||
//
|
||||
#ifdef U3_OS_osx
|
||||
const size_t siz_w = 42949672960;
|
||||
#else
|
||||
const size_t siz_w = 64424509440;;
|
||||
#endif
|
||||
|
||||
if ( 0 == (log_u->mdb_u = c3_lmdb_init(log_c, siz_w)) ) {
|
||||
fprintf(stderr, "disk: failed to initialize database");
|
||||
c3_free(log_c);
|
||||
c3_free(log_u);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
c3_free(log_c);
|
||||
@ -123,8 +150,9 @@ u3_disk_init(c3_c* pax_c, u3_disk_cb cb_u)
|
||||
//
|
||||
{
|
||||
log_u->dun_d = 0;
|
||||
c3_d fir_d;
|
||||
|
||||
if ( c3n == u3_lmdb_get_latest_event_number(log_u->db_u, &log_u->dun_d) ) {
|
||||
if ( c3n == c3_lmdb_gulf(log_u->mdb_u, &fir_d, &log_u->dun_d) ) {
|
||||
fprintf(stderr, "disk: failed to load latest event from database");
|
||||
c3_free(log_u);
|
||||
return 0;
|
||||
@ -138,12 +166,59 @@ u3_disk_init(c3_c* pax_c, u3_disk_cb cb_u)
|
||||
return log_u;
|
||||
}
|
||||
|
||||
static void
|
||||
_disk_meta_read_cb(void* vod_p, size_t val_i, void* val_p)
|
||||
{
|
||||
u3_weak* mat = vod_p;
|
||||
|
||||
if ( val_p ) {
|
||||
*mat = u3i_bytes(val_i, val_p);
|
||||
}
|
||||
}
|
||||
|
||||
static u3_weak
|
||||
_disk_read_meta(u3_disk* log_u, const c3_c* key_c)
|
||||
{
|
||||
u3_weak mat = u3_none;
|
||||
|
||||
c3_lmdb_read_meta(log_u->mdb_u, &mat, "who", _disk_meta_read_cb);
|
||||
|
||||
if ( u3_none == mat ) {
|
||||
return u3_none;
|
||||
}
|
||||
|
||||
{
|
||||
u3_noun pro = u3m_soft(0, u3ke_cue, mat);
|
||||
u3_noun tag, dat;
|
||||
u3x_cell(pro, &tag, &dat);
|
||||
|
||||
if ( u3_blip == tag ) {
|
||||
u3k(dat);
|
||||
u3z(pro);
|
||||
return dat;
|
||||
}
|
||||
else {
|
||||
fprintf(stderr, "disk: meta cue failed\r\n");
|
||||
u3z(pro);
|
||||
return u3_none;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
c3_o
|
||||
u3_disk_read_header(u3_disk* log_u, c3_d* who_d, c3_o* fak_o, c3_w* lif_w)
|
||||
{
|
||||
u3_noun who, fak, lif;
|
||||
u3_weak who = _disk_read_meta(log_u, "who");
|
||||
u3_weak fak = _disk_read_meta(log_u, "is-fake");
|
||||
u3_weak lif = _disk_read_meta(log_u, "life");
|
||||
|
||||
if ( c3n == u3_lmdb_read_identity(log_u->db_u, &who, &fak, &lif) ) {
|
||||
if ( u3_none == who ) {
|
||||
return c3n;
|
||||
}
|
||||
else if ( (u3_none == fak)
|
||||
|| (u3_none == lif) )
|
||||
{
|
||||
u3z(who);
|
||||
return c3n;
|
||||
}
|
||||
|
||||
@ -167,25 +242,64 @@ u3_disk_read_header(u3_disk* log_u, c3_d* who_d, c3_o* fak_o, c3_w* lif_w)
|
||||
}
|
||||
|
||||
u3z(who);
|
||||
|
||||
return c3y;
|
||||
}
|
||||
|
||||
static c3_o
|
||||
_disk_save_meta(u3_disk* log_u, const c3_c* key_c, u3_atom dat)
|
||||
{
|
||||
u3_atom mat = u3ke_jam(dat);
|
||||
c3_w len_w = u3r_met(3, mat);
|
||||
c3_y* byt_y = c3_malloc(len_w);
|
||||
c3_o ret_o;
|
||||
|
||||
u3r_bytes(0, len_w, byt_y, mat);
|
||||
|
||||
ret_o = c3_lmdb_save_meta(log_u->mdb_u, key_c, len_w, byt_y);
|
||||
|
||||
u3z(mat);
|
||||
c3_free(byt_y);
|
||||
|
||||
return ret_o;
|
||||
}
|
||||
|
||||
c3_o
|
||||
u3_disk_write_header(u3_disk* log_u, c3_d who_d[2], c3_o fak_o, c3_w lif_w)
|
||||
{
|
||||
c3_assert( c3y == u3a_is_cat(lif_w) );
|
||||
u3_noun who = u3i_chubs(2, who_d);
|
||||
return u3_lmdb_write_identity(log_u->db_u, who, fak_o, lif_w);
|
||||
|
||||
if ( (c3n == _disk_save_meta(log_u, "who", u3i_chubs(2, who_d)))
|
||||
|| (c3n == _disk_save_meta(log_u, "is-fake", fak_o))
|
||||
|| (c3n == _disk_save_meta(log_u, "life", lif_w)) )
|
||||
{
|
||||
// XX dispose?
|
||||
//
|
||||
return c3n;
|
||||
}
|
||||
|
||||
return c3y;
|
||||
}
|
||||
|
||||
static void
|
||||
_disk_free_batch(u3_db_batch* bat_u)
|
||||
{
|
||||
while ( bat_u->len_d-- ) {
|
||||
c3_free(bat_u->byt_p[bat_u->len_d]);
|
||||
}
|
||||
|
||||
c3_free(bat_u->byt_p);
|
||||
c3_free(bat_u->siz_i);
|
||||
c3_free(bat_u);
|
||||
}
|
||||
|
||||
/* _disk_commit_done(): commit complete.
|
||||
*/
|
||||
static void
|
||||
_disk_commit_done(c3_o ret_o, void* vod_p, c3_d eve_d, c3_d len_d)
|
||||
_disk_commit_done(void* vod_p, c3_o ret_o, u3_db_batch* bat_u)
|
||||
{
|
||||
u3_disk* log_u = vod_p;
|
||||
c3_d eve_d = bat_u->eve_d;
|
||||
c3_d len_d = bat_u->len_d;
|
||||
|
||||
if ( c3n == ret_o ) {
|
||||
log_u->cb_u.write_bail_f(log_u->cb_u.vod_p, eve_d + (len_d - 1ULL));
|
||||
@ -232,10 +346,71 @@ _disk_commit_done(c3_o ret_o, void* vod_p, c3_d eve_d, c3_d len_d)
|
||||
log_u->put_u.ent_u = 0;
|
||||
}
|
||||
|
||||
_disk_free_batch(bat_u);
|
||||
|
||||
log_u->hol_o = c3n;
|
||||
_disk_commit(log_u);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/* _disk_commit_after_cb(): Implementation of c3_lmdb_write_event()
|
||||
**
|
||||
** This is always run on the main loop thread after the worker thread event
|
||||
** completes.
|
||||
*/
|
||||
static void
|
||||
_disk_commit_after_cb(uv_work_t* ted_u, int status)
|
||||
{
|
||||
struct _cd_save* req_u = ted_u->data;
|
||||
_disk_commit_done(req_u->log_u, req_u->ret_o, req_u->bat_u);
|
||||
c3_free(req_u);
|
||||
c3_free(ted_u);
|
||||
}
|
||||
|
||||
/* _lmdb_write_event_cb(): Implementation of c3_lmdb_write_event()
|
||||
**
|
||||
** This is always run on a libuv background worker thread; actual nouns cannot
|
||||
** be touched here.
|
||||
*/
|
||||
static void
|
||||
_disk_commit_cb(uv_work_t* ted_u)
|
||||
{
|
||||
struct _cd_save* req_u = ted_u->data;
|
||||
u3_db_batch* bat_u = req_u->bat_u;
|
||||
req_u->ret_o = c3_lmdb_save(req_u->log_u->mdb_u,
|
||||
bat_u->eve_d,
|
||||
bat_u->len_d,
|
||||
bat_u->byt_p,
|
||||
bat_u->siz_i);
|
||||
}
|
||||
|
||||
/* c3_lmdb_write_event(): Asynchronously writes events to the database.
|
||||
**
|
||||
** This writes all the passed in events along with log metadata updates to the
|
||||
** database as a single transaction on a worker thread. Once the transaction
|
||||
** is completed, it calls the passed in callback on the main loop thread.
|
||||
*/
|
||||
static void
|
||||
_disk_commit_start(u3_disk* log_u, u3_db_batch* bat_u)
|
||||
{
|
||||
// structure to pass to the worker thread.
|
||||
//
|
||||
struct _cd_save* req_u = c3_malloc(sizeof(*req_u));
|
||||
req_u->log_u = log_u;
|
||||
req_u->bat_u = bat_u;
|
||||
req_u->ret_o = c3n;
|
||||
|
||||
// queue asynchronous work to happen on another thread
|
||||
//
|
||||
uv_work_t* ted_u = c3_malloc(sizeof(*ted_u));
|
||||
ted_u->data = req_u;
|
||||
|
||||
uv_queue_work(u3L, ted_u, _disk_commit_cb,
|
||||
_disk_commit_after_cb);
|
||||
}
|
||||
|
||||
static void
|
||||
_disk_commit(u3_disk* log_u)
|
||||
{
|
||||
@ -281,10 +456,7 @@ _disk_commit(u3_disk* log_u)
|
||||
}
|
||||
#endif
|
||||
|
||||
u3_lmdb_write_event(log_u->db_u, (u3_pier*)log_u,
|
||||
(struct u3_lmdb_write_request*)bat_u,
|
||||
(void(*)(c3_o, u3_pier*, c3_d, c3_d))_disk_commit_done);
|
||||
|
||||
_disk_commit_start(log_u, bat_u);
|
||||
log_u->hol_o = c3y;
|
||||
}
|
||||
}
|
||||
@ -369,7 +541,7 @@ _disk_read_done_cb(uv_timer_t* tim_u)
|
||||
}
|
||||
|
||||
static c3_o
|
||||
_disk_read_one_cb(void* vod_p, c3_d eve_d, u3_atom mat)
|
||||
_disk_read_one_cb(void* vod_p, c3_d eve_d, size_t val_i, void* val_p)
|
||||
{
|
||||
struct _cd_read* red_u = vod_p;
|
||||
u3_disk* log_u = red_u->log_u;
|
||||
@ -379,7 +551,7 @@ _disk_read_one_cb(void* vod_p, c3_d eve_d, u3_atom mat)
|
||||
{
|
||||
// xx soft?
|
||||
//
|
||||
u3_noun dat = u3ke_cue(mat);
|
||||
u3_noun dat = u3ke_cue(u3i_bytes(val_i, val_p));
|
||||
u3_noun mug, job;
|
||||
|
||||
if ( (c3n == u3r_cell(dat, &mug, &job))
|
||||
@ -420,11 +592,11 @@ _disk_read_start_cb(uv_timer_t* tim_u)
|
||||
|
||||
uv_timer_start(&log_u->tim_u, _disk_read_done_cb, 0, 0);
|
||||
|
||||
if ( c3n == u3_lmdb_read_events(log_u->db_u,
|
||||
red_u->eve_d,
|
||||
red_u->len_d,
|
||||
red_u,
|
||||
_disk_read_one_cb) )
|
||||
if ( c3n == c3_lmdb_read(log_u->mdb_u,
|
||||
red_u,
|
||||
red_u->eve_d,
|
||||
red_u->len_d,
|
||||
_disk_read_one_cb) )
|
||||
{
|
||||
log_u->cb_u.read_bail_f(log_u->cb_u.vod_p, red_u->eve_d);
|
||||
}
|
||||
@ -448,7 +620,7 @@ u3_disk_read(u3_disk* log_u, c3_d eve_d, c3_d len_d)
|
||||
void
|
||||
u3_disk_exit(u3_disk* log_u)
|
||||
{
|
||||
u3_lmdb_shutdown(log_u->db_u);
|
||||
c3_lmdb_exit(log_u->mdb_u);
|
||||
// XX dispose
|
||||
//
|
||||
}
|
||||
|
@ -1,673 +0,0 @@
|
||||
/* vere/lmdb.c
|
||||
*/
|
||||
|
||||
#include "all.h"
|
||||
|
||||
#include <uv.h>
|
||||
#include <lmdb.h>
|
||||
|
||||
#include "vere/vere.h"
|
||||
|
||||
// Event log persistence for Urbit
|
||||
//
|
||||
// Persistence works by having an lmdb environment opened on the main
|
||||
// thread. This environment is used to create read-only transactions
|
||||
// synchronously when needed.
|
||||
//
|
||||
// But the majority of lmdb writes operate asynchronously in the uv worker
|
||||
// pool. Since individual transactions are bound to threads, we perform all
|
||||
// blocking writing on worker threads.
|
||||
//
|
||||
// We perform the very first metadata writes on the main thread because we
|
||||
// can't do anything until they persist.
|
||||
|
||||
/* u3_lmdb_init(): Opens up a log environment
|
||||
**
|
||||
** Precondition: log_path points to an already created directory
|
||||
*/
|
||||
MDB_env* u3_lmdb_init(const char* log_path)
|
||||
{
|
||||
MDB_env* env = 0;
|
||||
c3_w ret_w = mdb_env_create(&env);
|
||||
if (ret_w != 0) {
|
||||
u3l_log("lmdb: init fail: %s\n", mdb_strerror(ret_w));
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Our databases have up to three tables: META, EVENTS, and GRAINS.
|
||||
ret_w = mdb_env_set_maxdbs(env, 3);
|
||||
if (ret_w != 0) {
|
||||
u3l_log("lmdb: failed to set number of databases: %s\n", mdb_strerror(ret_w));
|
||||
return 0;
|
||||
}
|
||||
|
||||
// TODO: Start with forty gigabytes on macOS and sixty otherwise for the
|
||||
// maximum event log size. We'll need to do something more sophisticated for
|
||||
// real in the long term, though.
|
||||
//
|
||||
#ifdef U3_OS_osx
|
||||
const size_t lmdb_mapsize = 42949672960;
|
||||
#else
|
||||
const size_t lmdb_mapsize = 64424509440;;
|
||||
#endif
|
||||
ret_w = mdb_env_set_mapsize(env, lmdb_mapsize);
|
||||
if (ret_w != 0) {
|
||||
u3l_log("lmdb: failed to set database size: %s\n", mdb_strerror(ret_w));
|
||||
return 0;
|
||||
}
|
||||
|
||||
ret_w = mdb_env_open(env, log_path, 0, 0664);
|
||||
if (ret_w != 0) {
|
||||
u3l_log("lmdb: failed to open event log: %s\n", mdb_strerror(ret_w));
|
||||
return 0;
|
||||
}
|
||||
|
||||
return env;
|
||||
}
|
||||
|
||||
/* u3_lmdb_shutdown(): Shuts down lmdb
|
||||
*/
|
||||
void u3_lmdb_shutdown(MDB_env* env)
|
||||
{
|
||||
mdb_env_close(env);
|
||||
}
|
||||
|
||||
/* _perform_put_on_database_raw(): Writes a key/value pair to a specific
|
||||
** database as part of a transaction.
|
||||
**
|
||||
** The raw version doesn't take ownership of either key/value and performs no
|
||||
** nock calculations, so it is safe to call from any thread.
|
||||
*/
|
||||
static
|
||||
c3_o _perform_put_on_database_raw(MDB_txn* transaction_u,
|
||||
MDB_dbi database_u,
|
||||
c3_w flags,
|
||||
void* key,
|
||||
size_t key_len,
|
||||
void* value,
|
||||
size_t value_len) {
|
||||
MDB_val key_val, value_val;
|
||||
|
||||
key_val.mv_size = key_len;
|
||||
key_val.mv_data = key;
|
||||
|
||||
value_val.mv_size = value_len;
|
||||
value_val.mv_data = value;
|
||||
|
||||
c3_w ret_w = mdb_put(transaction_u, database_u, &key_val, &value_val, flags);
|
||||
if (ret_w != 0) {
|
||||
u3l_log("lmdb: write failed: %s\n", mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
return c3y;
|
||||
}
|
||||
|
||||
/* _perform_get_on_database_raw(): Reads a key/value pair to a specific
|
||||
** database as part of a transaction.
|
||||
*/
|
||||
static
|
||||
c3_o _perform_get_on_database_raw(MDB_txn* transaction_u,
|
||||
MDB_dbi database_u,
|
||||
void* key,
|
||||
size_t key_len,
|
||||
MDB_val* value) {
|
||||
MDB_val key_val;
|
||||
key_val.mv_size = key_len;
|
||||
key_val.mv_data = key;
|
||||
|
||||
c3_w ret_w = mdb_get(transaction_u, database_u, &key_val, value);
|
||||
if (ret_w != 0) {
|
||||
u3l_log("lmdb: read failed: %s\n", mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
return c3y;
|
||||
}
|
||||
|
||||
/* _perform_put_on_database_noun(): Writes a noun to the database.
|
||||
**
|
||||
** This requires access to the loom so it must only be run from the libuv
|
||||
** thread.
|
||||
*/
|
||||
static
|
||||
c3_o _perform_put_on_database_noun(MDB_txn* transaction_u,
|
||||
MDB_dbi database_u,
|
||||
c3_c* key,
|
||||
u3_noun noun) {
|
||||
// jam noun into an atom representation
|
||||
u3_atom mat = u3ke_jam(noun);
|
||||
|
||||
// copy the jammed noun into a byte buffer we can hand to lmdb
|
||||
c3_w len_w = u3r_met(3, mat);
|
||||
c3_y* bytes_y = c3_malloc(len_w);
|
||||
u3r_bytes(0, len_w, bytes_y, mat);
|
||||
|
||||
c3_o ret = _perform_put_on_database_raw(
|
||||
transaction_u,
|
||||
database_u,
|
||||
0,
|
||||
key, strlen(key),
|
||||
bytes_y, len_w);
|
||||
|
||||
c3_free(bytes_y);
|
||||
u3z(mat);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* _perform_get_on_database_noun(): Reads a noun from the database.
|
||||
**
|
||||
** This requires access to the loom so it must only be run from the libuv
|
||||
** thread.
|
||||
*/
|
||||
static
|
||||
c3_o _perform_get_on_database_noun(MDB_txn* transaction_u,
|
||||
MDB_dbi database_u,
|
||||
c3_c* key,
|
||||
u3_noun* noun) {
|
||||
MDB_val value_val;
|
||||
c3_o ret = _perform_get_on_database_raw(transaction_u,
|
||||
database_u,
|
||||
key, strlen(key),
|
||||
&value_val);
|
||||
if (ret == c3n) {
|
||||
return c3y;
|
||||
}
|
||||
|
||||
// Take the bytes and cue them.
|
||||
u3_atom raw_atom = u3i_bytes(value_val.mv_size, value_val.mv_data);
|
||||
*noun = u3qe_cue(raw_atom);
|
||||
return c3y;
|
||||
}
|
||||
|
||||
/* u3_lmdb_write_request: Events to be written together
|
||||
*/
|
||||
struct u3_lmdb_write_request {
|
||||
// The event number of the first event.
|
||||
c3_d first_event;
|
||||
|
||||
// The number of events in this write request. Nonzero.
|
||||
c3_d event_count;
|
||||
|
||||
// An array of serialized event datas. The array size is |event_count|. We
|
||||
// perform the event serialization on the main thread so we can read the loom
|
||||
// and write into a malloced structure for the worker thread.
|
||||
void** malloced_event_data;
|
||||
|
||||
// An array of sizes of serialized event datas. We keep track of this for the
|
||||
// database write.
|
||||
size_t* malloced_event_data_size;
|
||||
};
|
||||
|
||||
/* u3_lmdb_build_write_request(): Allocates and builds a write request
|
||||
*/
|
||||
struct u3_lmdb_write_request*
|
||||
u3_lmdb_build_write_request(u3_writ* event_u, c3_d count)
|
||||
{
|
||||
struct u3_lmdb_write_request* request =
|
||||
c3_malloc(sizeof(struct u3_lmdb_write_request));
|
||||
request->first_event = event_u->evt_d;
|
||||
request->event_count = count;
|
||||
request->malloced_event_data = c3_malloc(sizeof(void*) * count);
|
||||
request->malloced_event_data_size = c3_malloc(sizeof(size_t) * count);
|
||||
|
||||
for (c3_d i = 0; i < count; ++i) {
|
||||
// Sanity check that the events in u3_writ are in order.
|
||||
c3_assert(event_u->evt_d == (request->first_event + i));
|
||||
|
||||
// Serialize the jammed event log entry into a malloced buffer we can send
|
||||
// to the other thread.
|
||||
c3_w siz_w = u3r_met(3, event_u->mat);
|
||||
c3_y* data_u = c3_calloc(siz_w);
|
||||
u3r_bytes(0, siz_w, data_u, event_u->mat);
|
||||
|
||||
request->malloced_event_data[i] = data_u;
|
||||
request->malloced_event_data_size[i] = siz_w;
|
||||
|
||||
event_u = event_u->nex_u;
|
||||
}
|
||||
|
||||
return request;
|
||||
}
|
||||
|
||||
/* u3_lmdb_free_write_request(): Frees a write request
|
||||
*/
|
||||
void u3_lmdb_free_write_request(struct u3_lmdb_write_request* request) {
|
||||
for (c3_d i = 0; i < request->event_count; ++i)
|
||||
c3_free(request->malloced_event_data[i]);
|
||||
|
||||
c3_free(request->malloced_event_data);
|
||||
c3_free(request->malloced_event_data_size);
|
||||
c3_free(request);
|
||||
}
|
||||
|
||||
/* _write_request_data: callback struct for u3_lmdb_write_event()
|
||||
*/
|
||||
struct _write_request_data {
|
||||
// The database environment to write to. This object is thread-safe, though
|
||||
// the transactions and handles opened from it are explicitly not.
|
||||
MDB_env* environment;
|
||||
|
||||
// The pier that we're writing for.
|
||||
u3_pier* pir_u;
|
||||
|
||||
// The encapsulated request. This may contain multiple event writes.
|
||||
struct u3_lmdb_write_request* request;
|
||||
|
||||
// Whether the write completed successfully.
|
||||
c3_o success;
|
||||
|
||||
// Called on main loop thread on completion.
|
||||
void (*on_complete)(c3_o, u3_pier*, c3_d, c3_d);
|
||||
};
|
||||
|
||||
/* _u3_lmdb_write_event_cb(): Implementation of u3_lmdb_write_event()
|
||||
**
|
||||
** This is always run on a libuv background worker thread; actual nouns cannot
|
||||
** be touched here.
|
||||
*/
|
||||
static void _u3_lmdb_write_event_cb(uv_work_t* req) {
|
||||
struct _write_request_data* data = req->data;
|
||||
|
||||
// Creates the write transaction.
|
||||
MDB_txn* transaction_u;
|
||||
c3_w ret_w = mdb_txn_begin(data->environment,
|
||||
(MDB_txn *) NULL,
|
||||
0, /* flags */
|
||||
&transaction_u);
|
||||
if (0 != ret_w) {
|
||||
u3l_log("lmdb: write: txn_begin fail: %s\n", mdb_strerror(ret_w));
|
||||
return;
|
||||
}
|
||||
|
||||
// Opens the database as part of the transaction.
|
||||
c3_w flags_w = MDB_CREATE | MDB_INTEGERKEY;
|
||||
MDB_dbi database_u;
|
||||
ret_w = mdb_dbi_open(transaction_u,
|
||||
"EVENTS",
|
||||
flags_w,
|
||||
&database_u);
|
||||
if (0 != ret_w) {
|
||||
u3l_log("lmdb: write: dbi_open fail: %s\n", mdb_strerror(ret_w));
|
||||
return;
|
||||
}
|
||||
|
||||
struct u3_lmdb_write_request* request = data->request;
|
||||
for (c3_d i = 0; i < request->event_count; ++i) {
|
||||
c3_d event_number = request->first_event + i;
|
||||
|
||||
c3_o success = _perform_put_on_database_raw(
|
||||
transaction_u,
|
||||
database_u,
|
||||
MDB_NOOVERWRITE,
|
||||
&event_number,
|
||||
sizeof(c3_d),
|
||||
request->malloced_event_data[i],
|
||||
request->malloced_event_data_size[i]);
|
||||
|
||||
if (success == c3n) {
|
||||
u3l_log("lmdb: failed to write event %" PRIu64 "\n", event_number);
|
||||
mdb_txn_abort(transaction_u);
|
||||
data->success = c3n;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
ret_w = mdb_txn_commit(transaction_u);
|
||||
if (0 != ret_w) {
|
||||
if ( request->event_count == 1 ) {
|
||||
u3l_log("lmdb: failed to commit event %" PRIu64 ": %s\n",
|
||||
request->first_event,
|
||||
mdb_strerror(ret_w));
|
||||
} else {
|
||||
c3_d through = request->first_event + request->event_count - 1ULL;
|
||||
u3l_log("lmdb: failed to commit events %" PRIu64 " through %" PRIu64
|
||||
": %s\n",
|
||||
request->first_event,
|
||||
through,
|
||||
mdb_strerror(ret_w));
|
||||
}
|
||||
data->success = c3n;
|
||||
return;
|
||||
}
|
||||
|
||||
data->success = c3y;
|
||||
}
|
||||
|
||||
/* _u3_lmdb_write_event_after_cb(): Implementation of u3_lmdb_write_event()
|
||||
**
|
||||
** This is always run on the main loop thread after the worker thread event
|
||||
** completes.
|
||||
*/
|
||||
static void _u3_lmdb_write_event_after_cb(uv_work_t* req, int status) {
|
||||
struct _write_request_data* data = req->data;
|
||||
|
||||
data->on_complete(data->success,
|
||||
data->pir_u,
|
||||
data->request->first_event,
|
||||
data->request->event_count);
|
||||
|
||||
u3_lmdb_free_write_request(data->request);
|
||||
c3_free(data);
|
||||
c3_free(req);
|
||||
}
|
||||
|
||||
/* u3_lmdb_write_event(): Asynchronously writes events to the database.
|
||||
**
|
||||
** This writes all the passed in events along with log metadata updates to the
|
||||
** database as a single transaction on a worker thread. Once the transaction
|
||||
** is completed, it calls the passed in callback on the main loop thread.
|
||||
*/
|
||||
void u3_lmdb_write_event(MDB_env* environment,
|
||||
u3_pier* pir_u,
|
||||
struct u3_lmdb_write_request* request_u,
|
||||
void (*on_complete)(c3_o, u3_pier*, c3_d, c3_d))
|
||||
{
|
||||
// Structure to pass to the worker thread.
|
||||
struct _write_request_data* data = c3_malloc(sizeof(struct _write_request_data));
|
||||
data->environment = environment;
|
||||
data->pir_u = pir_u;
|
||||
data->request = request_u;
|
||||
data->on_complete = on_complete;
|
||||
data->success = c3n;
|
||||
|
||||
// Queue asynchronous work to happen on the other thread.
|
||||
uv_work_t* req = c3_malloc(sizeof(uv_work_t));
|
||||
req->data = data;
|
||||
|
||||
uv_queue_work(uv_default_loop(),
|
||||
req,
|
||||
_u3_lmdb_write_event_cb,
|
||||
_u3_lmdb_write_event_after_cb);
|
||||
}
|
||||
|
||||
/* u3_lmdb_read_events(): Synchronously reads events from the database.
|
||||
**
|
||||
** Reads back up to |len_d| events starting with |first_event_d|. For
|
||||
** each event, the event will be passed to |on_event_read| and further
|
||||
** reading will be aborted if the callback returns c3n.
|
||||
**
|
||||
** Returns c3y on complete success; c3n on any error.
|
||||
*/
|
||||
c3_o
|
||||
u3_lmdb_read_events(MDB_env* db_u,
|
||||
c3_d first_event_d,
|
||||
c3_d len_d,
|
||||
void* vod_p,
|
||||
c3_o(*on_event_read)(void*, c3_d, u3_atom))
|
||||
{
|
||||
// Creates the read transaction.
|
||||
MDB_txn* transaction_u;
|
||||
c3_w ret_w = mdb_txn_begin(db_u,
|
||||
//environment,
|
||||
(MDB_txn *) NULL,
|
||||
MDB_RDONLY, /* flags */
|
||||
&transaction_u);
|
||||
if (0 != ret_w) {
|
||||
u3l_log("lmdb: read txn_begin fail: %s\n", mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// Opens the database as part of the transaction.
|
||||
c3_w flags_w = MDB_CREATE | MDB_INTEGERKEY;
|
||||
MDB_dbi database_u;
|
||||
ret_w = mdb_dbi_open(transaction_u,
|
||||
"EVENTS",
|
||||
flags_w,
|
||||
&database_u);
|
||||
if (0 != ret_w) {
|
||||
u3l_log("lmdb: read: dbi_open fail: %s\n", mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// Creates a cursor to iterate over keys starting at first_event_d.
|
||||
MDB_cursor* cursor_u;
|
||||
ret_w = mdb_cursor_open(transaction_u, database_u, &cursor_u);
|
||||
if (0 != ret_w) {
|
||||
u3l_log("lmdb: read: cursor_open fail: %s\n", mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// Sets the cursor to the position of first_event_d.
|
||||
MDB_val key;
|
||||
MDB_val val;
|
||||
key.mv_size = sizeof(c3_d);
|
||||
key.mv_data = &first_event_d;
|
||||
|
||||
ret_w = mdb_cursor_get(cursor_u, &key, &val, MDB_SET_KEY);
|
||||
if (0 != ret_w) {
|
||||
u3l_log("lmdb: read: could not find initial event %" PRIu64 ": %s\r\n",
|
||||
first_event_d, mdb_strerror(ret_w));
|
||||
mdb_cursor_close(cursor_u);
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// Load up to len_d events, iterating forward across the cursor.
|
||||
for (c3_d loaded = 0; (ret_w != MDB_NOTFOUND) && (loaded < len_d); ++loaded) {
|
||||
// As a sanity check, we make sure that there aren't any discontinuities in
|
||||
// the sequence of loaded events.
|
||||
c3_d current_id = first_event_d + loaded;
|
||||
if (key.mv_size != sizeof(c3_d)) {
|
||||
u3l_log("lmdb: read: invalid cursor key\r\n");
|
||||
return c3n;
|
||||
}
|
||||
if (*(c3_d*)key.mv_data != current_id) {
|
||||
u3l_log("lmdb: read: missing event in database. Expected %" PRIu64 ", received %"
|
||||
PRIu64 "\r\n",
|
||||
current_id,
|
||||
*(c3_d*)key.mv_data);
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// Now build the atom version and then the cued version from the raw data
|
||||
if ( c3n == on_event_read(vod_p, current_id, u3i_bytes(val.mv_size, val.mv_data)) ) {
|
||||
// XX remove
|
||||
//
|
||||
u3l_log("lmdb: read: aborting replay due to error.\r\n");
|
||||
return c3n;
|
||||
}
|
||||
|
||||
ret_w = mdb_cursor_get(cursor_u, &key, &val, MDB_NEXT);
|
||||
if (ret_w != 0 && ret_w != MDB_NOTFOUND) {
|
||||
u3l_log("lmdb: read: error while loading events: %s\r\n",
|
||||
mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
}
|
||||
|
||||
mdb_cursor_close(cursor_u);
|
||||
|
||||
// Read-only transactions are aborted since we don't need to record the fact
|
||||
// that we performed a read.
|
||||
mdb_txn_abort(transaction_u);
|
||||
|
||||
return c3y;
|
||||
}
|
||||
|
||||
/* u3_lmdb_get_latest_event_number(): Gets last event id persisted
|
||||
**
|
||||
** Reads the last key in order from the EVENTS table as the latest event
|
||||
** number. On table empty, returns c3y but doesn't modify event_number.
|
||||
*/
|
||||
c3_o u3_lmdb_get_latest_event_number(MDB_env* environment, c3_d* event_number)
|
||||
{
|
||||
// Creates the read transaction.
|
||||
MDB_txn* transaction_u;
|
||||
c3_w ret_w = mdb_txn_begin(environment,
|
||||
(MDB_txn *) NULL,
|
||||
0, /* flags */
|
||||
&transaction_u);
|
||||
if (0 != ret_w) {
|
||||
u3l_log("lmdb: last: txn_begin fail: %s\n", mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// Opens the database as part of the transaction.
|
||||
c3_w flags_w = MDB_CREATE | MDB_INTEGERKEY;
|
||||
MDB_dbi database_u;
|
||||
ret_w = mdb_dbi_open(transaction_u,
|
||||
"EVENTS",
|
||||
flags_w,
|
||||
&database_u);
|
||||
if (0 != ret_w) {
|
||||
u3l_log("lmdb: last: dbi_open fail: %s\n", mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// Creates a cursor to point to the last event
|
||||
MDB_cursor* cursor_u;
|
||||
ret_w = mdb_cursor_open(transaction_u, database_u, &cursor_u);
|
||||
if (0 != ret_w) {
|
||||
u3l_log("lmdb: last: cursor_open fail: %s\n", mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// Set the cursor at the end of the line.
|
||||
MDB_val key;
|
||||
MDB_val val;
|
||||
ret_w = mdb_cursor_get(cursor_u, &key, &val, MDB_LAST);
|
||||
if (MDB_NOTFOUND == ret_w) {
|
||||
// Clean up, but don't error out.
|
||||
mdb_cursor_close(cursor_u);
|
||||
mdb_txn_abort(transaction_u);
|
||||
return c3y;
|
||||
}
|
||||
|
||||
if (0 != ret_w) {
|
||||
u3l_log("lmdb: could not find last event: %s\r\n", mdb_strerror(ret_w));
|
||||
mdb_cursor_close(cursor_u);
|
||||
mdb_txn_abort(transaction_u);
|
||||
return c3n;
|
||||
}
|
||||
|
||||
*event_number = *(c3_d*)key.mv_data;
|
||||
|
||||
mdb_cursor_close(cursor_u);
|
||||
|
||||
// Read-only transactions are aborted since we don't need to record the fact
|
||||
// that we performed a read.
|
||||
mdb_txn_abort(transaction_u);
|
||||
|
||||
return c3y;
|
||||
}
|
||||
|
||||
/* u3_lmdb_write_identity(): Writes the event log identity information
|
||||
**
|
||||
** We have a secondary database (table) in this environment named META where we
|
||||
** read/write identity information from/to.
|
||||
*/
|
||||
c3_o u3_lmdb_write_identity(MDB_env* environment,
|
||||
u3_noun who,
|
||||
u3_noun is_fake,
|
||||
u3_noun life)
|
||||
{
|
||||
// Creates the write transaction.
|
||||
MDB_txn* transaction_u;
|
||||
c3_w ret_w = mdb_txn_begin(environment,
|
||||
(MDB_txn *) NULL,
|
||||
0, /* flags */
|
||||
&transaction_u);
|
||||
if (0 != ret_w) {
|
||||
u3l_log("lmdb: meta write: txn_begin fail: %s\n", mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// Opens the database as part of the transaction.
|
||||
c3_w flags_w = MDB_CREATE;
|
||||
MDB_dbi database_u;
|
||||
ret_w = mdb_dbi_open(transaction_u,
|
||||
"META",
|
||||
flags_w,
|
||||
&database_u);
|
||||
if (0 != ret_w) {
|
||||
u3l_log("lmdb: meta write: dbi_open fail: %s\n", mdb_strerror(ret_w));
|
||||
mdb_txn_abort(transaction_u);
|
||||
return c3n;
|
||||
}
|
||||
|
||||
c3_o ret;
|
||||
ret = _perform_put_on_database_noun(transaction_u, database_u, "who", who);
|
||||
if (ret == c3n) {
|
||||
mdb_txn_abort(transaction_u);
|
||||
return c3n;
|
||||
}
|
||||
|
||||
ret = _perform_put_on_database_noun(transaction_u, database_u, "is-fake",
|
||||
is_fake);
|
||||
if (ret == c3n) {
|
||||
mdb_txn_abort(transaction_u);
|
||||
return c3n;
|
||||
}
|
||||
|
||||
ret = _perform_put_on_database_noun(transaction_u, database_u, "life", life);
|
||||
if (ret == c3n) {
|
||||
mdb_txn_abort(transaction_u);
|
||||
return c3n;
|
||||
}
|
||||
|
||||
ret_w = mdb_txn_commit(transaction_u);
|
||||
if (0 != ret_w) {
|
||||
u3l_log("lmdb: meta write: failed to commit transaction: %s\n", mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
return c3y;
|
||||
}
|
||||
|
||||
|
||||
/* u3_lmdb_read_identity(): Reads the event log identity information.
|
||||
*/
|
||||
c3_o u3_lmdb_read_identity(MDB_env* environment,
|
||||
u3_noun* who,
|
||||
u3_noun* is_fake,
|
||||
u3_noun* life) {
|
||||
// Creates the write transaction.
|
||||
MDB_txn* transaction_u;
|
||||
c3_w ret_w = mdb_txn_begin(environment,
|
||||
(MDB_txn *) NULL,
|
||||
MDB_RDONLY, /* flags */
|
||||
&transaction_u);
|
||||
if (0 != ret_w) {
|
||||
u3l_log("lmdb: meta read: txn_begin fail: %s\n", mdb_strerror(ret_w));
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// Opens the database as part of the transaction.
|
||||
MDB_dbi database_u;
|
||||
ret_w = mdb_dbi_open(transaction_u,
|
||||
"META",
|
||||
0,
|
||||
&database_u);
|
||||
if (0 != ret_w) {
|
||||
u3l_log("lmdb: meta read: dbi_open fail: %s\n", mdb_strerror(ret_w));
|
||||
mdb_txn_abort(transaction_u);
|
||||
return c3n;
|
||||
}
|
||||
|
||||
c3_o ret;
|
||||
ret = _perform_get_on_database_noun(transaction_u, database_u, "who", who);
|
||||
if (ret == c3n) {
|
||||
mdb_txn_abort(transaction_u);
|
||||
return c3n;
|
||||
}
|
||||
|
||||
ret = _perform_get_on_database_noun(transaction_u, database_u, "is-fake",
|
||||
is_fake);
|
||||
if (ret == c3n) {
|
||||
mdb_txn_abort(transaction_u);
|
||||
return c3n;
|
||||
}
|
||||
|
||||
ret = _perform_get_on_database_noun(transaction_u, database_u, "life", life);
|
||||
if (ret == c3n) {
|
||||
mdb_txn_abort(transaction_u);
|
||||
return c3n;
|
||||
}
|
||||
|
||||
// Read-only transactions are aborted since we don't need to record the fact
|
||||
// that we performed a read.
|
||||
mdb_txn_abort(transaction_u);
|
||||
|
||||
return c3y;
|
||||
}
|
Loading…
Reference in New Issue
Block a user