diff --git a/nix/deps-env.nix b/nix/deps-env.nix index 2810652db..370939365 100644 --- a/nix/deps-env.nix +++ b/nix/deps-env.nix @@ -10,7 +10,7 @@ let libs = with pkgs; - [ openssl zlib curl gmp scrypt libsigsegv ncurses openssl zlib ]; + [ openssl zlib curl gmp scrypt libsigsegv ncurses openssl zlib lmdb ]; osx = with pkgs; diff --git a/nix/pkgs/urbit/default.nix b/nix/pkgs/urbit/default.nix index 8c80427d4..5eedebfea 100644 --- a/nix/pkgs/urbit/default.nix +++ b/nix/pkgs/urbit/default.nix @@ -11,7 +11,7 @@ let deps = with pkgs; - [ curl gmp libsigsegv ncurses openssl zlib ]; + [ curl gmp libsigsegv ncurses openssl zlib lmdb ]; vendor = [ argon2 softfloat3 ed25519 ent h2o scrypt uv murmur3 secp256k1 sni ]; diff --git a/nix/pkgs/urbit/release.nix b/nix/pkgs/urbit/release.nix index 615344c0f..005f001b8 100644 --- a/nix/pkgs/urbit/release.nix +++ b/nix/pkgs/urbit/release.nix @@ -6,7 +6,7 @@ let crossdeps = with env; - [ curl libgmp libsigsegv ncurses openssl zlib ]; + [ curl libgmp libsigsegv ncurses openssl zlib lmdb ]; vendor = with deps; diff --git a/pkg/urbit/configure b/pkg/urbit/configure index f45b7b1fc..fff3e354f 100755 --- a/pkg/urbit/configure +++ b/pkg/urbit/configure @@ -6,7 +6,7 @@ URBIT_VERSION=0.8.0 deps=" \ curl gmp sigsegv argon2 ed25519 ent h2o scrypt sni uv murmur3 secp256k1 \ - softfloat3 ncurses ssl crypto z \ + softfloat3 ncurses ssl crypto z lmdb \ " echo '#pragma once' >include/config.h diff --git a/pkg/urbit/include/vere/vere.h b/pkg/urbit/include/vere/vere.h index de181b5ab..69db1bb9e 100644 --- a/pkg/urbit/include/vere/vere.h +++ b/pkg/urbit/include/vere/vere.h @@ -4,6 +4,7 @@ */ #include "h2o.h" +#include <lmdb.h> /** Quasi-tunable parameters. **/ @@ -306,15 +307,6 @@ struct _u3_moor* nex_u; } u3_moor; - /* u3_foil: abstract chub-addressed file. 
- */ - typedef struct _u3_foil { - uv_file fil_u; // libuv file handle - struct _u3_dire* dir_u; // parent directory - c3_c* nam_c; // name within parent - c3_d end_d; // end of file - } u3_foil; - /* u3_dent: directory entry. */ typedef struct _u3_dent { @@ -646,9 +638,9 @@ u3_dire* dir_u; // main pier directory u3_dire* urb_u; // urbit system data u3_dire* com_u; // log directory - u3_foil* fol_u; // logfile c3_o liv_o; // live c3_d end_d; // byte end of file + MDB_env* db_u; // lmdb environment. c3_d moc_d; // commit requested c3_d com_d; // committed struct _u3_pier* pir_u; // pier backpointer @@ -862,69 +854,6 @@ u3_dire* u3_foil_folder(const c3_c* pax_c); // directory object, or 0 - /* u3_foil_create(): create a new, empty file, not syncing. - */ - void - u3_foil_create(void (*fun_f)(void*, // context pointer - u3_foil*),// file object - void* vod_p, // context pointer - u3_dire* dir_u, // directory - const c3_c* nam_c); // name of new file - - /* u3_foil_absorb(): absorb logfile, truncating to last good frame; block. - */ - u3_foil* - u3_foil_absorb(u3_dire* dir_u, // directory - c3_c* nam_c); // filename - - /* u3_foil_delete(): delete a file; free descriptor. - */ - void - u3_foil_delete(void (*fun_f)(void*), // context pointer - void* vod_p, // context pointer - u3_foil* fol_u); // file to delete - - /* u3_foil_append(): write a frame at the end of a file, freeing buffer. - */ - void - u3_foil_append(void (*fun_f)(void*), // context pointer - void* vod_p, // context pointer - u3_foil* fol_u, // file - c3_d* buf_d, // buffer to write from - c3_d len_d); // length in chubs - - /* u3_foil_reveal(): read the frame before a position, blocking. - */ - c3_d* - u3_foil_reveal(u3_foil* fol_u, // file from - c3_d* pos_d, // end position/prev end - c3_d* len_d); // length return - - /* u3_foil_commit(): reveal from one file, append to another. 
- */ - void - u3_foil_commit(void (*fun_f)(void*, // context pointer - u3_foil*, // file from - c3_d, // previous from - u3_foil*, // file to - c3_d), // end of to - void* vod_p, // context pointer - u3_foil* del_u, // file from - c3_d del_d, // end of from frame - u3_foil* unt_u, // file to - c3_d unt_d); // end of to frame - - /* u3_foil_invent(): make new file with one frame; free buffer, sync. - */ - void - u3_foil_invent(void (*fun_f)(void*, // context pointer - u3_foil*), // new file - void* vod_p, // context pointer - u3_dire* dir_u, // directory - c3_c* nam_c, // filename - c3_d* buf_d, // buffer (to free) - c3_d len_d); // length - /** Output. **/ /* u3_reck_kick(): handle effect. @@ -1344,5 +1273,62 @@ void u3_daemon_grab(void* vod_p); + c3_w u3_readdir_r(DIR *dirp, struct dirent *entry, struct dirent **result); + + /* Database + */ + /* u3_lmdb_init(): Initializes lmdb inside log_path + */ + MDB_env* u3_lmdb_init(const char* log_path); + + /* u3_lmdb_shutdown(): Shuts down the entire logging system + */ + void u3_lmdb_shutdown(MDB_env* env); + + /* u3_lmdb_get_latest_event_number(): Gets last event id persisted + */ + c3_o u3_lmdb_get_latest_event_number(MDB_env* environment, + c3_d* event_number); + + /* u3_lmdb_write_event(): Persists an event to the database + */ + void u3_lmdb_write_event(MDB_env* environment, + u3_writ* event_u, + void (*on_complete)(c3_o success, u3_writ*)); + + /* u3_lmdb_read_events(): Reads events back from the database + ** + ** Reads back up to |len_d| events starting with |first_event_d|. For + ** each event, the event will be passed to |on_event_read| and further + ** reading will be aborted if the callback returns c3n. + ** + ** Returns c3y on complete success; c3n on any error. 
+ */ + c3_o u3_lmdb_read_events(u3_pier* pir_u, + c3_d first_event_d, + c3_d len_d, + c3_o(*on_event_read)(u3_pier* pir_u, + c3_d id, + u3_noun mat, + u3_noun ovo)); + + /* u3_lmdb_write_identity(): Writes log identity + ** + ** Returns c3y on complete success; c3n on any error. + */ + c3_o u3_lmdb_write_identity(MDB_env* environment, + u3_noun who, + u3_noun is_fake, + u3_noun life); + + /* u3_lmdb_read_identity(): Reads log identity + ** + ** Returns c3y on complete success; c3n on any error. + */ + c3_o u3_lmdb_read_identity(MDB_env* environment, + u3_noun* who, + u3_noun* is_fake, + u3_noun* life); + diff --git a/pkg/urbit/vere/foil.c b/pkg/urbit/vere/foil.c index 591b2783b..4bd4a401a 100644 --- a/pkg/urbit/vere/foil.c +++ b/pkg/urbit/vere/foil.c @@ -168,462 +168,3 @@ u3_foil_folder(const c3_c* pax_c) } return dir_u; } - -/* u3_foil_create(): create a new, empty, open file, not syncing. -*/ - struct _foil_create_request { - uv_fs_t ruq_u; - void (*fun_f)(void*, u3_foil*); - void* vod_p; - u3_dire* dir_u; - c3_c* nam_c; - c3_c* pax_c; - }; - - static void - _foil_create_cb(uv_fs_t* ruq_u) - { - struct _foil_create_request* req_u = (void *)ruq_u; - u3_foil* fol_u; - - fol_u = c3_malloc(sizeof(*fol_u)); - fol_u->fil_u = ruq_u->result; - fol_u->dir_u = req_u->dir_u; - fol_u->nam_c = req_u->nam_c; - fol_u->end_d = 0; - - req_u->fun_f(req_u->vod_p, fol_u); - - c3_free(req_u->pax_c); - uv_fs_req_cleanup(ruq_u); - c3_free(req_u); - } -void -u3_foil_create(void (*fun_f)(void*, // context pointer - u3_foil*),// file object - void* vod_p, // context pointer - u3_dire* dir_u, // directory - const c3_c* nam_c) // name of new file -{ - c3_c* pax_c; - c3_i err_i; - - /* construct full path - */ - pax_c = _foil_path(dir_u, nam_c); - - /* perform create - */ - { - struct _foil_create_request* req_u; - - req_u = c3_malloc(sizeof(*req_u)); - - req_u->fun_f = fun_f; - req_u->vod_p = vod_p; - req_u->dir_u = dir_u; - req_u->nam_c = c3_malloc(1 + strlen(nam_c)); - 
strcpy(req_u->nam_c, nam_c); - req_u->pax_c = pax_c; - - if ( 0 != (err_i = uv_fs_open(u3L, - &req_u->ruq_u, - pax_c, - O_CREAT | O_WRONLY, - 0600, - _foil_create_cb)) ) - { - _foil_fail("uv_fs_open", err_i); - } - } -} - -/* u3_foil_absorb(): open logfile, truncating to last good frame; blocking. -*/ -u3_foil* -u3_foil_absorb(u3_dire* dir_u, // directory - c3_c* nam_c) // filename -{ - u3_foil* fol_u; - uv_fs_t ruq_u; - c3_i err_i; - - /* open file and create wrapper - */ - { - c3_c* pax_c = _foil_path(dir_u, nam_c); - - if ( 0 > (err_i = uv_fs_open(u3L, - &ruq_u, - pax_c, - O_RDWR | O_CREAT, - 0600, - 0)) ) - { - _foil_fail(pax_c, err_i); - c3_free(pax_c); - return 0; - } - c3_free(pax_c); - - fol_u = c3_malloc(sizeof(*fol_u)); - fol_u->dir_u = dir_u; - fol_u->fil_u = ruq_u.result; - fol_u->nam_c = c3_malloc(1 + strlen(nam_c)); - strcpy(fol_u->nam_c, nam_c); - - uv_fs_req_cleanup(&ruq_u); - } - - /* measure file - */ - { - if ( 0 != (err_i = uv_fs_fstat(u3L, &ruq_u, fol_u->fil_u, 0)) ) { - _foil_fail("uv_fs_fstat", err_i); - return 0; - } - if ( 0 != (7 & ruq_u.statbuf.st_size) ) { - _foil_fail("logfile size corrupt", 0); - return 0; - } - fol_u->end_d = (ruq_u.statbuf.st_size >> 3ULL); - } - - /* XX: scan for good frame. - */ - return fol_u; -} - -/* u3_foil_delete(): delete a file; free descriptor. 
-*/ - struct _foil_delete_request { - uv_fs_t ruq_u; - void (*fun_f)(void*); - void* vod_p; - u3_foil* fol_u; - c3_c* pax_c; - }; - - static void - _foil_delete_cb(uv_fs_t* ruq_u) - { - struct _foil_delete_request* req_u = (void *)ruq_u; - - if ( req_u->fun_f ) { - req_u->fun_f(req_u->vod_p); - } - - c3_free(req_u->pax_c); - c3_free(req_u->fol_u->nam_c); - c3_free(req_u->fol_u); - - uv_fs_req_cleanup(ruq_u); - c3_free(req_u); - } -void -u3_foil_delete(void (*fun_f)(void*), // context pointer - void* vod_p, // context pointer - u3_foil* fol_u) // file to delete -{ - c3_i err_i; - c3_c* pax_c; - - /* construct full path - */ - pax_c = _foil_path(fol_u->dir_u, fol_u->nam_c); - - /* perform delete - */ - { - struct _foil_delete_request* req_u; - - req_u = c3_malloc(sizeof(*req_u)); - - req_u->fun_f = fun_f; - req_u->vod_p = vod_p; - req_u->fol_u = fol_u; - req_u->pax_c = pax_c; - - if ( 0 != (err_i = uv_fs_unlink(u3L, - &req_u->ruq_u, - pax_c, - _foil_delete_cb)) ) - { - _foil_fail("uv_fs_unlink", err_i); - } - } -} - -/* u3_foil_append(): write a frame at the end of a file, freeing the buffer. 
-*/ - struct _foil_append_request { - uv_fs_t ruq_u; - void (*fun_f)(void*); - void* vod_p; - u3_foil* fol_u; - c3_d* fam_d; - c3_d* buf_d; - }; - static void - _foil_append_cb_2(uv_fs_t* ruq_u) - { - struct _foil_append_request* req_u = (void*) ruq_u; - - req_u->fun_f(req_u->vod_p); - uv_fs_req_cleanup(ruq_u); - c3_free(req_u); - } - static void - _foil_append_cb_1(uv_fs_t* ruq_u) - { - struct _foil_append_request* req_u = (void*) ruq_u; - - uv_fs_req_cleanup(ruq_u); - c3_free(req_u->buf_d); - - uv_fs_fsync(u3L, &req_u->ruq_u, - req_u->fol_u->fil_u, - _foil_append_cb_2); - } -void -u3_foil_append(void (*fun_f)(void*), // context pointer - void* vod_p, // context pointer - u3_foil* fol_u, // file - c3_d* buf_d, // buffer to write from - c3_d len_d) // length in chubs -{ - c3_d pos_d = fol_u->end_d; - struct _foil_append_request* req_u; - c3_i err_i; - - /* set up request - */ - { - req_u = c3_malloc(sizeof(*req_u)); - req_u->fun_f = fun_f; - req_u->vod_p = vod_p; - req_u->fol_u = fol_u; - req_u->buf_d = buf_d; - req_u->fam_d = c3_malloc(16); - } - - /* framing - */ - { - c3_w top_w, bot_w; - - fol_u->end_d = pos_d + len_d + 2; - - /* XX: assumes "little-endian won", 32-bit frame length. - */ - top_w = u3r_mug_words((c3_w *)(void *) buf_d, (2 * len_d)); - bot_w = (req_u->fol_u->end_d & 0xffffffff); - bot_w = u3r_mug_words(&bot_w, 1); - - req_u->fam_d[0] = len_d; - req_u->fam_d[1] = ((c3_d)top_w) << 32ULL | ((c3_d) bot_w); - } - - /* do it - */ - { - uv_buf_t buf_u[2]; - - buf_u[0] = uv_buf_init((void *)buf_d, (len_d * 8)); - buf_u[1] = uv_buf_init((void *)req_u->fam_d, 16); - - if ( 0 != (err_i = uv_fs_write(u3L, - &req_u->ruq_u, - fol_u->fil_u, - buf_u, - 2, - (8ULL * pos_d), - _foil_append_cb_1)) ) - { - _foil_fail("uv_fs_write", err_i); - } - } -} - -/* u3_foil_reveal(): read the frame before a position, blocking. 
-*/ -c3_d* -u3_foil_reveal(u3_foil* fol_u, // file from - c3_d* sop_d, // end position/prev end - c3_d* len_d) // length return -{ - c3_d pos_d = *sop_d; - c3_d fam_d[2]; - c3_l mug_l; - uv_fs_t ruq_u; - c3_i err_i; - - c3_assert(pos_d >= 2); - c3_assert(pos_d <= fol_u->end_d); - - /* read frame data - */ - { - uv_buf_t buf_u = uv_buf_init((void *)fam_d, 16); - - fam_d[0] = fam_d[1] = 0; - if ( 0 > (err_i = uv_fs_read(u3L, - &ruq_u, - fol_u->fil_u, - &buf_u, 1, - (8ULL * (pos_d - 2ULL)), - 0)) ) - { - _foil_fail("uv_fs_read", err_i); - return 0; - } - uv_fs_req_cleanup(&ruq_u); - } - - /* validate frame - */ - { - c3_w top_w, bot_w; - c3_l chk_l; - - *len_d = fam_d[0]; - if ( *len_d > (pos_d - 2ULL) ) { - _foil_fail("corrupt frame a", 0); - return 0; - } - - top_w = fam_d[1] >> 32ULL; - mug_l = top_w; - - bot_w = fam_d[1] & 0xffffffff; - chk_l = (pos_d & 0xffffffff); - chk_l = u3r_mug_words(&chk_l, 1); - if ( bot_w != chk_l ) { - _foil_fail("corrupt frame b", 0); - } - } - - /* read frame - */ - { - c3_d* buf_d = c3_malloc(8 * *len_d); - uv_buf_t buf_u = uv_buf_init((void *)buf_d, 8 * *len_d); - c3_l gum_l; - - if ( 0 > (err_i = uv_fs_read(u3L, - &ruq_u, - fol_u->fil_u, - &buf_u, 1, - (8ULL * (pos_d - (*len_d + 2ULL))), - 0) ) ) - { - _foil_fail("uv_fs_read", err_i); - return 0; - } - uv_fs_req_cleanup(&ruq_u); - - gum_l = u3r_mug_words((c3_w *)(void *) buf_d, (2 * *len_d)); - if ( mug_l != gum_l ) { - _foil_fail("corrupt frame c", 0); - return 0; - } - *sop_d = (pos_d - (*len_d + 2ULL)); - return buf_d; - } -} - -/* u3_foil_invent(): create a new file with one frame, freeing buffer; sync. 
-*/ - struct _foil_invent_request { - uv_fs_t ruq_u; - void (*fun_f)(void*, u3_foil*); - u3_foil* fol_u; - void* vod_p; - c3_d* buf_d; - c3_d len_d; - c3_d num_d; -#if 0 - struct timeval bef_u; -#endif - }; - static void - _foil_invent_cb_2a(void* req_p) - { - struct _foil_invent_request* req_u = req_p; - - if ( 1 == req_u->num_d ) { -#if 0 - { - struct timeval aft_u, gap_u; - c3_w mls_w; - - gettimeofday(&aft_u, 0); - timersub(&aft_u, &req_u->bef_u, &gap_u); - mls_w = (gap_u.tv_sec * 1000) + (gap_u.tv_usec / 1000); - - u3l_log("invent ms: %d\r\n", mls_w); - } -#endif - req_u->fun_f(req_u->vod_p, req_u->fol_u); - _foil_close(req_u->fol_u->fil_u); - - c3_free(req_u); - } - else { - req_u->num_d++; - } - } - - static void - _foil_invent_cb_2b(uv_fs_t* ruq_u) - { - struct _foil_invent_request* req_u = (void *)ruq_u; - - uv_fs_req_cleanup(ruq_u); - _foil_invent_cb_2a(req_u); - } - - static void - _foil_invent_cb_1(void* req_p, - u3_foil* fol_u) - { - struct _foil_invent_request* req_u = req_p; - - req_u->fol_u = fol_u; - - /* fsync the parent directory, since we just created a file. 
- */ - uv_fs_fsync(u3L, &req_u->ruq_u, - req_u->fol_u->dir_u->fil_u, - _foil_invent_cb_2b); - - u3_foil_append(_foil_invent_cb_2a, - req_u, - fol_u, - req_u->buf_d, - req_u->len_d); - } -void -u3_foil_invent(void (*fun_f)(void*, // context pointer - u3_foil*), // new file - void* vod_p, // context pointer - u3_dire* dir_u, // directory - c3_c* nam_c, // filename - c3_d* buf_d, // buffer (to free) - c3_d len_d) // length -{ - struct _foil_invent_request* req_u; - - req_u = malloc(sizeof(*req_u)); - req_u->fun_f = fun_f; - req_u->fol_u = 0; - req_u->vod_p = vod_p; - req_u->buf_d = buf_d; - req_u->len_d = len_d; - req_u->num_d = 0; -#if 0 - gettimeofday(&req_u->bef_u, 0); -#endif - - u3_foil_create(_foil_invent_cb_1, req_u, dir_u, nam_c); -}
diff --git a/pkg/urbit/vere/lmdb.c b/pkg/urbit/vere/lmdb.c
new file mode 100644
index 000000000..419c82fc7
--- /dev/null
+++ b/pkg/urbit/vere/lmdb.c
@@ -0,0 +1,602 @@
+/* vere/lmdb.c
+*/
+
+#include "all.h"
+
+#include <uv.h>
+#include <lmdb.h>
+
+#include "vere/vere.h"
+
+// Event log persistence for Urbit
+//
+// Persistence works by having an lmdb environment opened on the main
+// thread. This environment is used to create read-only transactions
+// synchronously when needed.
+//
+// But the majority of lmdb writes operate asynchronously in the uv worker
+// pool. Since individual transactions are bound to threads, we perform all
+// blocking writing on worker threads.
+//
+// We perform the very first metadata writes on the main thread because we
+// can't do anything until they persist.
+
+/* u3_lmdb_init(): Opens up a log environment
+**
+** Precondition: log_path points to an already created directory
+**
+** Returns the opened environment, or 0 on any failure. Once the handle has
+** been created, every failure path closes it again before returning, so a
+** failed init does not leak the MDB_env.
+*/
+MDB_env* u3_lmdb_init(const char* log_path)
+{
+  MDB_env* env = 0;
+  c3_w ret_w = mdb_env_create(&env);
+  if (ret_w != 0) {
+    u3l_log("lmdb: init fail: %s\n", mdb_strerror(ret_w));
+    return 0;
+  }
+
+  // Our databases have up to three tables: META, EVENTS, and GRAINS.
+  ret_w = mdb_env_set_maxdbs(env, 3);
+  if (ret_w != 0) {
+    u3l_log("lmdb: failed to set number of databases: %s\n", mdb_strerror(ret_w));
+    mdb_env_close(env);
+    return 0;
+  }
+
+  // TODO: Start with a gigabyte for the event log.
+  //
+  ret_w = mdb_env_set_mapsize(env, 1024 * 1024 * 1024);
+  if (ret_w != 0) {
+    u3l_log("lmdb: failed to set database size: %s\n", mdb_strerror(ret_w));
+    mdb_env_close(env);
+    return 0;
+  }
+
+  ret_w = mdb_env_open(env, log_path, 0, 0664);
+  if (ret_w != 0) {
+    u3l_log("lmdb: failed to open event log: %s\n", mdb_strerror(ret_w));
+    // A handle whose open failed must still be discarded with
+    // mdb_env_close().
+    mdb_env_close(env);
+    return 0;
+  }
+
+  return env;
+}
+
+/* u3_lmdb_shutdown(): Shuts down lmdb
+**
+** Closes the environment returned by u3_lmdb_init(); the handle must not be
+** used afterwards.
+*/
+void u3_lmdb_shutdown(MDB_env* env)
+{
+  mdb_env_close(env);
+}
+
+/* _perform_put_on_database_raw(): Writes a key/value pair to a specific
+** database as part of a transaction.
+**
+** The raw version doesn't take ownership of either key/value and performs no
+** nock calculations, so it is safe to call from any thread.
+**
+** |flags| is passed straight through to mdb_put(). Returns c3y on success;
+** on failure logs the lmdb error and returns c3n. The transaction is neither
+** committed nor aborted here; that is the caller's job.
+*/
+static
+c3_o _perform_put_on_database_raw(MDB_txn* transaction_u,
+                                  MDB_dbi database_u,
+                                  c3_w flags,
+                                  void* key,
+                                  size_t key_len,
+                                  void* value,
+                                  size_t value_len) {
+  MDB_val key_val, value_val;
+
+  key_val.mv_size = key_len;
+  key_val.mv_data = key;
+
+  value_val.mv_size = value_len;
+  value_val.mv_data = value;
+
+  c3_w ret_w = mdb_put(transaction_u, database_u, &key_val, &value_val, flags);
+  if (ret_w != 0) {
+    u3l_log("lmdb: write failed: %s\n", mdb_strerror(ret_w));
+    return c3n;
+  }
+
+  return c3y;
+}
+
+/* _perform_get_on_database_raw(): Reads a key/value pair to a specific
+** database as part of a transaction.
+*/
+static
+c3_o _perform_get_on_database_raw(MDB_txn* transaction_u,
+                                  MDB_dbi database_u,
+                                  void* key,
+                                  size_t key_len,
+                                  MDB_val* value) {
+  MDB_val key_val;
+  key_val.mv_size = key_len;
+  key_val.mv_data = key;
+
+  // Any nonzero lmdb result (including a missing key) is logged and
+  // reported as c3n; |value| is only meaningful when c3y is returned.
+  c3_w ret_w = mdb_get(transaction_u, database_u, &key_val, value);
+  if (ret_w != 0) {
+    u3l_log("lmdb: read failed: %s\n", mdb_strerror(ret_w));
+    return c3n;
+  }
+
+  return c3y;
+}
+
+/* _perform_put_on_database_noun(): Writes a noun to the database.
+**
+** This requires access to the loom so it must only be run from the libuv
+** thread.
+**
+** The noun is jammed and stored under the NUL-terminated string |key| (the
+** terminator itself is not part of the stored key). Returns c3y on success,
+** c3n if the underlying put failed.
+*/
+static
+c3_o _perform_put_on_database_noun(MDB_txn* transaction_u,
+                                   MDB_dbi database_u,
+                                   c3_c* key,
+                                   u3_noun noun) {
+  // jam noun into an atom representation
+  // NOTE(review): u3ke_jam presumably consumes |noun| -- confirm.
+  u3_atom mat = u3ke_jam(noun);
+
+  // copy the jammed noun into a byte buffer we can hand to lmdb; c3_malloc is
+  // used (as elsewhere in this file) instead of an unchecked malloc()
+  c3_w len_w = u3r_met(3, mat);
+  c3_y* bytes_y = c3_malloc(len_w);
+  u3r_bytes(0, len_w, bytes_y, mat);
+
+  c3_o ret = _perform_put_on_database_raw(
+      transaction_u,
+      database_u,
+      0,
+      key, strlen(key),
+      bytes_y, len_w);
+
+  free(bytes_y);
+  u3z(mat);
+
+  return ret;
+}
+
+/* _perform_get_on_database_noun(): Reads a noun from the database.
+**
+** This requires access to the loom so it must only be run from the libuv
+** thread.
+**
+** Returns c3y and stores the cued noun in |*noun| on success. Returns c3n,
+** leaving |*noun| untouched, when the key could not be read.
+*/
+static
+c3_o _perform_get_on_database_noun(MDB_txn* transaction_u,
+                                   MDB_dbi database_u,
+                                   c3_c* key,
+                                   u3_noun* noun) {
+  MDB_val value_val;
+  c3_o ret = _perform_get_on_database_raw(transaction_u,
+                                          database_u,
+                                          key, strlen(key),
+                                          &value_val);
+  if (ret == c3n) {
+    // Propagate the failure: reporting success here would hand the caller
+    // an uninitialized noun.
+    return c3n;
+  }
+
+  // Take the bytes and cue them. The transferring u3ke_cue() releases the
+  // temporary atom, mirroring how u3_lmdb_read_events() cues its data.
+  u3_atom raw_atom = u3i_bytes(value_val.mv_size, value_val.mv_data);
+  *noun = u3ke_cue(raw_atom);
+  return c3y;
+}
+
+/* _write_request_data: callback struct for u3_lmdb_write_event()
+*/
+struct _write_request_data {
+  // The database environment to write to. This object is thread-safe, though
+  // the transactions and handles opened from it are explicitly not.
+  MDB_env* environment;
+
+  // The original event. Not to be accessed from the worker thread; only used
+  // in the callback executed on the main loop thread.
+  u3_writ* event;
+
+  // The event number from event separated out so we can access it on the other
+  // thread.
+  c3_d event_number;
+
+  // The event serialized out of the loom into a malloced structure accessible
+  // from the worker thread.
+  void* malloced_event_data;
+
+  // The size of the malloced_event_data. We keep track of this for the
+  // database write.
+  size_t malloced_event_data_size;
+
+  // Whether the write completed successfully.
+  c3_o success;
+
+  // Called on main loop thread on completion.
+  void (*on_complete)(c3_o, u3_writ*);
+};
+
+/* _u3_lmdb_write_event_cb(): Implementation of u3_lmdb_write_event()
+**
+** This is always run on a libuv background worker thread; actual nouns cannot
+** be touched here.
+**
+** data->success starts out as c3n (set by u3_lmdb_write_event) and is only
+** flipped to c3y once the put AND the commit have both succeeded. Every
+** failure after the transaction was begun aborts it so it is not leaked.
+*/
+static void _u3_lmdb_write_event_cb(uv_work_t* req) {
+  struct _write_request_data* data = req->data;
+
+  // Creates the write transaction.
+  MDB_txn* transaction_u;
+  c3_w ret_w = mdb_txn_begin(data->environment,
+                             (MDB_txn *) NULL,
+                             0, /* flags */
+                             &transaction_u);
+  if (0 != ret_w) {
+    u3l_log("lmdb: txn_begin fail: %s\n", mdb_strerror(ret_w));
+    return;
+  }
+
+  // Opens the database as part of the transaction.
+  c3_w flags_w = MDB_CREATE | MDB_INTEGERKEY;
+  MDB_dbi database_u;
+  ret_w = mdb_dbi_open(transaction_u,
+                       "EVENTS",
+                       flags_w,
+                       &database_u);
+  if (0 != ret_w) {
+    u3l_log("lmdb: dbi_open fail: %s\n", mdb_strerror(ret_w));
+    mdb_txn_abort(transaction_u);
+    return;
+  }
+
+  // TODO: We need to detect the database being full, making the database
+  // maxsize larger, and then retrying this transaction.
+  //
+  c3_o success = _perform_put_on_database_raw(
+      transaction_u,
+      database_u,
+      MDB_NOOVERWRITE,
+      &(data->event_number),
+      sizeof(c3_d),
+      data->malloced_event_data,
+      data->malloced_event_data_size);
+  if (c3n == success) {
+    // The put already logged the reason. Do not commit a transaction that
+    // does not contain the event, and do not report success for it.
+    mdb_txn_abort(transaction_u);
+    return;
+  }
+
+  // mdb_txn_commit() frees the transaction whether or not it succeeds, so no
+  // abort is needed on this failure path.
+  ret_w = mdb_txn_commit(transaction_u);
+  if (0 != ret_w) {
+    u3l_log("lmdb: failed to commit event %" PRIu64 ": %s\n",
+            data->event_number,
+            mdb_strerror(ret_w));
+    return;
+  }
+
+  data->success = c3y;
+}
+
+/* _u3_lmdb_write_event_after_cb(): Implementation of u3_lmdb_write_event()
+**
+** This is always run on the main loop thread after the worker thread event
+** completes. Reports the outcome through on_complete, then releases the
+** serialized event buffer, the request data and the uv request itself.
+*/
+static void _u3_lmdb_write_event_after_cb(uv_work_t* req, int status) {
+  struct _write_request_data* data = req->data;
+
+  data->on_complete(data->success, data->event);
+
+  free(data->malloced_event_data);
+  free(data);
+  free(req);
+}
+
+/* u3_lmdb_write_event(): Asynchronously writes events to the database.
+**
+** This writes all the passed in events along with log metadata updates to the
+** database as a single transaction on a worker thread. Once the transaction
+** is completed, it calls the passed in callback on the main loop thread.
+**
+** TODO: Make this take multiple events in one commit once we have this
+** working one at a time.
+*/
+void u3_lmdb_write_event(MDB_env* environment,
+                         u3_writ* event_u,
+                         void (*on_complete)(c3_o, u3_writ*))
+{
+  // Serialize the jammed $work into a malloced buffer we can send to the other
+  // thread.
+  c3_w siz_w = u3r_met(3, event_u->mat);
+  c3_y* data_u = c3_calloc(siz_w);
+  u3r_bytes(0, siz_w, data_u, event_u->mat);
+
+  // Structure to pass to the worker thread.
+  struct _write_request_data* data = c3_malloc(sizeof(struct _write_request_data));
+  data->environment = environment;
+  data->event = event_u;
+  data->event_number = event_u->evt_d;
+  data->malloced_event_data = data_u;
+  data->malloced_event_data_size = siz_w;
+  data->on_complete = on_complete;
+  data->success = c3n;
+
+  // Queue asynchronous work to happen on the other thread.
+  uv_work_t* req = c3_malloc(sizeof(uv_work_t));
+  req->data = data;
+
+  uv_queue_work(uv_default_loop(),
+                req,
+                _u3_lmdb_write_event_cb,
+                _u3_lmdb_write_event_after_cb);
+}
+
+/* u3_lmdb_read_events(): Synchronously reads events from the database.
+**
+** Reads back up to |len_d| events starting with |first_event_d|. For
+** each event, the event will be passed to |on_event_read| and further
+** reading will be aborted if the callback returns c3n.
+**
+** The nouns handed to |on_event_read| are released here after the callback
+** returns, whatever it returned.
+**
+** Returns c3y on complete success; c3n on any error. The cursor and the
+** read transaction are released on every path.
+*/
+c3_o u3_lmdb_read_events(u3_pier* pir_u,
+                         c3_d first_event_d,
+                         c3_d len_d,
+                         c3_o(*on_event_read)(u3_pier* pir_u, c3_d id,
+                                              u3_noun mat, u3_noun ovo))
+{
+  // Result reported to the caller; only set to c3y once the whole requested
+  // range has been replayed.
+  c3_o ret_o = c3n;
+
+  // Creates the read transaction.
+  MDB_txn* transaction_u;
+  c3_w ret_w = mdb_txn_begin(pir_u->log_u->db_u,
+                             (MDB_txn *) NULL,
+                             MDB_RDONLY, /* flags */
+                             &transaction_u);
+  if (0 != ret_w) {
+    u3l_log("lmdb: txn_begin fail: %s\n", mdb_strerror(ret_w));
+    return c3n;
+  }
+
+  // Opens the database as part of the transaction. MDB_CREATE is not allowed
+  // in a read-only transaction, so only MDB_INTEGERKEY is requested; a
+  // missing EVENTS table is an error for replay either way.
+  c3_w flags_w = MDB_INTEGERKEY;
+  MDB_dbi database_u;
+  ret_w = mdb_dbi_open(transaction_u,
+                       "EVENTS",
+                       flags_w,
+                       &database_u);
+  if (0 != ret_w) {
+    u3l_log("lmdb: dbi_open fail: %s\n", mdb_strerror(ret_w));
+    mdb_txn_abort(transaction_u);
+    return c3n;
+  }
+
+  // Creates a cursor to iterate over keys starting at first_event_d.
+  MDB_cursor* cursor_u;
+  ret_w = mdb_cursor_open(transaction_u, database_u, &cursor_u);
+  if (0 != ret_w) {
+    u3l_log("lmdb: cursor_open fail: %s\n", mdb_strerror(ret_w));
+    mdb_txn_abort(transaction_u);
+    return c3n;
+  }
+
+  // Sets the cursor to the position of first_event_d.
+  MDB_val key;
+  MDB_val val;
+  key.mv_size = sizeof(c3_d);
+  key.mv_data = &first_event_d;
+
+  ret_w = mdb_cursor_get(cursor_u, &key, &val, MDB_SET_KEY);
+  if (0 != ret_w) {
+    u3l_log("lmdb: could not find initial event %" PRIu64 ": %s\r\n",
+            first_event_d, mdb_strerror(ret_w));
+    goto cleanup;
+  }
+
+  // Load up to len_d events, iterating forward across the cursor.
+  for (c3_d loaded = 0; (ret_w != MDB_NOTFOUND) && (loaded < len_d); ++loaded) {
+    // As a sanity check, we make sure that there aren't any discontinuities in
+    // the sequence of loaded events.
+    c3_d current_id = first_event_d + loaded;
+    if (key.mv_size != sizeof(c3_d) ||
+        *(c3_d*)key.mv_data != current_id) {
+      u3l_log("lmdb: invalid cursor key\r\n");
+      goto cleanup;
+    }
+
+    // Now build the atom version and then the cued version from the raw data
+    u3_noun mat = u3i_bytes(val.mv_size, val.mv_data);
+    u3_noun ovo = u3ke_cue(u3k(mat));
+
+    c3_o keep_o = on_event_read(pir_u, current_id, mat, ovo);
+
+    u3z(ovo);
+    u3z(mat);
+
+    if (keep_o == c3n) {
+      u3l_log("lmdb: aborting replay due to error.\r\n");
+      goto cleanup;
+    }
+
+    ret_w = mdb_cursor_get(cursor_u, &key, &val, MDB_NEXT);
+    if (ret_w != 0 && ret_w != MDB_NOTFOUND) {
+      u3l_log("lmdb: error while loading events: %s\r\n",
+              mdb_strerror(ret_w));
+      goto cleanup;
+    }
+  }
+
+  ret_o = c3y;
+
+cleanup:
+  mdb_cursor_close(cursor_u);
+
+  // Read-only transactions are aborted since we don't need to record the fact
+  // that we performed a read.
+  mdb_txn_abort(transaction_u);
+
+  return ret_o;
+}
+
+/* u3_lmdb_get_latest_event_number(): Gets last event id persisted
+**
+** Reads the last key in order from the EVENTS table as the latest event
+** number. On table empty, returns c3y but doesn't modify event_number.
+**
+** Returns c3n on any lmdb error or if the last key is not the size of an
+** event number. The cursor and transaction are released on every path.
+*/
+c3_o u3_lmdb_get_latest_event_number(MDB_env* environment, c3_d* event_number)
+{
+  c3_o ret_o = c3n;
+
+  // Creates the transaction. It is opened without MDB_RDONLY because the
+  // table is opened with MDB_CREATE below; nothing is written, and the
+  // transaction is always aborted rather than committed.
+  MDB_txn* transaction_u;
+  c3_w ret_w = mdb_txn_begin(environment,
+                             (MDB_txn *) NULL,
+                             0, /* flags */
+                             &transaction_u);
+  if (0 != ret_w) {
+    u3l_log("lmdb: txn_begin fail: %s\n", mdb_strerror(ret_w));
+    return c3n;
+  }
+
+  // Opens the database as part of the transaction.
+  c3_w flags_w = MDB_CREATE | MDB_INTEGERKEY;
+  MDB_dbi database_u;
+  ret_w = mdb_dbi_open(transaction_u,
+                       "EVENTS",
+                       flags_w,
+                       &database_u);
+  if (0 != ret_w) {
+    u3l_log("lmdb: dbi_open fail: %s\n", mdb_strerror(ret_w));
+    mdb_txn_abort(transaction_u);
+    return c3n;
+  }
+
+  // Creates a cursor to point to the last event
+  MDB_cursor* cursor_u;
+  ret_w = mdb_cursor_open(transaction_u, database_u, &cursor_u);
+  if (0 != ret_w) {
+    u3l_log("lmdb: cursor_open fail: %s\n", mdb_strerror(ret_w));
+    mdb_txn_abort(transaction_u);
+    return c3n;
+  }
+
+  // Set the cursor at the end of the line.
+  MDB_val key;
+  MDB_val val;
+  ret_w = mdb_cursor_get(cursor_u, &key, &val, MDB_LAST);
+  if (MDB_NOTFOUND == ret_w) {
+    // Empty table: clean up, but don't error out.
+    ret_o = c3y;
+    goto cleanup;
+  }
+
+  if (0 != ret_w) {
+    u3l_log("lmdb: could not find last event: %s\r\n", mdb_strerror(ret_w));
+    goto cleanup;
+  }
+
+  // Refuse to read an event number out of a key of the wrong size.
+  if (key.mv_size != sizeof(c3_d)) {
+    u3l_log("lmdb: invalid cursor key\r\n");
+    goto cleanup;
+  }
+
+  *event_number = *(c3_d*)key.mv_data;
+  ret_o = c3y;
+
+cleanup:
+  mdb_cursor_close(cursor_u);
+
+  // Nothing was written, so the transaction is aborted rather than
+  // committed.
+  mdb_txn_abort(transaction_u);
+
+  return ret_o;
+}
+
+/* u3_lmdb_write_identity(): Writes the event log identity information
+**
+** We have a secondary database (table) in this environment named META where we
+** read/write identity information from/to.
+*/
+c3_o u3_lmdb_write_identity(MDB_env* environment,
+                            u3_noun who,
+                            u3_noun is_fake,
+                            u3_noun life)
+{
+  // Creates the write transaction.
+  MDB_txn* transaction_u;
+  c3_w ret_w = mdb_txn_begin(environment,
+                             (MDB_txn *) NULL,
+                             0, /* flags */
+                             &transaction_u);
+  if (0 != ret_w) {
+    u3l_log("lmdb: txn_begin fail: %s\n", mdb_strerror(ret_w));
+    return c3n;
+  }
+
+  // Opens the database as part of the transaction. MDB_CREATE makes the META
+  // table on first use.
+  c3_w flags_w = MDB_CREATE;
+  MDB_dbi database_u;
+  ret_w = mdb_dbi_open(transaction_u,
+                       "META",
+                       flags_w,
+                       &database_u);
+  if (0 != ret_w) {
+    u3l_log("lmdb: dbi_open fail: %s\n", mdb_strerror(ret_w));
+    mdb_txn_abort(transaction_u);
+    return c3n;
+  }
+
+  // The three identity fields are stored under the string keys "who",
+  // "is-fake" and "life". Any failed put aborts the whole transaction so no
+  // partial identity is committed.
+  // NOTE(review): on an early return the nouns not yet handed to the put
+  // helper are not released here -- confirm who owns them.
+  c3_o ret;
+  ret = _perform_put_on_database_noun(transaction_u, database_u, "who", who);
+  if (ret == c3n) {
+    mdb_txn_abort(transaction_u);
+    return c3n;
+  }
+
+  ret = _perform_put_on_database_noun(transaction_u, database_u, "is-fake",
+                                      is_fake);
+  if (ret == c3n) {
+    mdb_txn_abort(transaction_u);
+    return c3n;
+  }
+
+  ret = _perform_put_on_database_noun(transaction_u, database_u, "life", life);
+  if (ret == c3n) {
+    mdb_txn_abort(transaction_u);
+    return c3n;
+  }
+
+  // A failed commit is only logged; no abort follows it.
+  ret_w = mdb_txn_commit(transaction_u);
+  if (0 != ret_w) {
+    u3l_log("lmdb: failed to commit transaction: %s\n", mdb_strerror(ret_w));
+    return c3n;
+  }
+
+  return c3y;
+}
+
+
+/* u3_lmdb_read_identity(): Reads the event log identity information.
+**
+** Reads the "who", "is-fake" and "life" entries of the META table into
+** |*who|, |*is_fake| and |*life|. Returns c3n if the transaction or the
+** META table cannot be opened, or if a read helper reports c3n.
+**
+** NOTE(review): the outputs are not initialized in this function; they are
+** only valid if each helper call actually stored a noun -- verify the
+** helper's behavior on a missing key.
+*/
+c3_o u3_lmdb_read_identity(MDB_env* environment,
+                           u3_noun* who,
+                           u3_noun* is_fake,
+                           u3_noun* life) {
+  // Creates the read-only transaction.
+  MDB_txn* transaction_u;
+  c3_w ret_w = mdb_txn_begin(environment,
+                             (MDB_txn *) NULL,
+                             MDB_RDONLY, /* flags */
+                             &transaction_u);
+  if (0 != ret_w) {
+    u3l_log("lmdb: txn_begin fail: %s\n", mdb_strerror(ret_w));
+    return c3n;
+  }
+
+  // Opens the database as part of the transaction. No MDB_CREATE here: the
+  // META table must already exist.
+  MDB_dbi database_u;
+  ret_w = mdb_dbi_open(transaction_u,
+                       "META",
+                       0,
+                       &database_u);
+  if (0 != ret_w) {
+    u3l_log("lmdb: dbi_open fail: %s\n", mdb_strerror(ret_w));
+    mdb_txn_abort(transaction_u);
+    return c3n;
+  }
+
+  c3_o ret;
+  ret = _perform_get_on_database_noun(transaction_u, database_u, "who", who);
+  if (ret == c3n) {
+    mdb_txn_abort(transaction_u);
+    return c3n;
+  }
+
+  ret = _perform_get_on_database_noun(transaction_u, database_u, "is-fake",
+                                      is_fake);
+  if (ret == c3n) {
+    mdb_txn_abort(transaction_u);
+    return c3n;
+  }
+
+  ret = _perform_get_on_database_noun(transaction_u, database_u, "life", life);
+  if (ret == c3n) {
+    mdb_txn_abort(transaction_u);
+    return c3n;
+  }
+
+  // Read-only transactions are aborted since we don't need to record the fact
+  // that we performed a read.
+  mdb_txn_abort(transaction_u);
+
+  return c3y;
+}
diff --git a/pkg/urbit/vere/pier.c b/pkg/urbit/vere/pier.c index 1c2b75372..dd9c50115 100644 --- a/pkg/urbit/vere/pier.c +++ b/pkg/urbit/vere/pier.c @@ -70,34 +70,37 @@ static void _pier_boot_ready(u3_pier* pir_u); static void _pier_boot_set_ship(u3_pier* pir_u, u3_noun who, u3_noun fak); static void _pier_exit_done(u3_pier* pir_u); -/* _pier_disk_bail(): bail from disk i/o. +/* _pier_db_bail(): bail from disk i/o. */ static void -_pier_disk_bail(void* vod_p, const c3_c* err_c) +_pier_db_bail(void* vod_p, const c3_c* err_c) { - // u3_writ* wit_u = vod_p; - u3l_log("disk error: %s\r\n", err_c); } -/* _pier_disk_shutdown(): close the log. +/* _pier_db_shutdown(): close the log. */ static void -_pier_disk_shutdown(u3_pier* pir_u) +_pier_db_shutdown(u3_pier* pir_u) { + u3_lmdb_shutdown(pir_u->log_u->db_u); } -/* _pier_disk_commit_complete(): commit complete. -*/ +/* _pier_db_commit_complete(): commit complete. 
+ */ static void -_pier_disk_commit_complete(void* vod_p) +_pier_db_commit_complete(c3_o success, u3_writ* wit_u) { - u3_writ* wit_u = vod_p; u3_pier* pir_u = wit_u->pir_u; u3_disk* log_u = pir_u->log_u; + if (success == c3n) { + u3l_log("Failed to persist event. Exiting to prevent corruption."); + u3_pier_bail(); + } + #ifdef VERBOSE_EVENTS - u3l_log("pier: (%" PRIu64 "): commit: complete\r\n", wit_u->evt_d); + u3l_log("pier: (%" PRIu64 "): db commit completed\r\n", wit_u->evt_d); #endif /* advance commit counter @@ -111,10 +114,10 @@ _pier_disk_commit_complete(void* vod_p) _pier_apply(pir_u); } -/* _pier_disk_commit_request(): start commit. +/* _pier_db_commit_request(): start commit. */ static void -_pier_disk_commit_request(u3_writ* wit_u) +_pier_db_commit_request(u3_writ* wit_u) { u3_pier* pir_u = wit_u->pir_u; u3_disk* log_u = pir_u->log_u; @@ -123,18 +126,12 @@ _pier_disk_commit_request(u3_writ* wit_u) u3l_log("pier: (%" PRIu64 "): commit: request\r\n", wit_u->evt_d); #endif - /* append to logfile + /* put it in the database */ { - c3_d len_d = u3r_met(6, wit_u->mat); - c3_d* buf_d = c3_malloc(8 * len_d); - - u3r_chubs(0, len_d, buf_d, wit_u->mat); - u3_foil_append(_pier_disk_commit_complete, - wit_u, - log_u->fol_u, - buf_d, - len_d); + u3_lmdb_write_event(log_u->db_u, + wit_u, + _pier_db_commit_complete); } /* advance commit-request counter @@ -145,312 +142,144 @@ _pier_disk_commit_request(u3_writ* wit_u) } } -/* _pier_disk_write_header_complete(): commit complete. -*/ + static void -_pier_disk_write_header_complete(void* vod_p) +_pier_db_write_header(u3_pier* pir_u, + u3_noun who, + u3_noun is_fake, + u3_noun life) { - // no-op, callback required by u3_foil_append() - // + c3_o ret = u3_lmdb_write_identity(pir_u->log_u->db_u, + who, is_fake, life); + if (ret == c3n) { + u3_pier_bail(); + } } -/* _pier_disk_write_header(): save boot metadata. 
-*/ +/* _pier_db_read_header(): reads the ships metadata from lmdb + */ static void -_pier_disk_write_header(u3_pier* pir_u, u3_atom mat) +_pier_db_read_header(u3_pier* pir_u) { - // XX deduplicate with _pier_disk_commit_request - // - u3_disk* log_u = pir_u->log_u; - - c3_assert( 0ULL == log_u->fol_u->end_d ); - - c3_d len_d = u3r_met(6, mat); - c3_d* buf_d = c3_malloc(8 * len_d); - - u3r_chubs(0, len_d, buf_d, mat); - - u3_foil_append(_pier_disk_write_header_complete, - (void*)0, - log_u->fol_u, - buf_d, - len_d); -} - -/* _pier_disk_read_header_complete(): -*/ -static void -_pier_disk_read_header_complete(u3_disk* log_u, u3_noun dat) -{ - u3_pier* pir_u = log_u->pir_u; - - { - u3_noun who, fak, len; - - u3x_trel(dat, &who, &fak, &len); - - c3_assert( c3y == u3ud(who) ); - c3_assert( 1 >= u3r_met(7, who) ); - c3_assert( c3y == u3ud(fak) ); - c3_assert( 1 >= u3r_met(0, fak) ); - c3_assert( c3y == u3ud(len) ); - c3_assert( 1 >= u3r_met(3, len) ); - - _pier_boot_set_ship(pir_u, u3k(who), u3k(fak)); - - pir_u->lif_d = u3r_chub(0, len); + u3_noun who, is_fake, life; + c3_o ret = u3_lmdb_read_identity(pir_u->log_u->db_u, + &who, &is_fake, &life); + if (ret == c3n) { + u3l_log("Failed to load identity. Exiting..."); + u3_pier_bail(); } - u3z(dat); + _pier_boot_set_ship(pir_u, u3k(who), u3k(is_fake)); + pir_u->lif_d = u3r_chub(0, life); + + u3z(who); + u3z(is_fake); + u3z(life); } -/* _pier_disk_read_header(): -*/ -static void -_pier_disk_read_header(u3_disk* log_u) +static c3_o +_pier_db_on_commit_loaded(u3_pier* pir_u, + c3_d id, + u3_noun mat, + u3_noun ovo) { - // XX disabled - // - // This is very, very slow. - // The one situation in which we currently *need* it - - // full log replay - it's unnecessary thanks to the current - // _pier_disk_load_commit. - // In all other situations, we're covered by - // _pier_work_play or u3_pier_boot. - // In the long run, it seems best to always get identity - // from the log for restart/replay. 
- // -#if 0 - c3_assert( 0 != log_u->fol_u ); + u3_noun evt = u3h(u3t(ovo)); + u3_noun job = u3k(u3t(u3t(u3t(ovo)))); + c3_d evt_d = u3r_chub(0, evt); - c3_d pos_d = log_u->fol_u->end_d; - c3_o got_o = c3n; + if (evt_d != id) { + _pier_db_bail(0, "pier: load: commit: event order"); + return c3n; + } - // XX requires that writs be unlinked before effects are released - // - if ( (0 == pir_u->ent_u) && - (wit_u->evt_d < log_u->com_d) ) - { - _pier_disk_load_commit(pir_u, (1ULL + god_u->dun_d), 1000ULL); - } - } + // Need to grab references to the nouns above. + u3_writ* wit_u = c3_calloc(sizeof(u3_writ)); + wit_u->pir_u = pir_u; + wit_u->evt_d = evt_d; + wit_u->job = u3k(job); + wit_u->mat = u3k(mat); + + // Insert at queue front since we're loading events in order + if ( !pir_u->ent_u ) { + c3_assert(!pir_u->ext_u); + + pir_u->ent_u = pir_u->ext_u = wit_u; } else { -#ifdef VERBOSE_EVENTS - u3l_log("pier: (%" PRIu64 "): compute: release\r\n", wit_u->evt_d); -#endif - - while ( pos_d ) { - c3_d len_d, evt_d; - c3_d* buf_d; - u3_noun mat, ovo, job, evt; - - buf_d = u3_foil_reveal(log_u->fol_u, &pos_d, &len_d); - - if ( !buf_d ) { - _pier_disk_bail(0, "corrupt header"); - return; + if ( wit_u->evt_d != (1ULL + pir_u->ent_u->evt_d) ) { + fprintf(stderr, "pier: load: commit: event gap: %" PRIx64 ", %" + PRIx64 "\r\n", + wit_u->evt_d, + pir_u->ent_u->evt_d); + _pier_db_bail(0, "pier: load: comit: event gap"); + return c3n; } - if ( 0ULL == pos_d) { - u3_noun mat = u3i_chubs(len_d, buf_d); - u3_noun ovo = u3ke_cue(u3k(mat)); - - c3_assert( c3__boot == u3h(ovo) ); - - _pier_disk_read_header_complete(log_u, u3k(u3t(ovo))); - - u3z(ovo); u3z(mat); - } - - c3_free(buf_d); + pir_u->ent_u->nex_u = wit_u; + pir_u->ent_u = wit_u; } -#endif + + return c3y; } -/* _pier_disk_load_commit(): load len_d commits >= lav_d; enqueue for replay +/* _pier_db_load_commit(): load len_d commits >= lav_d; enqueue for replay */ static void -_pier_disk_load_commit(u3_pier* pir_u, - c3_d lav_d, - 
c3_d len_d) +_pier_db_load_commits(u3_pier* pir_u, + c3_d lav_d, + c3_d len_d) { - u3_disk* log_u = pir_u->log_u; - - c3_d max_d = lav_d + len_d; - c3_d pos_d = log_u->fol_u->end_d; - c3_d old_d = 0; - - c3_assert ( 0 != log_u->fol_u ); - -#ifdef VERBOSE_EVENTS - fprintf(stderr, "pier: load: commit: at %" PRIx64 "\r\n", pos_d); -#endif - - while ( pos_d ) { - c3_d len_d, evt_d; - c3_d* buf_d; - u3_noun mat, ovo, job, evt; - - buf_d = u3_foil_reveal(log_u->fol_u, &pos_d, &len_d); - - if ( !buf_d ) { - _pier_disk_bail(0, "pier: load: commit: corrupt"); - return; + if (lav_d == 1) { + // We are restarting from event 1. That means we need to set the ship from + // the log identity information. + u3_noun who, fak, len; + c3_o ret = u3_lmdb_read_identity(pir_u->log_u->db_u, + &who, + &fak, + &len); + if (ret == c3n) { + u3l_log("Failed to load identity for replay. Exiting..."); + u3_pier_bail(); } - mat = u3i_chubs(len_d, buf_d); - c3_free(buf_d); + _pier_boot_set_ship(pir_u, u3k(who), u3k(fak)); + pir_u->lif_d = u3r_chub(0, len); - ovo = u3ke_cue(u3k(mat)); + u3z(who); + u3z(fak); + u3z(len); + } - // reached header - // - if ( 0ULL == pos_d ) { - c3_assert( 1ULL == lav_d ); - c3_assert( c3__boot == u3h(ovo) ); - - _pier_disk_read_header_complete(log_u, u3k(u3t(ovo))); - - u3z(ovo); u3z(mat); - break; - } - - c3_assert(c3__work == u3h(ovo)); - evt = u3h(u3t(ovo)); - job = u3k(u3t(u3t(u3t(ovo)))); - evt_d = u3r_chub(0, evt); - u3z(ovo); - - // confirm event order - // - if ( (0 != old_d) && - ((old_d - 1ULL) != evt_d) ) { - _pier_disk_bail(0, "pier: load: commit: event order"); - return; - } - else { - old_d = evt_d; - } - - // done: read past the first event requested - // - if ( evt_d < lav_d ) { - u3z(mat); - u3z(job); - return; - } - // skip: haven't reached the last event requested - // - else if ( evt_d > max_d ) { - u3z(mat); - u3z(job); - continue; - } - // enqueue requested event - // - else { - u3_writ* wit_u = c3_calloc(sizeof(u3_writ)); - -#ifdef 
VERBOSE_EVENTS - fprintf(stderr, "pier: load: commit: %" PRIu64 "\r\n", evt_d); -#endif - - wit_u->pir_u = pir_u; - wit_u->evt_d = evt_d; - wit_u->job = job; - wit_u->mat = mat; - - /* insert at queue exit -- the oldest events run first - */ - if ( !pir_u->ent_u && !pir_u->ext_u ) { - pir_u->ent_u = pir_u->ext_u = wit_u; - } - else { - if ( (1ULL + wit_u->evt_d) != pir_u->ext_u->evt_d ) { - fprintf(stderr, "pier: load: commit: event gap: %" PRIx64 ", %" - PRIx64 "\r\n", - wit_u->evt_d, - pir_u->ext_u->evt_d); - u3z(mat); - u3z(job); - _pier_disk_bail(0, "pier: load: comit: event gap"); - return; - } - - wit_u->nex_u = pir_u->ext_u; - pir_u->ext_u = wit_u; - } - } + c3_o ret = u3_lmdb_read_events(pir_u, + lav_d, + len_d, + _pier_db_on_commit_loaded); + if (ret == c3n) { + u3l_log("Failed to read event log for replay. Exiting..."); + u3_pier_bail(); } } -/* _pier_disk_init_complete(): -*/ -static void -_pier_disk_init_complete(u3_disk* log_u, c3_d evt_d) -{ - c3_assert( c3n == log_u->liv_o ); - log_u->liv_o = c3y; - - log_u->com_d = log_u->moc_d = evt_d; - - // restore pier identity (XX currently a no-op, see comment) - // - _pier_disk_read_header(log_u); - - _pier_boot_ready(log_u->pir_u); -} - -/* _pier_disk_init(): +/* _pier_db_init(): */ static c3_o -_pier_disk_init(u3_disk* log_u) +_pier_db_init(u3_disk* log_u) { c3_d evt_d = 0; c3_d pos_d = 0; c3_assert( c3n == log_u->liv_o ); - log_u->fol_u = u3_foil_absorb(log_u->com_u, "commit.urbit-log"); - - if ( !log_u->fol_u ) { + // Request from the database the last event + if ( c3n == u3_lmdb_get_latest_event_number(log_u->db_u, &evt_d) ) { + u3l_log("disk init from lmdb failed."); return c3n; } - // use the last event in the log to set the commit point. 
- // - if ( 0 != (pos_d = log_u->fol_u->end_d) ) { - c3_d len_d = 0; + log_u->liv_o = c3y; + log_u->com_d = log_u->moc_d = evt_d; - c3_d* buf_d = u3_foil_reveal(log_u->fol_u, &pos_d, &len_d); - - if ( !buf_d ) { - fprintf(stderr, "pier: load: commit: corrupt\r\n"); - return c3n; - } - - { - u3_noun mat = u3i_chubs(len_d, buf_d); - u3_noun ovo = u3ke_cue(u3k(mat)); - - c3_assert(c3__work == u3h(ovo)); - - u3_noun evt = u3h(u3t(ovo)); - - evt_d = u3r_chub(0, evt); - - u3z(mat); u3z(ovo); u3z(evt); - } - -#ifdef VERBOSE_EVENTS - fprintf(stderr, "pier: load: last %" PRIu64 "\r\n", evt_d); -#endif - - c3_free(buf_d); - } - - _pier_disk_init_complete(log_u, evt_d); + _pier_boot_ready(log_u->pir_u); return c3y; } @@ -500,10 +329,18 @@ _pier_disk_create(u3_pier* pir_u) strcpy(log_c, pir_u->pax_c); strcat(log_c, "/.urb/log"); + // Creates the folder if ( 0 == (log_u->com_u = u3_foil_folder(log_c)) ) { c3_free(log_c); return c3n; } + + // Inits the database + if ( 0 == (log_u->db_u = u3_lmdb_init(log_c)) ) { + c3_free(log_c); + return c3n; + } + c3_free(log_c); } @@ -526,7 +363,7 @@ _pier_disk_create(u3_pier* pir_u) // create/load event log // - if ( c3n == _pier_disk_init(log_u) ) { + if ( c3n == _pier_db_init(log_u) ) { return c3n; } @@ -637,7 +474,7 @@ _pier_work_bail(void* vod_p, fprintf(stderr, "pier: work error: %s\r\n", err_c); } -/* _pier_work_boot(): prepare serf boot. +/* _pier_work_boot(): prepare for boot. 
*/ static void _pier_work_boot(u3_pier* pir_u, c3_o sav_o) @@ -648,13 +485,13 @@ _pier_work_boot(u3_pier* pir_u, c3_o sav_o) u3_noun who = u3i_chubs(2, pir_u->who_d); u3_noun len = u3i_chubs(1, &pir_u->lif_d); - u3_noun msg = u3nq(c3__boot, who, pir_u->fak_o, len); - u3_atom mat = u3ke_jam(msg); if ( c3y == sav_o ) { - _pier_disk_write_header(pir_u, u3k(mat)); + _pier_db_write_header(pir_u, who, u3k(pir_u->fak_o), len); } + u3_noun msg = u3nq(c3__boot, who, pir_u->fak_o, len); + u3_atom mat = u3ke_jam(msg); u3_newt_write(&god_u->inn_u, mat, 0); } @@ -748,7 +585,7 @@ _pier_work_release(u3_writ* wit_u) if ( (0 == pir_u->ent_u) && (wit_u->evt_d < log_u->com_d) ) { - _pier_disk_load_commit(pir_u, (1ULL + god_u->dun_d), 1000ULL); + _pier_db_load_commits(pir_u, (1ULL + god_u->dun_d), 1000ULL); } } } @@ -896,7 +733,7 @@ _pier_work_play(u3_pier* pir_u, c3_assert( c3n == god_u->liv_o ); god_u->liv_o = c3y; - // all events in the serf are complete + // all events in the worker are complete // god_u->rel_d = god_u->dun_d = god_u->sen_d = (lav_d - 1ULL); @@ -975,7 +812,7 @@ _pier_work_exit(uv_process_t* req_u, u3l_log("pier: exit: status %" PRIu64 ", signal %d\r\n", sas_i, sig_i); uv_close((uv_handle_t*) req_u, 0); - _pier_disk_shutdown(pir_u); + _pier_db_shutdown(pir_u); _pier_work_shutdown(pir_u); } @@ -1108,6 +945,7 @@ _pier_work_poke(void* vod_p, c3_d evt_d = u3r_chub(0, p_jar); u3_writ* wit_u = _pier_writ_find(pir_u, evt_d); + // Unlike slog, we always reprint interpreter errors during replay. _pier_work_stdr(wit_u, q_jar); } break; @@ -1123,11 +961,19 @@ _pier_work_poke(void* vod_p, goto error; } else { + // XXX: The wit_u pointer will almost always be 0 because of how the + // worker process manages the difference between u3V.evt_d vs + // u3A->ent_d. Either stop communicating the evt_d in the wire protocol + // or fix the worker to keep track of and communicate the correct event + // number. 
c3_d evt_d = u3r_chub(0, p_jar); c3_w pri_w = u3r_word(0, q_jar); u3_writ* wit_u = _pier_writ_find(pir_u, evt_d); - _pier_work_slog(wit_u, pri_w, u3k(r_jar)); + // Only print this slog if the event is uncommitted. + if ( u3_psat_pace != pir_u->sat_e ) { + _pier_work_slog(wit_u, pri_w, u3k(r_jar)); + } } break; } @@ -1667,7 +1513,7 @@ _pier_boot_ready(u3_pier* pir_u) _pier_boot_vent(pir_u->bot_u); _pier_boot_dispose(pir_u->bot_u); - // prepare serf for boot sequence, write log header + // prepare worker for boot sequence, write log header // _pier_work_boot(pir_u, c3y); @@ -1690,13 +1536,13 @@ _pier_boot_ready(u3_pier* pir_u) // begin queuing batches of committed events // - _pier_disk_load_commit(pir_u, (1ULL + god_u->dun_d), 1000ULL); + _pier_db_load_commits(pir_u, (1ULL + god_u->dun_d), 1000ULL); if ( 0 == god_u->dun_d ) { fprintf(stderr, "pier: replaying events 1 through %" PRIu64 "\r\n", log_u->com_d); - // prepare serf for replay of boot sequence, don't write log header + // prepare worker for replay of boot sequence, don't write log header // _pier_work_boot(pir_u, c3n); } @@ -1770,7 +1616,8 @@ start: (wit_u->evt_d == (1 + log_u->moc_d)) && (wit_u->evt_d == (1 + log_u->com_d)) ) { - _pier_disk_commit_request(wit_u); + // TODO(erg): This is the place where we build up things into a queue. + _pier_db_commit_request(wit_u); act_o = c3y; } @@ -1877,6 +1724,7 @@ _pier_exit_done(u3_pier* pir_u) { u3l_log("pier: exit\r\n"); + _pier_db_shutdown(pir_u); _pier_work_shutdown(pir_u); _pier_loop_exit(pir_u); @@ -1917,7 +1765,8 @@ u3_pier_snap(u3_pier* pir_u) // save eagerly if all computed events are already committed // - if ( log_u->com_d >= top_d ) { + if ( (log_u->com_d >= top_d) && + (god_u->dun_d == top_d) ) { _pier_work_save(pir_u); } }