mirror of
https://github.com/ilyakooo0/urbit.git
synced 2025-01-07 07:30:23 +03:00
Merge branch 'vere_build_warnings' of https://github.com/BernardoDeLaPlaz/urbit into vere_build_warnings
This commit is contained in:
commit
e9a2784982
@ -10,7 +10,7 @@ let
|
|||||||
|
|
||||||
libs =
|
libs =
|
||||||
with pkgs;
|
with pkgs;
|
||||||
[ openssl zlib curl gmp scrypt libsigsegv ncurses openssl zlib ];
|
[ openssl zlib curl gmp scrypt libsigsegv ncurses openssl zlib lmdb ];
|
||||||
|
|
||||||
osx =
|
osx =
|
||||||
with pkgs;
|
with pkgs;
|
||||||
|
@ -11,7 +11,7 @@ let
|
|||||||
|
|
||||||
deps =
|
deps =
|
||||||
with pkgs;
|
with pkgs;
|
||||||
[ curl gmp libsigsegv ncurses openssl zlib ];
|
[ curl gmp libsigsegv ncurses openssl zlib lmdb ];
|
||||||
|
|
||||||
vendor =
|
vendor =
|
||||||
[ argon2 softfloat3 ed25519 ent h2o scrypt uv murmur3 secp256k1 sni ];
|
[ argon2 softfloat3 ed25519 ent h2o scrypt uv murmur3 secp256k1 sni ];
|
||||||
|
@ -6,7 +6,7 @@ let
|
|||||||
|
|
||||||
crossdeps =
|
crossdeps =
|
||||||
with env;
|
with env;
|
||||||
[ curl libgmp libsigsegv ncurses openssl zlib ];
|
[ curl libgmp libsigsegv ncurses openssl zlib lmdb ];
|
||||||
|
|
||||||
vendor =
|
vendor =
|
||||||
with deps;
|
with deps;
|
||||||
|
2
pkg/urbit/configure
vendored
2
pkg/urbit/configure
vendored
@ -6,7 +6,7 @@ URBIT_VERSION=0.8.0
|
|||||||
|
|
||||||
deps=" \
|
deps=" \
|
||||||
curl gmp sigsegv argon2 ed25519 ent h2o scrypt sni uv murmur3 secp256k1 \
|
curl gmp sigsegv argon2 ed25519 ent h2o scrypt sni uv murmur3 secp256k1 \
|
||||||
softfloat3 ncurses ssl crypto z \
|
softfloat3 ncurses ssl crypto z lmdb \
|
||||||
"
|
"
|
||||||
|
|
||||||
echo '#pragma once' >include/config.h
|
echo '#pragma once' >include/config.h
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include "h2o.h"
|
#include "h2o.h"
|
||||||
|
#include <lmdb.h>
|
||||||
|
|
||||||
/** Quasi-tunable parameters.
|
/** Quasi-tunable parameters.
|
||||||
**/
|
**/
|
||||||
@ -306,15 +307,6 @@
|
|||||||
struct _u3_moor* nex_u;
|
struct _u3_moor* nex_u;
|
||||||
} u3_moor;
|
} u3_moor;
|
||||||
|
|
||||||
/* u3_foil: abstract chub-addressed file.
|
|
||||||
*/
|
|
||||||
typedef struct _u3_foil {
|
|
||||||
uv_file fil_u; // libuv file handle
|
|
||||||
struct _u3_dire* dir_u; // parent directory
|
|
||||||
c3_c* nam_c; // name within parent
|
|
||||||
c3_d end_d; // end of file
|
|
||||||
} u3_foil;
|
|
||||||
|
|
||||||
/* u3_dent: directory entry.
|
/* u3_dent: directory entry.
|
||||||
*/
|
*/
|
||||||
typedef struct _u3_dent {
|
typedef struct _u3_dent {
|
||||||
@ -646,9 +638,9 @@
|
|||||||
u3_dire* dir_u; // main pier directory
|
u3_dire* dir_u; // main pier directory
|
||||||
u3_dire* urb_u; // urbit system data
|
u3_dire* urb_u; // urbit system data
|
||||||
u3_dire* com_u; // log directory
|
u3_dire* com_u; // log directory
|
||||||
u3_foil* fol_u; // logfile
|
|
||||||
c3_o liv_o; // live
|
c3_o liv_o; // live
|
||||||
c3_d end_d; // byte end of file
|
c3_d end_d; // byte end of file
|
||||||
|
MDB_env* db_u; // lmdb environment.
|
||||||
c3_d moc_d; // commit requested
|
c3_d moc_d; // commit requested
|
||||||
c3_d com_d; // committed
|
c3_d com_d; // committed
|
||||||
struct _u3_pier* pir_u; // pier backpointer
|
struct _u3_pier* pir_u; // pier backpointer
|
||||||
@ -862,69 +854,6 @@
|
|||||||
u3_dire*
|
u3_dire*
|
||||||
u3_foil_folder(const c3_c* pax_c); // directory object, or 0
|
u3_foil_folder(const c3_c* pax_c); // directory object, or 0
|
||||||
|
|
||||||
/* u3_foil_create(): create a new, empty file, not syncing.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
u3_foil_create(void (*fun_f)(void*, // context pointer
|
|
||||||
u3_foil*),// file object
|
|
||||||
void* vod_p, // context pointer
|
|
||||||
u3_dire* dir_u, // directory
|
|
||||||
const c3_c* nam_c); // name of new file
|
|
||||||
|
|
||||||
/* u3_foil_absorb(): absorb logfile, truncating to last good frame; block.
|
|
||||||
*/
|
|
||||||
u3_foil*
|
|
||||||
u3_foil_absorb(u3_dire* dir_u, // directory
|
|
||||||
c3_c* nam_c); // filename
|
|
||||||
|
|
||||||
/* u3_foil_delete(): delete a file; free descriptor.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
u3_foil_delete(void (*fun_f)(void*), // context pointer
|
|
||||||
void* vod_p, // context pointer
|
|
||||||
u3_foil* fol_u); // file to delete
|
|
||||||
|
|
||||||
/* u3_foil_append(): write a frame at the end of a file, freeing buffer.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
u3_foil_append(void (*fun_f)(void*), // context pointer
|
|
||||||
void* vod_p, // context pointer
|
|
||||||
u3_foil* fol_u, // file
|
|
||||||
c3_d* buf_d, // buffer to write from
|
|
||||||
c3_d len_d); // length in chubs
|
|
||||||
|
|
||||||
/* u3_foil_reveal(): read the frame before a position, blocking.
|
|
||||||
*/
|
|
||||||
c3_d*
|
|
||||||
u3_foil_reveal(u3_foil* fol_u, // file from
|
|
||||||
c3_d* pos_d, // end position/prev end
|
|
||||||
c3_d* len_d); // length return
|
|
||||||
|
|
||||||
/* u3_foil_commit(): reveal from one file, append to another.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
u3_foil_commit(void (*fun_f)(void*, // context pointer
|
|
||||||
u3_foil*, // file from
|
|
||||||
c3_d, // previous from
|
|
||||||
u3_foil*, // file to
|
|
||||||
c3_d), // end of to
|
|
||||||
void* vod_p, // context pointer
|
|
||||||
u3_foil* del_u, // file from
|
|
||||||
c3_d del_d, // end of from frame
|
|
||||||
u3_foil* unt_u, // file to
|
|
||||||
c3_d unt_d); // end of to frame
|
|
||||||
|
|
||||||
/* u3_foil_invent(): make new file with one frame; free buffer, sync.
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
u3_foil_invent(void (*fun_f)(void*, // context pointer
|
|
||||||
u3_foil*), // new file
|
|
||||||
void* vod_p, // context pointer
|
|
||||||
u3_dire* dir_u, // directory
|
|
||||||
c3_c* nam_c, // filename
|
|
||||||
c3_d* buf_d, // buffer (to free)
|
|
||||||
c3_d len_d); // length
|
|
||||||
|
|
||||||
/** Output.
|
/** Output.
|
||||||
**/
|
**/
|
||||||
/* u3_reck_kick(): handle effect.
|
/* u3_reck_kick(): handle effect.
|
||||||
@ -1344,5 +1273,63 @@
|
|||||||
void
|
void
|
||||||
u3_daemon_grab(void* vod_p);
|
u3_daemon_grab(void* vod_p);
|
||||||
|
|
||||||
|
|
||||||
c3_w
|
c3_w
|
||||||
u3_readdir_r(DIR *dirp, struct dirent *entry, struct dirent **result);
|
u3_readdir_r(DIR *dirp, struct dirent *entry, struct dirent **result);
|
||||||
|
|
||||||
|
/* Database
|
||||||
|
*/
|
||||||
|
/* u3_lmdb_init(): Initializes lmdb inside log_path
|
||||||
|
*/
|
||||||
|
MDB_env* u3_lmdb_init(const char* log_path);
|
||||||
|
|
||||||
|
/* u3_lmdb_shutdown(): Shuts down the entire logging system
|
||||||
|
*/
|
||||||
|
void u3_lmdb_shutdown(MDB_env* env);
|
||||||
|
|
||||||
|
/* u3_lmdb_get_latest_event_number(): Gets last event id persisted
|
||||||
|
*/
|
||||||
|
c3_o u3_lmdb_get_latest_event_number(MDB_env* environment,
|
||||||
|
c3_d* event_number);
|
||||||
|
|
||||||
|
/* u3_lmdb_write_event(): Persists an event to the database
|
||||||
|
*/
|
||||||
|
void u3_lmdb_write_event(MDB_env* environment,
|
||||||
|
u3_writ* event_u,
|
||||||
|
void (*on_complete)(c3_o success, u3_writ*));
|
||||||
|
|
||||||
|
/* u3_lmdb_read_events(): Reads events back from the database
|
||||||
|
**
|
||||||
|
** Reads back up to |len_d| events starting with |first_event_d|. For
|
||||||
|
** each event, the event will be passed to |on_event_read| and further
|
||||||
|
** reading will be aborted if the callback returns c3n.
|
||||||
|
**
|
||||||
|
** Returns c3y on complete success; c3n on any error.
|
||||||
|
*/
|
||||||
|
c3_o u3_lmdb_read_events(u3_pier* pir_u,
|
||||||
|
c3_d first_event_d,
|
||||||
|
c3_d len_d,
|
||||||
|
c3_o(*on_event_read)(u3_pier* pir_u,
|
||||||
|
c3_d id,
|
||||||
|
u3_noun mat,
|
||||||
|
u3_noun ovo));
|
||||||
|
|
||||||
|
/* u3_lmdb_write_identity(): Writes log identity
|
||||||
|
**
|
||||||
|
** Returns c3y on complete success; c3n on any error.
|
||||||
|
*/
|
||||||
|
c3_o u3_lmdb_write_identity(MDB_env* environment,
|
||||||
|
u3_noun who,
|
||||||
|
u3_noun is_fake,
|
||||||
|
u3_noun life);
|
||||||
|
|
||||||
|
/* u3_lmdb_read_identity(): Reads log identity
|
||||||
|
**
|
||||||
|
** Returns c3y on complete success; c3n on any error.
|
||||||
|
*/
|
||||||
|
c3_o u3_lmdb_read_identity(MDB_env* environment,
|
||||||
|
u3_noun* who,
|
||||||
|
u3_noun* is_fake,
|
||||||
|
u3_noun* life);
|
||||||
|
|
||||||
|
|
||||||
|
@ -168,462 +168,3 @@ u3_foil_folder(const c3_c* pax_c)
|
|||||||
}
|
}
|
||||||
return dir_u;
|
return dir_u;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* u3_foil_create(): create a new, empty, open file, not syncing.
|
|
||||||
*/
|
|
||||||
struct _foil_create_request {
|
|
||||||
uv_fs_t ruq_u;
|
|
||||||
void (*fun_f)(void*, u3_foil*);
|
|
||||||
void* vod_p;
|
|
||||||
u3_dire* dir_u;
|
|
||||||
c3_c* nam_c;
|
|
||||||
c3_c* pax_c;
|
|
||||||
};
|
|
||||||
|
|
||||||
static void
|
|
||||||
_foil_create_cb(uv_fs_t* ruq_u)
|
|
||||||
{
|
|
||||||
struct _foil_create_request* req_u = (void *)ruq_u;
|
|
||||||
u3_foil* fol_u;
|
|
||||||
|
|
||||||
fol_u = c3_malloc(sizeof(*fol_u));
|
|
||||||
fol_u->fil_u = ruq_u->result;
|
|
||||||
fol_u->dir_u = req_u->dir_u;
|
|
||||||
fol_u->nam_c = req_u->nam_c;
|
|
||||||
fol_u->end_d = 0;
|
|
||||||
|
|
||||||
req_u->fun_f(req_u->vod_p, fol_u);
|
|
||||||
|
|
||||||
c3_free(req_u->pax_c);
|
|
||||||
uv_fs_req_cleanup(ruq_u);
|
|
||||||
c3_free(req_u);
|
|
||||||
}
|
|
||||||
void
|
|
||||||
u3_foil_create(void (*fun_f)(void*, // context pointer
|
|
||||||
u3_foil*),// file object
|
|
||||||
void* vod_p, // context pointer
|
|
||||||
u3_dire* dir_u, // directory
|
|
||||||
const c3_c* nam_c) // name of new file
|
|
||||||
{
|
|
||||||
c3_c* pax_c;
|
|
||||||
c3_i err_i;
|
|
||||||
|
|
||||||
/* construct full path
|
|
||||||
*/
|
|
||||||
pax_c = _foil_path(dir_u, nam_c);
|
|
||||||
|
|
||||||
/* perform create
|
|
||||||
*/
|
|
||||||
{
|
|
||||||
struct _foil_create_request* req_u;
|
|
||||||
|
|
||||||
req_u = c3_malloc(sizeof(*req_u));
|
|
||||||
|
|
||||||
req_u->fun_f = fun_f;
|
|
||||||
req_u->vod_p = vod_p;
|
|
||||||
req_u->dir_u = dir_u;
|
|
||||||
req_u->nam_c = c3_malloc(1 + strlen(nam_c));
|
|
||||||
strcpy(req_u->nam_c, nam_c);
|
|
||||||
req_u->pax_c = pax_c;
|
|
||||||
|
|
||||||
if ( 0 != (err_i = uv_fs_open(u3L,
|
|
||||||
&req_u->ruq_u,
|
|
||||||
pax_c,
|
|
||||||
O_CREAT | O_WRONLY,
|
|
||||||
0600,
|
|
||||||
_foil_create_cb)) )
|
|
||||||
{
|
|
||||||
_foil_fail("uv_fs_open", err_i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* u3_foil_absorb(): open logfile, truncating to last good frame; blocking.
|
|
||||||
*/
|
|
||||||
u3_foil*
|
|
||||||
u3_foil_absorb(u3_dire* dir_u, // directory
|
|
||||||
c3_c* nam_c) // filename
|
|
||||||
{
|
|
||||||
u3_foil* fol_u;
|
|
||||||
uv_fs_t ruq_u;
|
|
||||||
c3_i err_i;
|
|
||||||
|
|
||||||
/* open file and create wrapper
|
|
||||||
*/
|
|
||||||
{
|
|
||||||
c3_c* pax_c = _foil_path(dir_u, nam_c);
|
|
||||||
|
|
||||||
if ( 0 > (err_i = uv_fs_open(u3L,
|
|
||||||
&ruq_u,
|
|
||||||
pax_c,
|
|
||||||
O_RDWR | O_CREAT,
|
|
||||||
0600,
|
|
||||||
0)) )
|
|
||||||
{
|
|
||||||
_foil_fail(pax_c, err_i);
|
|
||||||
c3_free(pax_c);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
c3_free(pax_c);
|
|
||||||
|
|
||||||
fol_u = c3_malloc(sizeof(*fol_u));
|
|
||||||
fol_u->dir_u = dir_u;
|
|
||||||
fol_u->fil_u = ruq_u.result;
|
|
||||||
fol_u->nam_c = c3_malloc(1 + strlen(nam_c));
|
|
||||||
strcpy(fol_u->nam_c, nam_c);
|
|
||||||
|
|
||||||
uv_fs_req_cleanup(&ruq_u);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* measure file
|
|
||||||
*/
|
|
||||||
{
|
|
||||||
if ( 0 != (err_i = uv_fs_fstat(u3L, &ruq_u, fol_u->fil_u, 0)) ) {
|
|
||||||
_foil_fail("uv_fs_fstat", err_i);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
if ( 0 != (7 & ruq_u.statbuf.st_size) ) {
|
|
||||||
_foil_fail("logfile size corrupt", 0);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
fol_u->end_d = (ruq_u.statbuf.st_size >> 3ULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* XX: scan for good frame.
|
|
||||||
*/
|
|
||||||
return fol_u;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* u3_foil_delete(): delete a file; free descriptor.
|
|
||||||
*/
|
|
||||||
struct _foil_delete_request {
|
|
||||||
uv_fs_t ruq_u;
|
|
||||||
void (*fun_f)(void*);
|
|
||||||
void* vod_p;
|
|
||||||
u3_foil* fol_u;
|
|
||||||
c3_c* pax_c;
|
|
||||||
};
|
|
||||||
|
|
||||||
static void
|
|
||||||
_foil_delete_cb(uv_fs_t* ruq_u)
|
|
||||||
{
|
|
||||||
struct _foil_delete_request* req_u = (void *)ruq_u;
|
|
||||||
|
|
||||||
if ( req_u->fun_f ) {
|
|
||||||
req_u->fun_f(req_u->vod_p);
|
|
||||||
}
|
|
||||||
|
|
||||||
c3_free(req_u->pax_c);
|
|
||||||
c3_free(req_u->fol_u->nam_c);
|
|
||||||
c3_free(req_u->fol_u);
|
|
||||||
|
|
||||||
uv_fs_req_cleanup(ruq_u);
|
|
||||||
c3_free(req_u);
|
|
||||||
}
|
|
||||||
void
|
|
||||||
u3_foil_delete(void (*fun_f)(void*), // context pointer
|
|
||||||
void* vod_p, // context pointer
|
|
||||||
u3_foil* fol_u) // file to delete
|
|
||||||
{
|
|
||||||
c3_i err_i;
|
|
||||||
c3_c* pax_c;
|
|
||||||
|
|
||||||
/* construct full path
|
|
||||||
*/
|
|
||||||
pax_c = _foil_path(fol_u->dir_u, fol_u->nam_c);
|
|
||||||
|
|
||||||
/* perform delete
|
|
||||||
*/
|
|
||||||
{
|
|
||||||
struct _foil_delete_request* req_u;
|
|
||||||
|
|
||||||
req_u = c3_malloc(sizeof(*req_u));
|
|
||||||
|
|
||||||
req_u->fun_f = fun_f;
|
|
||||||
req_u->vod_p = vod_p;
|
|
||||||
req_u->fol_u = fol_u;
|
|
||||||
req_u->pax_c = pax_c;
|
|
||||||
|
|
||||||
if ( 0 != (err_i = uv_fs_unlink(u3L,
|
|
||||||
&req_u->ruq_u,
|
|
||||||
pax_c,
|
|
||||||
_foil_delete_cb)) )
|
|
||||||
{
|
|
||||||
_foil_fail("uv_fs_unlink", err_i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* u3_foil_append(): write a frame at the end of a file, freeing the buffer.
|
|
||||||
*/
|
|
||||||
struct _foil_append_request {
|
|
||||||
uv_fs_t ruq_u;
|
|
||||||
void (*fun_f)(void*);
|
|
||||||
void* vod_p;
|
|
||||||
u3_foil* fol_u;
|
|
||||||
c3_d* fam_d;
|
|
||||||
c3_d* buf_d;
|
|
||||||
};
|
|
||||||
static void
|
|
||||||
_foil_append_cb_2(uv_fs_t* ruq_u)
|
|
||||||
{
|
|
||||||
struct _foil_append_request* req_u = (void*) ruq_u;
|
|
||||||
|
|
||||||
req_u->fun_f(req_u->vod_p);
|
|
||||||
uv_fs_req_cleanup(ruq_u);
|
|
||||||
c3_free(req_u);
|
|
||||||
}
|
|
||||||
static void
|
|
||||||
_foil_append_cb_1(uv_fs_t* ruq_u)
|
|
||||||
{
|
|
||||||
struct _foil_append_request* req_u = (void*) ruq_u;
|
|
||||||
|
|
||||||
uv_fs_req_cleanup(ruq_u);
|
|
||||||
c3_free(req_u->buf_d);
|
|
||||||
|
|
||||||
uv_fs_fsync(u3L, &req_u->ruq_u,
|
|
||||||
req_u->fol_u->fil_u,
|
|
||||||
_foil_append_cb_2);
|
|
||||||
}
|
|
||||||
void
|
|
||||||
u3_foil_append(void (*fun_f)(void*), // context pointer
|
|
||||||
void* vod_p, // context pointer
|
|
||||||
u3_foil* fol_u, // file
|
|
||||||
c3_d* buf_d, // buffer to write from
|
|
||||||
c3_d len_d) // length in chubs
|
|
||||||
{
|
|
||||||
c3_d pos_d = fol_u->end_d;
|
|
||||||
struct _foil_append_request* req_u;
|
|
||||||
c3_i err_i;
|
|
||||||
|
|
||||||
/* set up request
|
|
||||||
*/
|
|
||||||
{
|
|
||||||
req_u = c3_malloc(sizeof(*req_u));
|
|
||||||
req_u->fun_f = fun_f;
|
|
||||||
req_u->vod_p = vod_p;
|
|
||||||
req_u->fol_u = fol_u;
|
|
||||||
req_u->buf_d = buf_d;
|
|
||||||
req_u->fam_d = c3_malloc(16);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* framing
|
|
||||||
*/
|
|
||||||
{
|
|
||||||
c3_w top_w, bot_w;
|
|
||||||
|
|
||||||
fol_u->end_d = pos_d + len_d + 2;
|
|
||||||
|
|
||||||
/* XX: assumes "little-endian won", 32-bit frame length.
|
|
||||||
*/
|
|
||||||
top_w = u3r_mug_words((c3_w *)(void *) buf_d, (2 * len_d));
|
|
||||||
bot_w = (req_u->fol_u->end_d & 0xffffffff);
|
|
||||||
bot_w = u3r_mug_words(&bot_w, 1);
|
|
||||||
|
|
||||||
req_u->fam_d[0] = len_d;
|
|
||||||
req_u->fam_d[1] = ((c3_d)top_w) << 32ULL | ((c3_d) bot_w);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* do it
|
|
||||||
*/
|
|
||||||
{
|
|
||||||
uv_buf_t buf_u[2];
|
|
||||||
|
|
||||||
buf_u[0] = uv_buf_init((void *)buf_d, (len_d * 8));
|
|
||||||
buf_u[1] = uv_buf_init((void *)req_u->fam_d, 16);
|
|
||||||
|
|
||||||
if ( 0 != (err_i = uv_fs_write(u3L,
|
|
||||||
&req_u->ruq_u,
|
|
||||||
fol_u->fil_u,
|
|
||||||
buf_u,
|
|
||||||
2,
|
|
||||||
(8ULL * pos_d),
|
|
||||||
_foil_append_cb_1)) )
|
|
||||||
{
|
|
||||||
_foil_fail("uv_fs_write", err_i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* u3_foil_reveal(): read the frame before a position, blocking.
|
|
||||||
*/
|
|
||||||
c3_d*
|
|
||||||
u3_foil_reveal(u3_foil* fol_u, // file from
|
|
||||||
c3_d* sop_d, // end position/prev end
|
|
||||||
c3_d* len_d) // length return
|
|
||||||
{
|
|
||||||
c3_d pos_d = *sop_d;
|
|
||||||
c3_d fam_d[2];
|
|
||||||
c3_l mug_l;
|
|
||||||
uv_fs_t ruq_u;
|
|
||||||
c3_i err_i;
|
|
||||||
|
|
||||||
c3_assert(pos_d >= 2);
|
|
||||||
c3_assert(pos_d <= fol_u->end_d);
|
|
||||||
|
|
||||||
/* read frame data
|
|
||||||
*/
|
|
||||||
{
|
|
||||||
uv_buf_t buf_u = uv_buf_init((void *)fam_d, 16);
|
|
||||||
|
|
||||||
fam_d[0] = fam_d[1] = 0;
|
|
||||||
if ( 0 > (err_i = uv_fs_read(u3L,
|
|
||||||
&ruq_u,
|
|
||||||
fol_u->fil_u,
|
|
||||||
&buf_u, 1,
|
|
||||||
(8ULL * (pos_d - 2ULL)),
|
|
||||||
0)) )
|
|
||||||
{
|
|
||||||
_foil_fail("uv_fs_read", err_i);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
uv_fs_req_cleanup(&ruq_u);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* validate frame
|
|
||||||
*/
|
|
||||||
{
|
|
||||||
c3_w top_w, bot_w;
|
|
||||||
c3_l chk_l;
|
|
||||||
|
|
||||||
*len_d = fam_d[0];
|
|
||||||
if ( *len_d > (pos_d - 2ULL) ) {
|
|
||||||
_foil_fail("corrupt frame a", 0);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
top_w = fam_d[1] >> 32ULL;
|
|
||||||
mug_l = top_w;
|
|
||||||
|
|
||||||
bot_w = fam_d[1] & 0xffffffff;
|
|
||||||
chk_l = (pos_d & 0xffffffff);
|
|
||||||
chk_l = u3r_mug_words(&chk_l, 1);
|
|
||||||
if ( bot_w != chk_l ) {
|
|
||||||
_foil_fail("corrupt frame b", 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* read frame
|
|
||||||
*/
|
|
||||||
{
|
|
||||||
c3_d* buf_d = c3_malloc(8 * *len_d);
|
|
||||||
uv_buf_t buf_u = uv_buf_init((void *)buf_d, 8 * *len_d);
|
|
||||||
c3_l gum_l;
|
|
||||||
|
|
||||||
if ( 0 > (err_i = uv_fs_read(u3L,
|
|
||||||
&ruq_u,
|
|
||||||
fol_u->fil_u,
|
|
||||||
&buf_u, 1,
|
|
||||||
(8ULL * (pos_d - (*len_d + 2ULL))),
|
|
||||||
0) ) )
|
|
||||||
{
|
|
||||||
_foil_fail("uv_fs_read", err_i);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
uv_fs_req_cleanup(&ruq_u);
|
|
||||||
|
|
||||||
gum_l = u3r_mug_words((c3_w *)(void *) buf_d, (2 * *len_d));
|
|
||||||
if ( mug_l != gum_l ) {
|
|
||||||
_foil_fail("corrupt frame c", 0);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
*sop_d = (pos_d - (*len_d + 2ULL));
|
|
||||||
return buf_d;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* u3_foil_invent(): create a new file with one frame, freeing buffer; sync.
|
|
||||||
*/
|
|
||||||
struct _foil_invent_request {
|
|
||||||
uv_fs_t ruq_u;
|
|
||||||
void (*fun_f)(void*, u3_foil*);
|
|
||||||
u3_foil* fol_u;
|
|
||||||
void* vod_p;
|
|
||||||
c3_d* buf_d;
|
|
||||||
c3_d len_d;
|
|
||||||
c3_d num_d;
|
|
||||||
#if 0
|
|
||||||
struct timeval bef_u;
|
|
||||||
#endif
|
|
||||||
};
|
|
||||||
static void
|
|
||||||
_foil_invent_cb_2a(void* req_p)
|
|
||||||
{
|
|
||||||
struct _foil_invent_request* req_u = req_p;
|
|
||||||
|
|
||||||
if ( 1 == req_u->num_d ) {
|
|
||||||
#if 0
|
|
||||||
{
|
|
||||||
struct timeval aft_u, gap_u;
|
|
||||||
c3_w mls_w;
|
|
||||||
|
|
||||||
gettimeofday(&aft_u, 0);
|
|
||||||
timersub(&aft_u, &req_u->bef_u, &gap_u);
|
|
||||||
mls_w = (gap_u.tv_sec * 1000) + (gap_u.tv_usec / 1000);
|
|
||||||
|
|
||||||
u3l_log("invent ms: %d\r\n", mls_w);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
req_u->fun_f(req_u->vod_p, req_u->fol_u);
|
|
||||||
_foil_close(req_u->fol_u->fil_u);
|
|
||||||
|
|
||||||
c3_free(req_u);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
req_u->num_d++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
_foil_invent_cb_2b(uv_fs_t* ruq_u)
|
|
||||||
{
|
|
||||||
struct _foil_invent_request* req_u = (void *)ruq_u;
|
|
||||||
|
|
||||||
uv_fs_req_cleanup(ruq_u);
|
|
||||||
_foil_invent_cb_2a(req_u);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
_foil_invent_cb_1(void* req_p,
|
|
||||||
u3_foil* fol_u)
|
|
||||||
{
|
|
||||||
struct _foil_invent_request* req_u = req_p;
|
|
||||||
|
|
||||||
req_u->fol_u = fol_u;
|
|
||||||
|
|
||||||
/* fsync the parent directory, since we just created a file.
|
|
||||||
*/
|
|
||||||
uv_fs_fsync(u3L, &req_u->ruq_u,
|
|
||||||
req_u->fol_u->dir_u->fil_u,
|
|
||||||
_foil_invent_cb_2b);
|
|
||||||
|
|
||||||
u3_foil_append(_foil_invent_cb_2a,
|
|
||||||
req_u,
|
|
||||||
fol_u,
|
|
||||||
req_u->buf_d,
|
|
||||||
req_u->len_d);
|
|
||||||
}
|
|
||||||
void
|
|
||||||
u3_foil_invent(void (*fun_f)(void*, // context pointer
|
|
||||||
u3_foil*), // new file
|
|
||||||
void* vod_p, // context pointer
|
|
||||||
u3_dire* dir_u, // directory
|
|
||||||
c3_c* nam_c, // filename
|
|
||||||
c3_d* buf_d, // buffer (to free)
|
|
||||||
c3_d len_d) // length
|
|
||||||
{
|
|
||||||
struct _foil_invent_request* req_u;
|
|
||||||
|
|
||||||
req_u = malloc(sizeof(*req_u));
|
|
||||||
req_u->fun_f = fun_f;
|
|
||||||
req_u->fol_u = 0;
|
|
||||||
req_u->vod_p = vod_p;
|
|
||||||
req_u->buf_d = buf_d;
|
|
||||||
req_u->len_d = len_d;
|
|
||||||
req_u->num_d = 0;
|
|
||||||
#if 0
|
|
||||||
gettimeofday(&req_u->bef_u, 0);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
u3_foil_create(_foil_invent_cb_1, req_u, dir_u, nam_c);
|
|
||||||
}
|
|
||||||
|
602
pkg/urbit/vere/lmdb.c
Normal file
602
pkg/urbit/vere/lmdb.c
Normal file
@ -0,0 +1,602 @@
|
|||||||
|
/* vere/lmdb.c
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "all.h"
|
||||||
|
|
||||||
|
#include <uv.h>
|
||||||
|
#include <lmdb.h>
|
||||||
|
|
||||||
|
#include "vere/vere.h"
|
||||||
|
|
||||||
|
// Event log persistence for Urbit
|
||||||
|
//
|
||||||
|
// Persistence works by having an lmdb environment opened on the main
|
||||||
|
// thread. This environment is used to create read-only transactions
|
||||||
|
// synchronously when needed.
|
||||||
|
//
|
||||||
|
// But the majority of lmdb writes operate asynchronously in the uv worker
|
||||||
|
// pool. Since individual transactions are bound to threads, we perform all
|
||||||
|
// blocking writing on worker threads.
|
||||||
|
//
|
||||||
|
// We perform the very first metadata writes on the main thread because we
|
||||||
|
// can't do anything until they persist.
|
||||||
|
|
||||||
|
/* u3_lmdb_init(): Opens up a log environment
|
||||||
|
**
|
||||||
|
** Precondition: log_path points to an already created directory
|
||||||
|
*/
|
||||||
|
MDB_env* u3_lmdb_init(const char* log_path)
|
||||||
|
{
|
||||||
|
MDB_env* env = 0;
|
||||||
|
c3_w ret_w = mdb_env_create(&env);
|
||||||
|
if (ret_w != 0) {
|
||||||
|
u3l_log("lmdb: init fail: %s\n", mdb_strerror(ret_w));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Our databases have up to three tables: META, EVENTS, and GRAINS.
|
||||||
|
ret_w = mdb_env_set_maxdbs(env, 3);
|
||||||
|
if (ret_w != 0) {
|
||||||
|
u3l_log("lmdb: failed to set number of databases: %s\n", mdb_strerror(ret_w));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Start with a gigabyte for the event log.
|
||||||
|
//
|
||||||
|
ret_w = mdb_env_set_mapsize(env, 1024 * 1024 * 1024);
|
||||||
|
if (ret_w != 0) {
|
||||||
|
u3l_log("lmdb: failed to set database size: %s\n", mdb_strerror(ret_w));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret_w = mdb_env_open(env, log_path, 0, 0664);
|
||||||
|
if (ret_w != 0) {
|
||||||
|
u3l_log("lmdb: failed to open event log: %s\n", mdb_strerror(ret_w));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return env;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* u3_lmdb_shutdown(): Shuts down lmdb
|
||||||
|
*/
|
||||||
|
void u3_lmdb_shutdown(MDB_env* env)
|
||||||
|
{
|
||||||
|
mdb_env_close(env);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* _perform_put_on_database_raw(): Writes a key/value pair to a specific
|
||||||
|
** database as part of a transaction.
|
||||||
|
**
|
||||||
|
** The raw version doesn't take ownership of either key/value and performs no
|
||||||
|
** nock calculations, so it is safe to call from any thread.
|
||||||
|
*/
|
||||||
|
static
|
||||||
|
c3_o _perform_put_on_database_raw(MDB_txn* transaction_u,
|
||||||
|
MDB_dbi database_u,
|
||||||
|
c3_w flags,
|
||||||
|
void* key,
|
||||||
|
size_t key_len,
|
||||||
|
void* value,
|
||||||
|
size_t value_len) {
|
||||||
|
MDB_val key_val, value_val;
|
||||||
|
|
||||||
|
key_val.mv_size = key_len;
|
||||||
|
key_val.mv_data = key;
|
||||||
|
|
||||||
|
value_val.mv_size = value_len;
|
||||||
|
value_val.mv_data = value;
|
||||||
|
|
||||||
|
c3_w ret_w = mdb_put(transaction_u, database_u, &key_val, &value_val, flags);
|
||||||
|
if (ret_w != 0) {
|
||||||
|
u3l_log("lmdb: write failed: %s\n", mdb_strerror(ret_w));
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
return c3y;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* _perform_get_on_database_raw(): Reads a key/value pair to a specific
|
||||||
|
** database as part of a transaction.
|
||||||
|
*/
|
||||||
|
static
|
||||||
|
c3_o _perform_get_on_database_raw(MDB_txn* transaction_u,
|
||||||
|
MDB_dbi database_u,
|
||||||
|
void* key,
|
||||||
|
size_t key_len,
|
||||||
|
MDB_val* value) {
|
||||||
|
MDB_val key_val;
|
||||||
|
key_val.mv_size = key_len;
|
||||||
|
key_val.mv_data = key;
|
||||||
|
|
||||||
|
c3_w ret_w = mdb_get(transaction_u, database_u, &key_val, value);
|
||||||
|
if (ret_w != 0) {
|
||||||
|
u3l_log("lmdb: read failed: %s\n", mdb_strerror(ret_w));
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
return c3y;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* _perform_put_on_database_noun(): Writes a noun to the database.
|
||||||
|
**
|
||||||
|
** This requires access to the loom so it must only be run from the libuv
|
||||||
|
** thread.
|
||||||
|
*/
|
||||||
|
static
|
||||||
|
c3_o _perform_put_on_database_noun(MDB_txn* transaction_u,
|
||||||
|
MDB_dbi database_u,
|
||||||
|
c3_c* key,
|
||||||
|
u3_noun noun) {
|
||||||
|
// jam noun into an atom representation
|
||||||
|
u3_atom mat = u3ke_jam(noun);
|
||||||
|
|
||||||
|
// copy the jammed noun into a byte buffer we can hand to lmdb
|
||||||
|
c3_w len_w = u3r_met(3, mat);
|
||||||
|
c3_y* bytes_y = (c3_y*) malloc(len_w);
|
||||||
|
u3r_bytes(0, len_w, bytes_y, mat);
|
||||||
|
|
||||||
|
c3_o ret = _perform_put_on_database_raw(
|
||||||
|
transaction_u,
|
||||||
|
database_u,
|
||||||
|
0,
|
||||||
|
key, strlen(key),
|
||||||
|
bytes_y, len_w);
|
||||||
|
|
||||||
|
free(bytes_y);
|
||||||
|
u3z(mat);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* _perform_get_on_database_noun(): Reads a noun from the database.
|
||||||
|
**
|
||||||
|
** This requires access to the loom so it must only be run from the libuv
|
||||||
|
** thread.
|
||||||
|
*/
|
||||||
|
static
|
||||||
|
c3_o _perform_get_on_database_noun(MDB_txn* transaction_u,
|
||||||
|
MDB_dbi database_u,
|
||||||
|
c3_c* key,
|
||||||
|
u3_noun* noun) {
|
||||||
|
MDB_val value_val;
|
||||||
|
c3_o ret = _perform_get_on_database_raw(transaction_u,
|
||||||
|
database_u,
|
||||||
|
key, strlen(key),
|
||||||
|
&value_val);
|
||||||
|
if (ret == c3n) {
|
||||||
|
return c3y;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Take the bytes and cue them.
|
||||||
|
u3_atom raw_atom = u3i_bytes(value_val.mv_size, value_val.mv_data);
|
||||||
|
*noun = u3qe_cue(raw_atom);
|
||||||
|
return c3y;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* _write_request_data: callback struct for u3_lmdb_write_event()
|
||||||
|
*/
|
||||||
|
struct _write_request_data {
|
||||||
|
// The database environment to write to. This object is thread-safe, though
|
||||||
|
// the transactions and handles opened from it are explicitly not.
|
||||||
|
MDB_env* environment;
|
||||||
|
|
||||||
|
// The original event. Not to be accessed from the worker thread; only used
|
||||||
|
// in the callback executed on the main loop thread.
|
||||||
|
u3_writ* event;
|
||||||
|
|
||||||
|
// The event number from event separated out so we can access it on the other
|
||||||
|
// thread.
|
||||||
|
c3_d event_number;
|
||||||
|
|
||||||
|
// The event serialized out of the loom into a malloced structure accessible
|
||||||
|
// from the worker thread.
|
||||||
|
void* malloced_event_data;
|
||||||
|
|
||||||
|
// The size of the malloced_event_data. We keep track of this for the
|
||||||
|
// database write.
|
||||||
|
size_t malloced_event_data_size;
|
||||||
|
|
||||||
|
// Whether the write completed successfully.
|
||||||
|
c3_o success;
|
||||||
|
|
||||||
|
// Called on main loop thread on completion.
|
||||||
|
void (*on_complete)(c3_o, u3_writ*);
|
||||||
|
};
|
||||||
|
|
||||||
|
/* _u3_lmdb_write_event_cb(): Implementation of u3_lmdb_write_event()
|
||||||
|
**
|
||||||
|
** This is always run on a libuv background worker thread; actual nouns cannot
|
||||||
|
** be touched here.
|
||||||
|
*/
|
||||||
|
static void _u3_lmdb_write_event_cb(uv_work_t* req) {
|
||||||
|
struct _write_request_data* data = req->data;
|
||||||
|
|
||||||
|
// Creates the write transaction.
|
||||||
|
MDB_txn* transaction_u;
|
||||||
|
c3_w ret_w = mdb_txn_begin(data->environment,
|
||||||
|
(MDB_txn *) NULL,
|
||||||
|
0, /* flags */
|
||||||
|
&transaction_u);
|
||||||
|
if (0 != ret_w) {
|
||||||
|
u3l_log("lmdb: txn_begin fail: %s\n", mdb_strerror(ret_w));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Opens the database as part of the transaction.
|
||||||
|
c3_w flags_w = MDB_CREATE | MDB_INTEGERKEY;
|
||||||
|
MDB_dbi database_u;
|
||||||
|
ret_w = mdb_dbi_open(transaction_u,
|
||||||
|
"EVENTS",
|
||||||
|
flags_w,
|
||||||
|
&database_u);
|
||||||
|
if (0 != ret_w) {
|
||||||
|
u3l_log("lmdb: dbi_open fail: %s\n", mdb_strerror(ret_w));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: We need to detect the database being full, making the database
|
||||||
|
// maxsize larger, and then retrying this transaction.
|
||||||
|
//
|
||||||
|
c3_o success = _perform_put_on_database_raw(
|
||||||
|
transaction_u,
|
||||||
|
database_u,
|
||||||
|
MDB_NOOVERWRITE,
|
||||||
|
&(data->event_number),
|
||||||
|
sizeof(c3_d),
|
||||||
|
data->malloced_event_data,
|
||||||
|
data->malloced_event_data_size);
|
||||||
|
|
||||||
|
ret_w = mdb_txn_commit(transaction_u);
|
||||||
|
if (0 != ret_w) {
|
||||||
|
u3l_log("lmdb: failed to commit event %" PRIu64 ": %s\n",
|
||||||
|
data->event_number,
|
||||||
|
mdb_strerror(ret_w));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
data->success = c3y;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* _u3_lmdb_write_event_after_cb(): Implementation of u3_lmdb_write_event()
|
||||||
|
**
|
||||||
|
** This is always run on the main loop thread after the worker thread event
|
||||||
|
** completes.
|
||||||
|
*/
|
||||||
|
static void _u3_lmdb_write_event_after_cb(uv_work_t* req, int status) {
|
||||||
|
struct _write_request_data* data = req->data;
|
||||||
|
|
||||||
|
data->on_complete(data->success, data->event);
|
||||||
|
|
||||||
|
free(data->malloced_event_data);
|
||||||
|
free(data);
|
||||||
|
free(req);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* u3_lmdb_write_event(): Asynchronously writes events to the database.
|
||||||
|
**
|
||||||
|
** This writes all the passed in events along with log metadata updates to the
|
||||||
|
** database as a single transaction on a worker thread. Once the transaction
|
||||||
|
** is completed, it calls the passed in callback on the main loop thread.
|
||||||
|
**
|
||||||
|
** TODO: Make this take multiple events in one commit once we have this
|
||||||
|
** working one at a time.
|
||||||
|
*/
|
||||||
|
void u3_lmdb_write_event(MDB_env* environment,
|
||||||
|
u3_writ* event_u,
|
||||||
|
void (*on_complete)(c3_o, u3_writ*))
|
||||||
|
{
|
||||||
|
// Serialize the jammed $work into a malloced buffer we can send to the other
|
||||||
|
// thread.
|
||||||
|
c3_w siz_w = u3r_met(3, event_u->mat);
|
||||||
|
c3_y* data_u = c3_calloc(siz_w);
|
||||||
|
u3r_bytes(0, siz_w, data_u, event_u->mat);
|
||||||
|
|
||||||
|
// Structure to pass to the worker thread.
|
||||||
|
struct _write_request_data* data = c3_malloc(sizeof(struct _write_request_data));
|
||||||
|
data->environment = environment;
|
||||||
|
data->event = event_u;
|
||||||
|
data->event_number = event_u->evt_d;
|
||||||
|
data->malloced_event_data = data_u;
|
||||||
|
data->malloced_event_data_size = siz_w;
|
||||||
|
data->on_complete = on_complete;
|
||||||
|
data->success = c3n;
|
||||||
|
|
||||||
|
// Queue asynchronous work to happen on the other thread.
|
||||||
|
uv_work_t* req = c3_malloc(sizeof(uv_work_t));
|
||||||
|
req->data = data;
|
||||||
|
|
||||||
|
uv_queue_work(uv_default_loop(),
|
||||||
|
req,
|
||||||
|
_u3_lmdb_write_event_cb,
|
||||||
|
_u3_lmdb_write_event_after_cb);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* u3_lmdb_read_events(): Synchronously reads events from the database.
|
||||||
|
**
|
||||||
|
** Reads back up to |len_d| events starting with |first_event_d|. For
|
||||||
|
** each event, the event will be passed to |on_event_read| and further
|
||||||
|
** reading will be aborted if the callback returns c3n.
|
||||||
|
**
|
||||||
|
** Returns c3y on complete success; c3n on any error.
|
||||||
|
*/
|
||||||
|
c3_o u3_lmdb_read_events(u3_pier* pir_u,
|
||||||
|
c3_d first_event_d,
|
||||||
|
c3_d len_d,
|
||||||
|
c3_o(*on_event_read)(u3_pier* pir_u, c3_d id,
|
||||||
|
u3_noun mat, u3_noun ovo))
|
||||||
|
{
|
||||||
|
// Creates the read transaction.
|
||||||
|
MDB_txn* transaction_u;
|
||||||
|
c3_w ret_w = mdb_txn_begin(pir_u->log_u->db_u,
|
||||||
|
//environment,
|
||||||
|
(MDB_txn *) NULL,
|
||||||
|
MDB_RDONLY, /* flags */
|
||||||
|
&transaction_u);
|
||||||
|
if (0 != ret_w) {
|
||||||
|
u3l_log("lmdb: txn_begin fail: %s\n", mdb_strerror(ret_w));
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Opens the database as part of the transaction.
|
||||||
|
c3_w flags_w = MDB_CREATE | MDB_INTEGERKEY;
|
||||||
|
MDB_dbi database_u;
|
||||||
|
ret_w = mdb_dbi_open(transaction_u,
|
||||||
|
"EVENTS",
|
||||||
|
flags_w,
|
||||||
|
&database_u);
|
||||||
|
if (0 != ret_w) {
|
||||||
|
u3l_log("lmdb: dbi_open fail: %s\n", mdb_strerror(ret_w));
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creates a cursor to iterate over keys starting at first_event_d.
|
||||||
|
MDB_cursor* cursor_u;
|
||||||
|
ret_w = mdb_cursor_open(transaction_u, database_u, &cursor_u);
|
||||||
|
if (0 != ret_w) {
|
||||||
|
u3l_log("lmdb: cursor_open fail: %s\n", mdb_strerror(ret_w));
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sets the cursor to the position of first_event_d.
|
||||||
|
MDB_val key;
|
||||||
|
MDB_val val;
|
||||||
|
key.mv_size = sizeof(c3_d);
|
||||||
|
key.mv_data = &first_event_d;
|
||||||
|
|
||||||
|
ret_w = mdb_cursor_get(cursor_u, &key, &val, MDB_SET_KEY);
|
||||||
|
if (0 != ret_w) {
|
||||||
|
u3l_log("lmdb: could not find initial event %" PRIu64 ": %s\r\n",
|
||||||
|
first_event_d, mdb_strerror(ret_w));
|
||||||
|
mdb_cursor_close(cursor_u);
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load up to len_d events, iterating forward across the cursor.
|
||||||
|
for (c3_d loaded = 0; (ret_w != MDB_NOTFOUND) && (loaded < len_d); ++loaded) {
|
||||||
|
// As a sanity check, we make sure that there aren't any discontinuities in
|
||||||
|
// the sequence of loaded events.
|
||||||
|
c3_d current_id = first_event_d + loaded;
|
||||||
|
if (key.mv_size != sizeof(c3_d) ||
|
||||||
|
*(c3_d*)key.mv_data != current_id) {
|
||||||
|
u3l_log("lmdb: invalid cursor key\r\n");
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now build the atom version and then the cued version from the raw data
|
||||||
|
u3_noun mat = u3i_bytes(val.mv_size, val.mv_data);
|
||||||
|
u3_noun ovo = u3ke_cue(u3k(mat));
|
||||||
|
|
||||||
|
if (on_event_read(pir_u, current_id, mat, ovo) == c3n) {
|
||||||
|
u3z(ovo);
|
||||||
|
u3z(mat);
|
||||||
|
u3l_log("lmdb: aborting replay due to error.\r\n");
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
u3z(ovo);
|
||||||
|
u3z(mat);
|
||||||
|
|
||||||
|
ret_w = mdb_cursor_get(cursor_u, &key, &val, MDB_NEXT);
|
||||||
|
if (ret_w != 0 && ret_w != MDB_NOTFOUND) {
|
||||||
|
u3l_log("lmdb: error while loading events: %s\r\n",
|
||||||
|
mdb_strerror(ret_w));
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mdb_cursor_close(cursor_u);
|
||||||
|
|
||||||
|
// Read-only transactions are aborted since we don't need to record the fact
|
||||||
|
// that we performed a read.
|
||||||
|
mdb_txn_abort(transaction_u);
|
||||||
|
|
||||||
|
return c3y;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* u3_lmdb_get_latest_event_number(): Gets last event id persisted
|
||||||
|
**
|
||||||
|
** Reads the last key in order from the EVENTS table as the latest event
|
||||||
|
** number. On table empty, returns c3y but doesn't modify event_number.
|
||||||
|
*/
|
||||||
|
c3_o u3_lmdb_get_latest_event_number(MDB_env* environment, c3_d* event_number)
|
||||||
|
{
|
||||||
|
// Creates the read transaction.
|
||||||
|
MDB_txn* transaction_u;
|
||||||
|
c3_w ret_w = mdb_txn_begin(environment,
|
||||||
|
(MDB_txn *) NULL,
|
||||||
|
0, /* flags */
|
||||||
|
&transaction_u);
|
||||||
|
if (0 != ret_w) {
|
||||||
|
u3l_log("lmdb: txn_begin fail: %s\n", mdb_strerror(ret_w));
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Opens the database as part of the transaction.
|
||||||
|
c3_w flags_w = MDB_CREATE | MDB_INTEGERKEY;
|
||||||
|
MDB_dbi database_u;
|
||||||
|
ret_w = mdb_dbi_open(transaction_u,
|
||||||
|
"EVENTS",
|
||||||
|
flags_w,
|
||||||
|
&database_u);
|
||||||
|
if (0 != ret_w) {
|
||||||
|
u3l_log("lmdb: dbi_open fail: %s\n", mdb_strerror(ret_w));
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creates a cursor to point to the last event
|
||||||
|
MDB_cursor* cursor_u;
|
||||||
|
ret_w = mdb_cursor_open(transaction_u, database_u, &cursor_u);
|
||||||
|
if (0 != ret_w) {
|
||||||
|
u3l_log("lmdb: cursor_open fail: %s\n", mdb_strerror(ret_w));
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the cursor at the end of the line.
|
||||||
|
MDB_val key;
|
||||||
|
MDB_val val;
|
||||||
|
ret_w = mdb_cursor_get(cursor_u, &key, &val, MDB_LAST);
|
||||||
|
if (MDB_NOTFOUND == ret_w) {
|
||||||
|
// Clean up, but don't error out.
|
||||||
|
mdb_cursor_close(cursor_u);
|
||||||
|
mdb_txn_abort(transaction_u);
|
||||||
|
return c3y;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (0 != ret_w) {
|
||||||
|
u3l_log("lmdb: could not find last event: %s\r\n", mdb_strerror(ret_w));
|
||||||
|
mdb_cursor_close(cursor_u);
|
||||||
|
mdb_txn_abort(transaction_u);
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
*event_number = *(c3_d*)key.mv_data;
|
||||||
|
|
||||||
|
mdb_cursor_close(cursor_u);
|
||||||
|
|
||||||
|
// Read-only transactions are aborted since we don't need to record the fact
|
||||||
|
// that we performed a read.
|
||||||
|
mdb_txn_abort(transaction_u);
|
||||||
|
|
||||||
|
return c3y;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* u3_lmdb_write_identity(): Writes the event log identity information
|
||||||
|
**
|
||||||
|
** We have a secondary database (table) in this environment named META where we
|
||||||
|
** read/write identity information from/to.
|
||||||
|
*/
|
||||||
|
c3_o u3_lmdb_write_identity(MDB_env* environment,
|
||||||
|
u3_noun who,
|
||||||
|
u3_noun is_fake,
|
||||||
|
u3_noun life)
|
||||||
|
{
|
||||||
|
// Creates the write transaction.
|
||||||
|
MDB_txn* transaction_u;
|
||||||
|
c3_w ret_w = mdb_txn_begin(environment,
|
||||||
|
(MDB_txn *) NULL,
|
||||||
|
0, /* flags */
|
||||||
|
&transaction_u);
|
||||||
|
if (0 != ret_w) {
|
||||||
|
u3l_log("lmdb: txn_begin fail: %s\n", mdb_strerror(ret_w));
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Opens the database as part of the transaction.
|
||||||
|
c3_w flags_w = MDB_CREATE;
|
||||||
|
MDB_dbi database_u;
|
||||||
|
ret_w = mdb_dbi_open(transaction_u,
|
||||||
|
"META",
|
||||||
|
flags_w,
|
||||||
|
&database_u);
|
||||||
|
if (0 != ret_w) {
|
||||||
|
u3l_log("lmdb: dbi_open fail: %s\n", mdb_strerror(ret_w));
|
||||||
|
mdb_txn_abort(transaction_u);
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
c3_o ret;
|
||||||
|
ret = _perform_put_on_database_noun(transaction_u, database_u, "who", who);
|
||||||
|
if (ret == c3n) {
|
||||||
|
mdb_txn_abort(transaction_u);
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = _perform_put_on_database_noun(transaction_u, database_u, "is-fake",
|
||||||
|
is_fake);
|
||||||
|
if (ret == c3n) {
|
||||||
|
mdb_txn_abort(transaction_u);
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = _perform_put_on_database_noun(transaction_u, database_u, "life", life);
|
||||||
|
if (ret == c3n) {
|
||||||
|
mdb_txn_abort(transaction_u);
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret_w = mdb_txn_commit(transaction_u);
|
||||||
|
if (0 != ret_w) {
|
||||||
|
u3l_log("lmdb: failed to commit transaction: %s\n", mdb_strerror(ret_w));
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
return c3y;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* u3_lmdb_read_identity(): Reads the event log identity information.
|
||||||
|
*/
|
||||||
|
c3_o u3_lmdb_read_identity(MDB_env* environment,
|
||||||
|
u3_noun* who,
|
||||||
|
u3_noun* is_fake,
|
||||||
|
u3_noun* life) {
|
||||||
|
// Creates the write transaction.
|
||||||
|
MDB_txn* transaction_u;
|
||||||
|
c3_w ret_w = mdb_txn_begin(environment,
|
||||||
|
(MDB_txn *) NULL,
|
||||||
|
MDB_RDONLY, /* flags */
|
||||||
|
&transaction_u);
|
||||||
|
if (0 != ret_w) {
|
||||||
|
u3l_log("lmdb: txn_begin fail: %s\n", mdb_strerror(ret_w));
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Opens the database as part of the transaction.
|
||||||
|
MDB_dbi database_u;
|
||||||
|
ret_w = mdb_dbi_open(transaction_u,
|
||||||
|
"META",
|
||||||
|
0,
|
||||||
|
&database_u);
|
||||||
|
if (0 != ret_w) {
|
||||||
|
u3l_log("lmdb: dbi_open fail: %s\n", mdb_strerror(ret_w));
|
||||||
|
mdb_txn_abort(transaction_u);
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
c3_o ret;
|
||||||
|
ret = _perform_get_on_database_noun(transaction_u, database_u, "who", who);
|
||||||
|
if (ret == c3n) {
|
||||||
|
mdb_txn_abort(transaction_u);
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = _perform_get_on_database_noun(transaction_u, database_u, "is-fake",
|
||||||
|
is_fake);
|
||||||
|
if (ret == c3n) {
|
||||||
|
mdb_txn_abort(transaction_u);
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = _perform_get_on_database_noun(transaction_u, database_u, "life", life);
|
||||||
|
if (ret == c3n) {
|
||||||
|
mdb_txn_abort(transaction_u);
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read-only transactions are aborted since we don't need to record the fact
|
||||||
|
// that we performed a read.
|
||||||
|
mdb_txn_abort(transaction_u);
|
||||||
|
|
||||||
|
return c3y;
|
||||||
|
}
|
@ -70,34 +70,37 @@ static void _pier_boot_ready(u3_pier* pir_u);
|
|||||||
static void _pier_boot_set_ship(u3_pier* pir_u, u3_noun who, u3_noun fak);
|
static void _pier_boot_set_ship(u3_pier* pir_u, u3_noun who, u3_noun fak);
|
||||||
static void _pier_exit_done(u3_pier* pir_u);
|
static void _pier_exit_done(u3_pier* pir_u);
|
||||||
|
|
||||||
/* _pier_disk_bail(): bail from disk i/o.
|
/* _pier_db_bail(): bail from disk i/o.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
_pier_disk_bail(void* vod_p, const c3_c* err_c)
|
_pier_db_bail(void* vod_p, const c3_c* err_c)
|
||||||
{
|
{
|
||||||
// u3_writ* wit_u = vod_p;
|
|
||||||
|
|
||||||
u3l_log("disk error: %s\r\n", err_c);
|
u3l_log("disk error: %s\r\n", err_c);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* _pier_disk_shutdown(): close the log.
|
/* _pier_db_shutdown(): close the log.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
_pier_disk_shutdown(u3_pier* pir_u)
|
_pier_db_shutdown(u3_pier* pir_u)
|
||||||
{
|
{
|
||||||
|
u3_lmdb_shutdown(pir_u->log_u->db_u);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* _pier_disk_commit_complete(): commit complete.
|
/* _pier_db_commit_complete(): commit complete.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
_pier_disk_commit_complete(void* vod_p)
|
_pier_db_commit_complete(c3_o success, u3_writ* wit_u)
|
||||||
{
|
{
|
||||||
u3_writ* wit_u = vod_p;
|
|
||||||
u3_pier* pir_u = wit_u->pir_u;
|
u3_pier* pir_u = wit_u->pir_u;
|
||||||
u3_disk* log_u = pir_u->log_u;
|
u3_disk* log_u = pir_u->log_u;
|
||||||
|
|
||||||
|
if (success == c3n) {
|
||||||
|
u3l_log("Failed to persist event. Exiting to prevent corruption.");
|
||||||
|
u3_pier_bail();
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef VERBOSE_EVENTS
|
#ifdef VERBOSE_EVENTS
|
||||||
u3l_log("pier: (%" PRIu64 "): commit: complete\r\n", wit_u->evt_d);
|
u3l_log("pier: (%" PRIu64 "): db commit completed\r\n", wit_u->evt_d);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* advance commit counter
|
/* advance commit counter
|
||||||
@ -111,10 +114,10 @@ _pier_disk_commit_complete(void* vod_p)
|
|||||||
_pier_apply(pir_u);
|
_pier_apply(pir_u);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* _pier_disk_commit_request(): start commit.
|
/* _pier_db_commit_request(): start commit.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
_pier_disk_commit_request(u3_writ* wit_u)
|
_pier_db_commit_request(u3_writ* wit_u)
|
||||||
{
|
{
|
||||||
u3_pier* pir_u = wit_u->pir_u;
|
u3_pier* pir_u = wit_u->pir_u;
|
||||||
u3_disk* log_u = pir_u->log_u;
|
u3_disk* log_u = pir_u->log_u;
|
||||||
@ -123,18 +126,12 @@ _pier_disk_commit_request(u3_writ* wit_u)
|
|||||||
u3l_log("pier: (%" PRIu64 "): commit: request\r\n", wit_u->evt_d);
|
u3l_log("pier: (%" PRIu64 "): commit: request\r\n", wit_u->evt_d);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* append to logfile
|
/* put it in the database
|
||||||
*/
|
*/
|
||||||
{
|
{
|
||||||
c3_d len_d = u3r_met(6, wit_u->mat);
|
u3_lmdb_write_event(log_u->db_u,
|
||||||
c3_d* buf_d = c3_malloc(8 * len_d);
|
wit_u,
|
||||||
|
_pier_db_commit_complete);
|
||||||
u3r_chubs(0, len_d, buf_d, wit_u->mat);
|
|
||||||
u3_foil_append(_pier_disk_commit_complete,
|
|
||||||
wit_u,
|
|
||||||
log_u->fol_u,
|
|
||||||
buf_d,
|
|
||||||
len_d);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* advance commit-request counter
|
/* advance commit-request counter
|
||||||
@ -145,312 +142,144 @@ _pier_disk_commit_request(u3_writ* wit_u)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* _pier_disk_write_header_complete(): commit complete.
|
|
||||||
*/
|
|
||||||
static void
|
static void
|
||||||
_pier_disk_write_header_complete(void* vod_p)
|
_pier_db_write_header(u3_pier* pir_u,
|
||||||
|
u3_noun who,
|
||||||
|
u3_noun is_fake,
|
||||||
|
u3_noun life)
|
||||||
{
|
{
|
||||||
// no-op, callback required by u3_foil_append()
|
c3_o ret = u3_lmdb_write_identity(pir_u->log_u->db_u,
|
||||||
//
|
who, is_fake, life);
|
||||||
|
if (ret == c3n) {
|
||||||
|
u3_pier_bail();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* _pier_disk_write_header(): save boot metadata.
|
/* _pier_db_read_header(): reads the ships metadata from lmdb
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
_pier_disk_write_header(u3_pier* pir_u, u3_atom mat)
|
_pier_db_read_header(u3_pier* pir_u)
|
||||||
{
|
{
|
||||||
// XX deduplicate with _pier_disk_commit_request
|
u3_noun who, is_fake, life;
|
||||||
//
|
c3_o ret = u3_lmdb_read_identity(pir_u->log_u->db_u,
|
||||||
u3_disk* log_u = pir_u->log_u;
|
&who, &is_fake, &life);
|
||||||
|
if (ret == c3n) {
|
||||||
c3_assert( 0ULL == log_u->fol_u->end_d );
|
u3l_log("Failed to load identity. Exiting...");
|
||||||
|
u3_pier_bail();
|
||||||
c3_d len_d = u3r_met(6, mat);
|
|
||||||
c3_d* buf_d = c3_malloc(8 * len_d);
|
|
||||||
|
|
||||||
u3r_chubs(0, len_d, buf_d, mat);
|
|
||||||
|
|
||||||
u3_foil_append(_pier_disk_write_header_complete,
|
|
||||||
(void*)0,
|
|
||||||
log_u->fol_u,
|
|
||||||
buf_d,
|
|
||||||
len_d);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* _pier_disk_read_header_complete():
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
_pier_disk_read_header_complete(u3_disk* log_u, u3_noun dat)
|
|
||||||
{
|
|
||||||
u3_pier* pir_u = log_u->pir_u;
|
|
||||||
|
|
||||||
{
|
|
||||||
u3_noun who, fak, len;
|
|
||||||
|
|
||||||
u3x_trel(dat, &who, &fak, &len);
|
|
||||||
|
|
||||||
c3_assert( c3y == u3ud(who) );
|
|
||||||
c3_assert( 1 >= u3r_met(7, who) );
|
|
||||||
c3_assert( c3y == u3ud(fak) );
|
|
||||||
c3_assert( 1 >= u3r_met(0, fak) );
|
|
||||||
c3_assert( c3y == u3ud(len) );
|
|
||||||
c3_assert( 1 >= u3r_met(3, len) );
|
|
||||||
|
|
||||||
_pier_boot_set_ship(pir_u, u3k(who), u3k(fak));
|
|
||||||
|
|
||||||
pir_u->lif_d = u3r_chub(0, len);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
u3z(dat);
|
_pier_boot_set_ship(pir_u, u3k(who), u3k(is_fake));
|
||||||
|
pir_u->lif_d = u3r_chub(0, life);
|
||||||
|
|
||||||
|
u3z(who);
|
||||||
|
u3z(is_fake);
|
||||||
|
u3z(life);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* _pier_disk_read_header():
|
static c3_o
|
||||||
*/
|
_pier_db_on_commit_loaded(u3_pier* pir_u,
|
||||||
static void
|
c3_d id,
|
||||||
_pier_disk_read_header(u3_disk* log_u)
|
u3_noun mat,
|
||||||
|
u3_noun ovo)
|
||||||
{
|
{
|
||||||
// XX disabled
|
u3_noun evt = u3h(u3t(ovo));
|
||||||
//
|
u3_noun job = u3k(u3t(u3t(u3t(ovo))));
|
||||||
// This is very, very slow.
|
c3_d evt_d = u3r_chub(0, evt);
|
||||||
// The one situation in which we currently *need* it -
|
|
||||||
// full log replay - it's unnecessary thanks to the current
|
|
||||||
// _pier_disk_load_commit.
|
|
||||||
// In all other situations, we're covered by
|
|
||||||
// _pier_work_play or u3_pier_boot.
|
|
||||||
// In the long run, it seems best to always get identity
|
|
||||||
// from the log for restart/replay.
|
|
||||||
//
|
|
||||||
#if 0
|
|
||||||
c3_assert( 0 != log_u->fol_u );
|
|
||||||
|
|
||||||
c3_d pos_d = log_u->fol_u->end_d;
|
if (evt_d != id) {
|
||||||
c3_o got_o = c3n;
|
_pier_db_bail(0, "pier: load: commit: event order");
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
// XX requires that writs be unlinked before effects are released
|
// Need to grab references to the nouns above.
|
||||||
//
|
u3_writ* wit_u = c3_calloc(sizeof(u3_writ));
|
||||||
if ( (0 == pir_u->ent_u) &&
|
wit_u->pir_u = pir_u;
|
||||||
(wit_u->evt_d < log_u->com_d) )
|
wit_u->evt_d = evt_d;
|
||||||
{
|
wit_u->job = u3k(job);
|
||||||
_pier_disk_load_commit(pir_u, (1ULL + god_u->dun_d), 1000ULL);
|
wit_u->mat = u3k(mat);
|
||||||
}
|
|
||||||
}
|
// Insert at queue front since we're loading events in order
|
||||||
|
if ( !pir_u->ent_u ) {
|
||||||
|
c3_assert(!pir_u->ext_u);
|
||||||
|
|
||||||
|
pir_u->ent_u = pir_u->ext_u = wit_u;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
#ifdef VERBOSE_EVENTS
|
if ( wit_u->evt_d != (1ULL + pir_u->ent_u->evt_d) ) {
|
||||||
u3l_log("pier: (%" PRIu64 "): compute: release\r\n", wit_u->evt_d);
|
fprintf(stderr, "pier: load: commit: event gap: %" PRIx64 ", %"
|
||||||
#endif
|
PRIx64 "\r\n",
|
||||||
|
wit_u->evt_d,
|
||||||
while ( pos_d ) {
|
pir_u->ent_u->evt_d);
|
||||||
c3_d len_d, evt_d;
|
_pier_db_bail(0, "pier: load: comit: event gap");
|
||||||
c3_d* buf_d;
|
return c3n;
|
||||||
u3_noun mat, ovo, job, evt;
|
|
||||||
|
|
||||||
buf_d = u3_foil_reveal(log_u->fol_u, &pos_d, &len_d);
|
|
||||||
|
|
||||||
if ( !buf_d ) {
|
|
||||||
_pier_disk_bail(0, "corrupt header");
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( 0ULL == pos_d) {
|
pir_u->ent_u->nex_u = wit_u;
|
||||||
u3_noun mat = u3i_chubs(len_d, buf_d);
|
pir_u->ent_u = wit_u;
|
||||||
u3_noun ovo = u3ke_cue(u3k(mat));
|
|
||||||
|
|
||||||
c3_assert( c3__boot == u3h(ovo) );
|
|
||||||
|
|
||||||
_pier_disk_read_header_complete(log_u, u3k(u3t(ovo)));
|
|
||||||
|
|
||||||
u3z(ovo); u3z(mat);
|
|
||||||
}
|
|
||||||
|
|
||||||
c3_free(buf_d);
|
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
return c3y;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* _pier_disk_load_commit(): load len_d commits >= lav_d; enqueue for replay
|
/* _pier_db_load_commit(): load len_d commits >= lav_d; enqueue for replay
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
_pier_disk_load_commit(u3_pier* pir_u,
|
_pier_db_load_commits(u3_pier* pir_u,
|
||||||
c3_d lav_d,
|
c3_d lav_d,
|
||||||
c3_d len_d)
|
c3_d len_d)
|
||||||
{
|
{
|
||||||
u3_disk* log_u = pir_u->log_u;
|
if (lav_d == 1) {
|
||||||
|
// We are restarting from event 1. That means we need to set the ship from
|
||||||
c3_d max_d = lav_d + len_d;
|
// the log identity information.
|
||||||
c3_d pos_d = log_u->fol_u->end_d;
|
u3_noun who, fak, len;
|
||||||
c3_d old_d = 0;
|
c3_o ret = u3_lmdb_read_identity(pir_u->log_u->db_u,
|
||||||
|
&who,
|
||||||
c3_assert ( 0 != log_u->fol_u );
|
&fak,
|
||||||
|
&len);
|
||||||
#ifdef VERBOSE_EVENTS
|
if (ret == c3n) {
|
||||||
fprintf(stderr, "pier: load: commit: at %" PRIx64 "\r\n", pos_d);
|
u3l_log("Failed to load identity for replay. Exiting...");
|
||||||
#endif
|
u3_pier_bail();
|
||||||
|
|
||||||
while ( pos_d ) {
|
|
||||||
c3_d len_d, evt_d;
|
|
||||||
c3_d* buf_d;
|
|
||||||
u3_noun mat, ovo, job, evt;
|
|
||||||
|
|
||||||
buf_d = u3_foil_reveal(log_u->fol_u, &pos_d, &len_d);
|
|
||||||
|
|
||||||
if ( !buf_d ) {
|
|
||||||
_pier_disk_bail(0, "pier: load: commit: corrupt");
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mat = u3i_chubs(len_d, buf_d);
|
_pier_boot_set_ship(pir_u, u3k(who), u3k(fak));
|
||||||
c3_free(buf_d);
|
pir_u->lif_d = u3r_chub(0, len);
|
||||||
|
|
||||||
ovo = u3ke_cue(u3k(mat));
|
u3z(who);
|
||||||
|
u3z(fak);
|
||||||
|
u3z(len);
|
||||||
|
}
|
||||||
|
|
||||||
// reached header
|
c3_o ret = u3_lmdb_read_events(pir_u,
|
||||||
//
|
lav_d,
|
||||||
if ( 0ULL == pos_d ) {
|
len_d,
|
||||||
c3_assert( 1ULL == lav_d );
|
_pier_db_on_commit_loaded);
|
||||||
c3_assert( c3__boot == u3h(ovo) );
|
if (ret == c3n) {
|
||||||
|
u3l_log("Failed to read event log for replay. Exiting...");
|
||||||
_pier_disk_read_header_complete(log_u, u3k(u3t(ovo)));
|
u3_pier_bail();
|
||||||
|
|
||||||
u3z(ovo); u3z(mat);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
c3_assert(c3__work == u3h(ovo));
|
|
||||||
evt = u3h(u3t(ovo));
|
|
||||||
job = u3k(u3t(u3t(u3t(ovo))));
|
|
||||||
evt_d = u3r_chub(0, evt);
|
|
||||||
u3z(ovo);
|
|
||||||
|
|
||||||
// confirm event order
|
|
||||||
//
|
|
||||||
if ( (0 != old_d) &&
|
|
||||||
((old_d - 1ULL) != evt_d) ) {
|
|
||||||
_pier_disk_bail(0, "pier: load: commit: event order");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
old_d = evt_d;
|
|
||||||
}
|
|
||||||
|
|
||||||
// done: read past the first event requested
|
|
||||||
//
|
|
||||||
if ( evt_d < lav_d ) {
|
|
||||||
u3z(mat);
|
|
||||||
u3z(job);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// skip: haven't reached the last event requested
|
|
||||||
//
|
|
||||||
else if ( evt_d > max_d ) {
|
|
||||||
u3z(mat);
|
|
||||||
u3z(job);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// enqueue requested event
|
|
||||||
//
|
|
||||||
else {
|
|
||||||
u3_writ* wit_u = c3_calloc(sizeof(u3_writ));
|
|
||||||
|
|
||||||
#ifdef VERBOSE_EVENTS
|
|
||||||
fprintf(stderr, "pier: load: commit: %" PRIu64 "\r\n", evt_d);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
wit_u->pir_u = pir_u;
|
|
||||||
wit_u->evt_d = evt_d;
|
|
||||||
wit_u->job = job;
|
|
||||||
wit_u->mat = mat;
|
|
||||||
|
|
||||||
/* insert at queue exit -- the oldest events run first
|
|
||||||
*/
|
|
||||||
if ( !pir_u->ent_u && !pir_u->ext_u ) {
|
|
||||||
pir_u->ent_u = pir_u->ext_u = wit_u;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
if ( (1ULL + wit_u->evt_d) != pir_u->ext_u->evt_d ) {
|
|
||||||
fprintf(stderr, "pier: load: commit: event gap: %" PRIx64 ", %"
|
|
||||||
PRIx64 "\r\n",
|
|
||||||
wit_u->evt_d,
|
|
||||||
pir_u->ext_u->evt_d);
|
|
||||||
u3z(mat);
|
|
||||||
u3z(job);
|
|
||||||
_pier_disk_bail(0, "pier: load: comit: event gap");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
wit_u->nex_u = pir_u->ext_u;
|
|
||||||
pir_u->ext_u = wit_u;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* _pier_disk_init_complete():
|
/* _pier_db_init():
|
||||||
*/
|
|
||||||
static void
|
|
||||||
_pier_disk_init_complete(u3_disk* log_u, c3_d evt_d)
|
|
||||||
{
|
|
||||||
c3_assert( c3n == log_u->liv_o );
|
|
||||||
log_u->liv_o = c3y;
|
|
||||||
|
|
||||||
log_u->com_d = log_u->moc_d = evt_d;
|
|
||||||
|
|
||||||
// restore pier identity (XX currently a no-op, see comment)
|
|
||||||
//
|
|
||||||
_pier_disk_read_header(log_u);
|
|
||||||
|
|
||||||
_pier_boot_ready(log_u->pir_u);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* _pier_disk_init():
|
|
||||||
*/
|
*/
|
||||||
static c3_o
|
static c3_o
|
||||||
_pier_disk_init(u3_disk* log_u)
|
_pier_db_init(u3_disk* log_u)
|
||||||
{
|
{
|
||||||
c3_d evt_d = 0;
|
c3_d evt_d = 0;
|
||||||
c3_d pos_d = 0;
|
c3_d pos_d = 0;
|
||||||
|
|
||||||
c3_assert( c3n == log_u->liv_o );
|
c3_assert( c3n == log_u->liv_o );
|
||||||
|
|
||||||
log_u->fol_u = u3_foil_absorb(log_u->com_u, "commit.urbit-log");
|
// Request from the database the last event
|
||||||
|
if ( c3n == u3_lmdb_get_latest_event_number(log_u->db_u, &evt_d) ) {
|
||||||
if ( !log_u->fol_u ) {
|
u3l_log("disk init from lmdb failed.");
|
||||||
return c3n;
|
return c3n;
|
||||||
}
|
}
|
||||||
|
|
||||||
// use the last event in the log to set the commit point.
|
log_u->liv_o = c3y;
|
||||||
//
|
log_u->com_d = log_u->moc_d = evt_d;
|
||||||
if ( 0 != (pos_d = log_u->fol_u->end_d) ) {
|
|
||||||
c3_d len_d = 0;
|
|
||||||
|
|
||||||
c3_d* buf_d = u3_foil_reveal(log_u->fol_u, &pos_d, &len_d);
|
_pier_boot_ready(log_u->pir_u);
|
||||||
|
|
||||||
if ( !buf_d ) {
|
|
||||||
fprintf(stderr, "pier: load: commit: corrupt\r\n");
|
|
||||||
return c3n;
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
u3_noun mat = u3i_chubs(len_d, buf_d);
|
|
||||||
u3_noun ovo = u3ke_cue(u3k(mat));
|
|
||||||
|
|
||||||
c3_assert(c3__work == u3h(ovo));
|
|
||||||
|
|
||||||
u3_noun evt = u3h(u3t(ovo));
|
|
||||||
|
|
||||||
evt_d = u3r_chub(0, evt);
|
|
||||||
|
|
||||||
u3z(mat); u3z(ovo); u3z(evt);
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef VERBOSE_EVENTS
|
|
||||||
fprintf(stderr, "pier: load: last %" PRIu64 "\r\n", evt_d);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
c3_free(buf_d);
|
|
||||||
}
|
|
||||||
|
|
||||||
_pier_disk_init_complete(log_u, evt_d);
|
|
||||||
|
|
||||||
return c3y;
|
return c3y;
|
||||||
}
|
}
|
||||||
@ -500,10 +329,18 @@ _pier_disk_create(u3_pier* pir_u)
|
|||||||
strcpy(log_c, pir_u->pax_c);
|
strcpy(log_c, pir_u->pax_c);
|
||||||
strcat(log_c, "/.urb/log");
|
strcat(log_c, "/.urb/log");
|
||||||
|
|
||||||
|
// Creates the folder
|
||||||
if ( 0 == (log_u->com_u = u3_foil_folder(log_c)) ) {
|
if ( 0 == (log_u->com_u = u3_foil_folder(log_c)) ) {
|
||||||
c3_free(log_c);
|
c3_free(log_c);
|
||||||
return c3n;
|
return c3n;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Inits the database
|
||||||
|
if ( 0 == (log_u->db_u = u3_lmdb_init(log_c)) ) {
|
||||||
|
c3_free(log_c);
|
||||||
|
return c3n;
|
||||||
|
}
|
||||||
|
|
||||||
c3_free(log_c);
|
c3_free(log_c);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -526,7 +363,7 @@ _pier_disk_create(u3_pier* pir_u)
|
|||||||
|
|
||||||
// create/load event log
|
// create/load event log
|
||||||
//
|
//
|
||||||
if ( c3n == _pier_disk_init(log_u) ) {
|
if ( c3n == _pier_db_init(log_u) ) {
|
||||||
return c3n;
|
return c3n;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -637,7 +474,7 @@ _pier_work_bail(void* vod_p,
|
|||||||
fprintf(stderr, "pier: work error: %s\r\n", err_c);
|
fprintf(stderr, "pier: work error: %s\r\n", err_c);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* _pier_work_boot(): prepare serf boot.
|
/* _pier_work_boot(): prepare for boot.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
_pier_work_boot(u3_pier* pir_u, c3_o sav_o)
|
_pier_work_boot(u3_pier* pir_u, c3_o sav_o)
|
||||||
@ -648,13 +485,13 @@ _pier_work_boot(u3_pier* pir_u, c3_o sav_o)
|
|||||||
|
|
||||||
u3_noun who = u3i_chubs(2, pir_u->who_d);
|
u3_noun who = u3i_chubs(2, pir_u->who_d);
|
||||||
u3_noun len = u3i_chubs(1, &pir_u->lif_d);
|
u3_noun len = u3i_chubs(1, &pir_u->lif_d);
|
||||||
u3_noun msg = u3nq(c3__boot, who, pir_u->fak_o, len);
|
|
||||||
u3_atom mat = u3ke_jam(msg);
|
|
||||||
|
|
||||||
if ( c3y == sav_o ) {
|
if ( c3y == sav_o ) {
|
||||||
_pier_disk_write_header(pir_u, u3k(mat));
|
_pier_db_write_header(pir_u, who, u3k(pir_u->fak_o), len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
u3_noun msg = u3nq(c3__boot, who, pir_u->fak_o, len);
|
||||||
|
u3_atom mat = u3ke_jam(msg);
|
||||||
u3_newt_write(&god_u->inn_u, mat, 0);
|
u3_newt_write(&god_u->inn_u, mat, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -748,7 +585,7 @@ _pier_work_release(u3_writ* wit_u)
|
|||||||
if ( (0 == pir_u->ent_u) &&
|
if ( (0 == pir_u->ent_u) &&
|
||||||
(wit_u->evt_d < log_u->com_d) )
|
(wit_u->evt_d < log_u->com_d) )
|
||||||
{
|
{
|
||||||
_pier_disk_load_commit(pir_u, (1ULL + god_u->dun_d), 1000ULL);
|
_pier_db_load_commits(pir_u, (1ULL + god_u->dun_d), 1000ULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -896,7 +733,7 @@ _pier_work_play(u3_pier* pir_u,
|
|||||||
c3_assert( c3n == god_u->liv_o );
|
c3_assert( c3n == god_u->liv_o );
|
||||||
god_u->liv_o = c3y;
|
god_u->liv_o = c3y;
|
||||||
|
|
||||||
// all events in the serf are complete
|
// all events in the worker are complete
|
||||||
//
|
//
|
||||||
god_u->rel_d = god_u->dun_d = god_u->sen_d = (lav_d - 1ULL);
|
god_u->rel_d = god_u->dun_d = god_u->sen_d = (lav_d - 1ULL);
|
||||||
|
|
||||||
@ -975,7 +812,7 @@ _pier_work_exit(uv_process_t* req_u,
|
|||||||
u3l_log("pier: exit: status %" PRIu64 ", signal %d\r\n", sas_i, sig_i);
|
u3l_log("pier: exit: status %" PRIu64 ", signal %d\r\n", sas_i, sig_i);
|
||||||
uv_close((uv_handle_t*) req_u, 0);
|
uv_close((uv_handle_t*) req_u, 0);
|
||||||
|
|
||||||
_pier_disk_shutdown(pir_u);
|
_pier_db_shutdown(pir_u);
|
||||||
_pier_work_shutdown(pir_u);
|
_pier_work_shutdown(pir_u);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1108,6 +945,7 @@ _pier_work_poke(void* vod_p,
|
|||||||
c3_d evt_d = u3r_chub(0, p_jar);
|
c3_d evt_d = u3r_chub(0, p_jar);
|
||||||
u3_writ* wit_u = _pier_writ_find(pir_u, evt_d);
|
u3_writ* wit_u = _pier_writ_find(pir_u, evt_d);
|
||||||
|
|
||||||
|
// Unlike slog, we always reprint interpreter errors during replay.
|
||||||
_pier_work_stdr(wit_u, q_jar);
|
_pier_work_stdr(wit_u, q_jar);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@ -1123,11 +961,19 @@ _pier_work_poke(void* vod_p,
|
|||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
// XXX: The wit_u pointer will almost always be 0 because of how the
|
||||||
|
// worker process manages the difference between u3V.evt_d vs
|
||||||
|
// u3A->ent_d. Either stop communicating the evt_d in the wire protocol
|
||||||
|
// or fix the worker to keep track of and communicate the correct event
|
||||||
|
// number.
|
||||||
c3_d evt_d = u3r_chub(0, p_jar);
|
c3_d evt_d = u3r_chub(0, p_jar);
|
||||||
c3_w pri_w = u3r_word(0, q_jar);
|
c3_w pri_w = u3r_word(0, q_jar);
|
||||||
u3_writ* wit_u = _pier_writ_find(pir_u, evt_d);
|
u3_writ* wit_u = _pier_writ_find(pir_u, evt_d);
|
||||||
|
|
||||||
_pier_work_slog(wit_u, pri_w, u3k(r_jar));
|
// Only print this slog if the event is uncommitted.
|
||||||
|
if ( u3_psat_pace != pir_u->sat_e ) {
|
||||||
|
_pier_work_slog(wit_u, pri_w, u3k(r_jar));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -1667,7 +1513,7 @@ _pier_boot_ready(u3_pier* pir_u)
|
|||||||
_pier_boot_vent(pir_u->bot_u);
|
_pier_boot_vent(pir_u->bot_u);
|
||||||
_pier_boot_dispose(pir_u->bot_u);
|
_pier_boot_dispose(pir_u->bot_u);
|
||||||
|
|
||||||
// prepare serf for boot sequence, write log header
|
// prepare worker for boot sequence, write log header
|
||||||
//
|
//
|
||||||
_pier_work_boot(pir_u, c3y);
|
_pier_work_boot(pir_u, c3y);
|
||||||
|
|
||||||
@ -1690,13 +1536,13 @@ _pier_boot_ready(u3_pier* pir_u)
|
|||||||
|
|
||||||
// begin queuing batches of committed events
|
// begin queuing batches of committed events
|
||||||
//
|
//
|
||||||
_pier_disk_load_commit(pir_u, (1ULL + god_u->dun_d), 1000ULL);
|
_pier_db_load_commits(pir_u, (1ULL + god_u->dun_d), 1000ULL);
|
||||||
|
|
||||||
if ( 0 == god_u->dun_d ) {
|
if ( 0 == god_u->dun_d ) {
|
||||||
fprintf(stderr, "pier: replaying events 1 through %" PRIu64 "\r\n",
|
fprintf(stderr, "pier: replaying events 1 through %" PRIu64 "\r\n",
|
||||||
log_u->com_d);
|
log_u->com_d);
|
||||||
|
|
||||||
// prepare serf for replay of boot sequence, don't write log header
|
// prepare worker for replay of boot sequence, don't write log header
|
||||||
//
|
//
|
||||||
_pier_work_boot(pir_u, c3n);
|
_pier_work_boot(pir_u, c3n);
|
||||||
}
|
}
|
||||||
@ -1770,7 +1616,8 @@ start:
|
|||||||
(wit_u->evt_d == (1 + log_u->moc_d)) &&
|
(wit_u->evt_d == (1 + log_u->moc_d)) &&
|
||||||
(wit_u->evt_d == (1 + log_u->com_d)) )
|
(wit_u->evt_d == (1 + log_u->com_d)) )
|
||||||
{
|
{
|
||||||
_pier_disk_commit_request(wit_u);
|
// TODO(erg): This is the place where we build up things into a queue.
|
||||||
|
_pier_db_commit_request(wit_u);
|
||||||
act_o = c3y;
|
act_o = c3y;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1877,6 +1724,7 @@ _pier_exit_done(u3_pier* pir_u)
|
|||||||
{
|
{
|
||||||
u3l_log("pier: exit\r\n");
|
u3l_log("pier: exit\r\n");
|
||||||
|
|
||||||
|
_pier_db_shutdown(pir_u);
|
||||||
_pier_work_shutdown(pir_u);
|
_pier_work_shutdown(pir_u);
|
||||||
_pier_loop_exit(pir_u);
|
_pier_loop_exit(pir_u);
|
||||||
|
|
||||||
@ -1917,7 +1765,8 @@ u3_pier_snap(u3_pier* pir_u)
|
|||||||
|
|
||||||
// save eagerly if all computed events are already committed
|
// save eagerly if all computed events are already committed
|
||||||
//
|
//
|
||||||
if ( log_u->com_d >= top_d ) {
|
if ( (log_u->com_d >= top_d) &&
|
||||||
|
(god_u->dun_d == top_d) ) {
|
||||||
_pier_work_save(pir_u);
|
_pier_work_save(pir_u);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user