Merge pull request #6159 from urbit/jb/mars/play

vere: adds dedicated replay command
This commit is contained in:
Joe Bryan 2022-12-14 19:53:56 -05:00 committed by GitHub
commit 98aa6ee728
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1036 additions and 193 deletions

View File

@ -9,6 +9,8 @@
#include "rsignal.h"
#include <vere/serf.h>
#include "vere/vere.h"
#include "vere/mars.h"
#include "noun/events.h"
#if !defined(U3_OS_mingw)
#include <sigsegv.h>
#endif
@ -644,14 +646,17 @@ _cw_usage(c3_c* bin_c)
"utilities:\n",
" %s cram %.*s jam state:\n",
" %s dock %.*s copy binary:\n",
" %s eval %.*s eval hoon:\n",
" %s grab %.*s measure memory usage:\n",
" %s info %.*s print pier info:\n",
" %s meld %.*s deduplicate snapshot:\n",
" %s pack %.*s defragment snapshot:\n",
" %s play %.*s recompute events:\n",
" %s prep %.*s prepare for upgrade:\n",
" %s next %.*s request upgrade:\n",
" %s queu %.*s<at-event> cue state:\n",
" %s vere ARGS <output dir> download binary:\n",
" %s vile %.*s print keyfile:\n",
"\n run as a 'serf':\n",
" %s serf <pier> <key> <flags> <cache-size> <at-event>"
#ifdef U3_OS_mingw
@ -1619,9 +1624,7 @@ _cw_meld(c3_i argc, c3_c* argv[])
u3C.wag_w |= u3o_hashless;
u3m_boot(u3_Host.dir_c, (size_t)1 << u3_Host.ops_u.lom_y);
pre_w = u3a_open(u3R);
u3u_meld();
u3a_print_memory(stderr, "urbit: meld: gained", (u3a_open(u3R) - pre_w));
u3a_print_memory(stderr, "urbit: meld: gained", u3u_meld());
u3m_save();
u3_disk_exit(log_u);
@ -1768,6 +1771,138 @@ _cw_pack(c3_i argc, c3_c* argv[])
u3m_stop();
}
/* _cw_play_slog(): print during replay.
*/
static void
_cw_play_slog(u3_noun hod)
{
u3_pier_tank(0, 0, u3k(u3t(hod)));
u3z(hod);
}
/* _cw_play(): replay events, but better.
*/
static void
_cw_play(c3_i argc, c3_c* argv[])
{
c3_i ch_i, lid_i;
c3_w arg_w;
c3_o ful_o = c3n;
c3_o mel_o = c3n;
static struct option lop_u[] = {
{ "loom", required_argument, NULL, c3__loom },
{ "auto-meld", no_argument, NULL, 4 },
{ "no-demand", no_argument, NULL, 6 },
{ "full", required_argument, NULL, 'f' },
{ "replay-to", no_argument, NULL, 'n' },
{ NULL, 0, NULL, 0 }
};
u3_Host.dir_c = _main_pier_run(argv[0]);
while ( -1 != (ch_i=getopt_long(argc, argv, "fn:", lop_u, &lid_i)) ) {
switch ( ch_i ) {
case 4: { // auto-meld
mel_o = c3y;
} break;
case 6: { // no-demand
u3_Host.ops_u.map = c3n;
u3C.wag_w |= u3o_no_demand;
} break;
case c3__loom: {
c3_w lom_w;
c3_o res_o = _main_readw(optarg, u3a_bits + 3, &lom_w);
if ( (c3n == res_o) || (lom_w < 20) ) {
fprintf(stderr, "error: --loom must be >= 20 and <= %u\r\n", u3a_bits + 2);
exit(1);
}
u3_Host.ops_u.lom_y = lom_w;
} break;
case 'f': {
ful_o = c3y;
break;
}
case 'n': {
u3_Host.ops_u.til_c = strdup(optarg);
break;
}
case '?': {
fprintf(stderr, "invalid argument\r\n");
exit(1);
} break;
}
}
// argv[optind] is always "play"
//
if ( !u3_Host.dir_c ) {
if ( optind + 1 < argc ) {
u3_Host.dir_c = argv[optind + 1];
}
else {
fprintf(stderr, "invalid command, pier required\r\n");
exit(1);
}
optind++;
}
if ( optind + 1 != argc ) {
fprintf(stderr, "invalid command\r\n");
exit(1);
}
// XX handle SIGTSTP so that the lockfile is not orphaned?
//
u3_disk* log_u = _cw_disk_init(u3_Host.dir_c); // XX s/b try_aquire lock
if ( c3y == mel_o ) {
u3C.wag_w |= u3o_auto_meld;
}
u3C.wag_w |= u3o_hashless;
u3m_boot(u3_Host.dir_c, (size_t)1 << u3_Host.ops_u.lom_y);
u3C.slog_f = _cw_play_slog;
if ( c3y == ful_o ) {
u3l_log("mars: preparing for full replay\r\n");
u3e_yolo();
u3m_pave(c3y);
u3j_boot(c3y);
u3A->eve_d = 0;
}
{
u3_mars mar_u = {
.log_u = log_u,
.dir_c = u3_Host.dir_c,
.sen_d = u3A->eve_d,
.dun_d = u3A->eve_d,
.mug_l = u3r_mug(u3A->roc)
};
c3_d eve_d = 0;
c3_c* eve_c = u3_Host.ops_u.til_c;
if ( u3_Host.ops_u.til_c ) {
if ( 1 != sscanf(eve_c, "%" PRIu64 "", &eve_d) ) {
fprintf(stderr, "mars: replay-to invalid: '%s'\r\n", eve_c);
}
}
u3_mars_play(&mar_u, eve_d);
}
u3_disk_exit(log_u);
u3m_stop();
}
/* _cw_prep(): prepare for upgrade
*/
static void
@ -2055,11 +2190,13 @@ _cw_utils(c3_i argc, c3_c* argv[])
// $@ ~ :: usage
// $% [%cram dir=@t] :: jam state
// [%dock dir=@t] :: copy binary
// [%eval ~] :: eval hoon
// [?(%grab %mass) dir=@t] :: gc
// [%info dir=@t] :: print
// [%meld dir=@t] :: deduplicate
// [?(%next %upgrade) dir=@t] :: upgrade
// [%pack dir=@t] :: defragment
// [%play dir=@t] :: recompute
// [%prep dir=@t] :: prep upgrade
// [%queu dir=@t eve=@ud] :: cue state
// [?(%vere %fetch-vere) dir=@t] :: download vere
@ -2100,6 +2237,7 @@ _cw_utils(c3_i argc, c3_c* argv[])
case c3__meld: _cw_meld(argc, argv); return 1;
case c3__next: _cw_next(argc, argv); return 2; // continue on
case c3__pack: _cw_pack(argc, argv); return 1;
case c3__play: _cw_play(argc, argv); return 1;
case c3__prep: _cw_prep(argc, argv); return 2; // continue on
case c3__queu: _cw_queu(argc, argv); return 1;
case c3__vere: _cw_vere(argc, argv); return 1;

View File

@ -645,6 +645,11 @@
void
u3a_print_memory(FILE* fil_u, c3_c* cap_c, c3_w wor_w);
/* u3a_prof(): mark/measure/print memory profile. RETAIN.
*/
c3_w
u3a_prof(FILE* fil_u, c3_w den_w, u3_noun mas);
/* u3a_maid(): maybe print memory.
*/
c3_w

View File

@ -22,16 +22,17 @@
** _check flags are set inside u3 and heard outside it.
*/
enum u3o_flag { // execution flags
u3o_debug_ram = 1 << 0, // debug: gc
u3o_debug_cpu = 1 << 1, // debug: profile
u3o_check_corrupt = 1 << 2, // check: gc memory
u3o_check_fatal = 1 << 3, // check: unrecoverable
u3o_verbose = 1 << 4, // be remarkably wordy
u3o_dryrun = 1 << 5, // don't touch checkpoint
u3o_quiet = 1 << 6, // disable ~&
u3o_hashless = 1 << 7, // disable hashboard
u3o_trace = 1 << 8, // enables trace dumping
u3o_no_demand = 1 << 9 // disables demand paging
u3o_debug_ram = 1 << 0, // debug: gc
u3o_debug_cpu = 1 << 1, // debug: profile
u3o_check_corrupt = 1 << 2, // check: gc memory
u3o_check_fatal = 1 << 3, // check: unrecoverable
u3o_verbose = 1 << 4, // be remarkably wordy
u3o_dryrun = 1 << 5, // don't touch checkpoint
u3o_quiet = 1 << 6, // disable ~&
u3o_hashless = 1 << 7, // disable hashboard
u3o_trace = 1 << 8, // enables trace dumping
u3o_auto_meld = 1 << 9, // enables meld under pressure
u3o_no_demand = 1 << 10 // disables demand paging
};
/** Globals.

View File

@ -5,7 +5,7 @@
**/
/* u3u_meld(): globally deduplicate memory.
*/
void
c3_w
u3u_meld(void);
/* u3u_cram(): globably deduplicate memory, and write a rock to disk.

View File

@ -87,6 +87,11 @@
u3_noun
u3v_poke(u3_noun ovo);
/* u3v_poke_sure(): inject an event, saving new state if successful.
*/
c3_o
u3v_poke_sure(c3_w mil_w, u3_noun eve, u3_noun* pro);
/* u3v_tank(): dump single tank.
*/
void

View File

@ -6,6 +6,17 @@
/* lmdb api wrapper
*/
/* u3_lmdb_iter: event iterator
*/
typedef struct _u3_lmdb_walk {
MDB_txn* txn_u; // transaction handle
MDB_dbi mdb_u; // db handle
MDB_cursor* cur_u; // db cursor
c3_o red_o; // have we read from this yet?
c3_d nex_d; // next event number
c3_d las_d; // final event number, inclusive
} u3_lmdb_walk;
/* u3_lmdb_init(): open lmdb at [pax_c], mmap up to [siz_i].
*/
MDB_env*
@ -61,4 +72,22 @@
size_t val_i,
void* val_p);
/* u3_lmdb_walk_init(): initialize db iterator.
*/
c3_o
u3_lmdb_walk_init(MDB_env* env_u,
u3_lmdb_walk* itr_u,
c3_d nex_d,
c3_d las_d);
/* u3_lmdb_walk_next(): synchronously read next event from iterator.
*/
c3_o
u3_lmdb_walk_next(u3_lmdb_walk* itr_u, size_t* len_i, void** buf_v);
/* u3_lmdb_walk_done(): close iterator.
*/
void
u3_lmdb_walk_done(u3_lmdb_walk* itr_u);
#endif /* ifndef U3_VERE_DB_LMDB_H */

View File

@ -0,0 +1,22 @@
#ifndef U3_VERE_MARS_H
#define U3_VERE_MARS_H
/** Data types.
**/
/* u3_mars: the urbit state machine.
*/
typedef struct _u3_mars {
c3_d key_d[4]; // disk key
u3_disk* log_u; // event log
c3_c* dir_c; // execution directory (pier)
c3_d sen_d; // last event requested
c3_d dun_d; // last event processed
c3_l mug_l; // hash of state
} u3_mars;
/* u3_mars_play(): replay logged events up to [eve_d].
*/
void
u3_mars_play(u3_mars* mar_u, c3_d eve_d);
#endif /* ifndef U3_VERE_MARS_H */

View File

@ -563,6 +563,10 @@
u3_info put_u; // write queue
} u3_disk;
/* u3_disk_walk: opaque event log iterator.
*/
typedef struct _u3_disk_walk u3_disk_walk;
/* u3_psat: pier state.
*/
typedef enum {
@ -943,6 +947,23 @@
u3_disk*
u3_disk_init(c3_c* pax_c, u3_disk_cb cb_u);
/* u3_disk_etch(): serialize an event for persistence. RETAIN [eve]
*/
size_t
u3_disk_etch(u3_disk* log_u,
u3_noun eve,
c3_l mug_l,
c3_y** out_y);
/* u3_disk_sift(): parse a persisted event buffer.
*/
c3_o
u3_disk_sift(u3_disk* log_u,
size_t len_i,
c3_y* dat_y,
c3_l* mug_l,
u3_noun* job);
/* u3_disk_info(): status info as $mass.
*/
u3_noun
@ -994,6 +1015,33 @@
void
u3_disk_plan(u3_disk* log_u, u3_fact* tac_u);
/* u3_disk_read_list(): synchronously read a cons list of events.
*/
u3_weak
u3_disk_read_list(u3_disk* log_u, c3_d eve_d, c3_d len_d, c3_l* mug_l);
/* u3_disk_walk_init(): init iterator.
*/
u3_disk_walk*
u3_disk_walk_init(u3_disk* log_u,
c3_d eve_d,
c3_d len_d);
/* u3_disk_walk_live(): check if live.
*/
c3_o
u3_disk_walk_live(u3_disk_walk* wok_u);
/* u3_disk_walk_live(): get next fact.
*/
c3_o
u3_disk_walk_step(u3_disk_walk* wok_u, u3_fact* tac_u);
/* u3_disk_walk_done(): close iterator.
*/
void
u3_disk_walk_done(u3_disk_walk* wok_u);
/* u3_lord_init(): start serf.
*/
u3_lord*

View File

@ -1627,7 +1627,7 @@ u3a_mark_ptr(void* ptr_v)
if ( 0 == box_u->eus_w ) {
siz_w = box_u->siz_w;
}
else if ( 0xffffffff == box_u->eus_w ) { // see _raft_prof()
else if ( 0xffffffff == box_u->eus_w ) { // see u3a_prof()
siz_w = 0xffffffff;
box_u->eus_w = 0;
}
@ -1645,7 +1645,7 @@ u3a_mark_ptr(void* ptr_v)
else {
c3_assert(use_ws != 0);
if ( 0x80000000 == (c3_w)use_ws ) { // see _raft_prof()
if ( 0x80000000 == (c3_w)use_ws ) { // see u3a_prof()
use_ws = -1;
siz_w = 0xffffffff;
}
@ -1916,7 +1916,7 @@ u3a_print_memory(FILE* fil_u, c3_c* cap_c, c3_w wor_w)
if ( byt_w ) {
if ( gib_w ) {
fprintf(fil_u, "%s: GB/%d.%03d.%03d.%03d\r\n",
cap_c, gib_w, mib_w, kib_w, bib_w);
cap_c, gib_w, mib_w, kib_w, bib_w);
}
else if ( mib_w ) {
fprintf(fil_u, "%s: MB/%d.%03d.%03d\r\n", cap_c, mib_w, kib_w, bib_w);
@ -1941,6 +1941,122 @@ u3a_maid(FILE* fil_u, c3_c* cap_c, c3_w wor_w)
return wor_w;
}
/* _ca_print_memory(): un-captioned u3a_print_memory().
*/
static void
_ca_print_memory(FILE* fil_u, c3_w wor_w)
{
c3_w byt_w = (wor_w * 4);
c3_w gib_w = (byt_w / 1000000000);
c3_w mib_w = (byt_w % 1000000000) / 1000000;
c3_w kib_w = (byt_w % 1000000) / 1000;
c3_w bib_w = (byt_w % 1000);
if ( gib_w ) {
fprintf(fil_u, "GB/%d.%03d.%03d.%03d\r\n",
gib_w, mib_w, kib_w, bib_w);
}
else if ( mib_w ) {
fprintf(fil_u, "MB/%d.%03d.%03d\r\n", mib_w, kib_w, bib_w);
}
else if ( kib_w ) {
fprintf(fil_u, "KB/%d.%03d\r\n", kib_w, bib_w);
}
else {
fprintf(fil_u, "B/%d\r\n", bib_w);
}
}
/* u3a_prof(): mark/measure/print memory profile. RETAIN.
*/
c3_w
u3a_prof(FILE* fil_u, c3_w den_w, u3_noun mas)
{
c3_w tot_w = 0;
u3_noun h_mas, t_mas;
if ( c3n == u3r_cell(mas, &h_mas, &t_mas) ) {
fprintf(fil_u, "%.*smistyped mass\r\n", den_w, "");
return tot_w;
}
else if ( _(u3du(h_mas)) ) {
fprintf(fil_u, "%.*smistyped mass head\r\n", den_w, "");
{
c3_c* lab_c = u3m_pretty(h_mas);
fprintf(fil_u, "h_mas: %s", lab_c);
c3_free(lab_c);
}
return tot_w;
}
else {
{
c3_c* lab_c = u3m_pretty(h_mas);
fprintf(fil_u, "%*s%s: ", den_w, "", lab_c);
c3_free(lab_c);
}
u3_noun it_mas, tt_mas;
if ( c3n == u3r_cell(t_mas, &it_mas, &tt_mas) ) {
fprintf(fil_u, "%*smistyped mass tail\r\n", den_w, "");
return tot_w;
}
else if ( c3y == it_mas ) {
tot_w += u3a_mark_noun(tt_mas);
_ca_print_memory(fil_u, tot_w);
#if 1
/* The basic issue here is that tt_mas is included in .sac
* (the whole profile), so they can't both be roots in the
* normal sense. When we mark .sac later on, we want tt_mas
* to appear unmarked, but its children should be already
* marked.
*
* see u3a_mark_ptr().
*/
if ( _(u3a_is_dog(tt_mas)) ) {
u3a_box* box_u = u3a_botox(u3a_to_ptr(tt_mas));
#ifdef U3_MEMORY_DEBUG
if ( 1 == box_u->eus_w ) {
box_u->eus_w = 0xffffffff;
}
else {
box_u->eus_w -= 1;
}
#else
if ( -1 == (c3_w)box_u->use_w ) {
box_u->use_w = 0x80000000;
}
else {
box_u->use_w += 1;
}
#endif
}
#endif
return tot_w;
}
else if ( c3n == it_mas ) {
fprintf(fil_u, "\r\n");
while ( _(u3du(tt_mas)) ) {
tot_w += u3a_prof(fil_u, den_w+2, u3h(tt_mas));
tt_mas = u3t(tt_mas);
}
fprintf(fil_u, "%*s--", den_w, "");
_ca_print_memory(fil_u, tot_w);
return tot_w;
}
else {
fprintf(fil_u, "%*smistyped (strange) mass tail\r\n", den_w, "");
return tot_w;
}
}
}
/* u3a_mark_road(): mark ad-hoc persistent road structures.
*/
c3_w

View File

@ -432,6 +432,7 @@ _ce_patch_verify(u3_ce_patch* pat_u)
}
return c3n;
}
{
c3_w nug_w = u3r_mug_words(mem_w, pag_wiz_i);

View File

@ -1946,6 +1946,10 @@ void
u3m_stop()
{
u3je_secp_stop();
// XX move to jets.c
//
c3_free(u3D.ray_u);
}
/* u3m_boot(): start the u3 system. return next event, starting from 1.

View File

@ -278,11 +278,19 @@ void
u3t_trace_open(c3_c* dir_c)
{
c3_c fil_c[2048];
if ( !dir_c ) {
return;
}
snprintf(fil_c, 2048, "%s/.urb/put/trace", dir_c);
struct stat st;
if ( -1 == stat(fil_c, &st) ) {
c3_mkdir(fil_c, 0700);
if ( (-1 == stat(fil_c, &st))
&& (-1 == c3_mkdir(fil_c, 0700)) )
{
fprintf(stderr, "mkdir: %s failed: %s\r\n", fil_c, strerror(errno));
return;
}
c3_c lif_c[2056];
@ -291,6 +299,11 @@ u3t_trace_open(c3_c* dir_c)
u3_Host.tra_u.fil_u = c3_fopen(lif_c, "w");
u3_Host.tra_u.nid_w = (int)getpid();
if ( !u3_Host.tra_u.fil_u) {
fprintf(stderr, "trace open: %s\r\n", strerror(errno));
return;
}
fprintf(u3_Host.tra_u.fil_u, "[ ");
// We have two "threads", the event processing and the nock stuff.

View File

@ -426,15 +426,17 @@ _cu_realloc(FILE* fil_u, ur_root_t** tor_u, ur_nvec_t* doc_u)
/* u3u_meld(): globally deduplicate memory.
*/
#ifdef U3_MEMORY_DEBUG
void
c3_w
u3u_meld(void)
{
fprintf(stderr, "u3: unable to meld under U3_MEMORY_DEBUG\r\n");
return 0;
}
#else
void
c3_w
u3u_meld(void)
{
c3_w pre_w = u3a_open(u3R);
ur_root_t* rot_u;
ur_nvec_t cod_u;
@ -446,6 +448,8 @@ u3u_meld(void)
//
ur_nvec_free(&cod_u);
ur_root_free(rot_u);
return (u3a_open(u3R) - pre_w);
}
#endif

View File

@ -27,9 +27,18 @@ u3v_life(u3_noun eve)
c3_o
u3v_boot(u3_noun eve)
{
c3_d len_d;
{
u3_noun len = u3qb_lent(eve);
c3_assert( c3y == u3r_safe_chub(len, &len_d) );
u3z(len);
}
// ensure zero-initialized kernel
//
u3A->roc = 0;
u3A->roc = 0;
u3A->eve_d = 0;
{
u3_noun pro = u3m_soft(0, u3v_life, eve);
@ -39,7 +48,8 @@ u3v_boot(u3_noun eve)
return c3n;
}
u3A->roc = u3k(u3t(pro));
u3A->roc = u3k(u3t(pro));
u3A->eve_d = len_d;
u3z(pro);
}
@ -260,6 +270,61 @@ u3v_poke(u3_noun ovo)
return pro;
}
/* _cv_poke_eve(): u3v_poke w/out u3A->now XX replace
*/
static u3_noun
_cv_poke_eve(u3_noun sam)
{
u3_noun fun = u3n_nock_on(u3k(u3A->roc), u3k(u3x_at(_CVX_POKE, u3A->roc)));
u3_noun pro;
{
# ifdef U3_MEMORY_DEBUG
c3_w cod_w = u3a_lush(u3h(u3t(u3t(sam))));
# endif
pro = u3n_slam_on(fun, sam);
# ifdef U3_MEMORY_DEBUG
u3a_lop(cod_w);
# endif
}
return pro;
}
/* u3v_poke_sure(): inject an event, saving new state if successful.
*/
c3_o
u3v_poke_sure(c3_w mil_w, u3_noun eve, u3_noun* pro)
{
u3_noun gon = u3m_soft(mil_w, _cv_poke_eve, eve);
u3_noun tag, dat;
u3x_cell(gon, &tag, &dat);
// event failed, produce trace
//
if ( u3_blip != tag ) {
*pro = gon;
return c3n;
}
// event succeeded, persist state and produce effects
//
{
u3_noun vir, cor;
u3x_cell(dat, &vir, &cor);
u3z(u3A->roc);
u3A->roc = u3k(cor);
u3A->eve_d++;
*pro = u3k(vir);
u3z(gon);
return c3y;
}
}
/* u3v_tank(): dump single tank.
*/
void

View File

@ -225,6 +225,112 @@ u3_lmdb_gulf(MDB_env* env_u, c3_d* low_d, c3_d* hig_d)
}
}
/* u3_lmdb_walk_init(): initialize db iterator.
*/
c3_o
u3_lmdb_walk_init(MDB_env* env_u,
u3_lmdb_walk* itr_u,
c3_d nex_d,
c3_d las_d)
{
// XX assumes little-endian
//
MDB_val key_u = { .mv_size = sizeof(c3_d), .mv_data = &nex_d };
MDB_val val_u;
c3_w ops_w, ret_w;
itr_u->red_o = c3n;
itr_u->nex_d = nex_d;
itr_u->las_d = las_d;
// create a read-only transaction.
//
ops_w = MDB_RDONLY;
if ( (ret_w = mdb_txn_begin(env_u, 0, ops_w, &itr_u->txn_u)) ) {
mdb_logerror(stderr, ret_w, "lmdb: read txn_begin fail");
return c3n;
}
// open the database in the transaction
//
ops_w = MDB_CREATE | MDB_INTEGERKEY;
if ( (ret_w = mdb_dbi_open(itr_u->txn_u, "EVENTS", ops_w, &itr_u->mdb_u)) ) {
mdb_logerror(stderr, ret_w, "lmdb: read: dbi_open fail");
// XX confirm
//
mdb_txn_abort(itr_u->txn_u);
return c3n;
}
// creates a cursor to iterate over keys starting at [eve_d]
//
if ( (ret_w = mdb_cursor_open(itr_u->txn_u, itr_u->mdb_u, &itr_u->cur_u)) ) {
mdb_logerror(stderr, ret_w, "lmdb: read: cursor_open fail");
// XX confirm
//
mdb_txn_abort(itr_u->txn_u);
return c3n;
}
// set the cursor to the position of [eve_d]
//
ops_w = MDB_SET_KEY;
if ( (ret_w = mdb_cursor_get(itr_u->cur_u, &key_u, &val_u, ops_w)) ) {
mdb_logerror(stderr, ret_w, "lmdb: read: initial cursor_get failed");
fprintf(stderr, " at %" PRIu64 "\r\n", nex_d);
mdb_cursor_close(itr_u->cur_u);
// XX confirm
//
mdb_txn_abort(itr_u->txn_u);
return c3n;
}
return c3y;
}
/* u3_lmdb_walk_next(): synchronously read next event from iterator.
*/
c3_o
u3_lmdb_walk_next(u3_lmdb_walk* itr_u, size_t* len_i, void** buf_v)
{
MDB_val key_u, val_u;
c3_w ret_w, ops_w;
c3_assert( itr_u->nex_d <= itr_u->las_d );
ops_w = ( c3y == itr_u->red_o ) ? MDB_NEXT : MDB_GET_CURRENT;
if ( (ret_w = mdb_cursor_get(itr_u->cur_u, &key_u, &val_u, ops_w)) ) {
mdb_logerror(stderr, ret_w, "lmdb: walk error");
return c3n;
}
// sanity check: ensure contiguous event numbers
//
if ( *(c3_d*)key_u.mv_data != itr_u->nex_d ) {
fprintf(stderr, "lmdb: read gap: expected %" PRIu64
", received %" PRIu64 "\r\n",
itr_u->nex_d,
*(c3_d*)key_u.mv_data);
return c3n;
}
*len_i = val_u.mv_size;
*buf_v = val_u.mv_data;
itr_u->nex_d++;
itr_u->red_o = c3y;
return c3y;
}
/* u3_lmdb_walk_done(): close iterator.
*/
void
u3_lmdb_walk_done(u3_lmdb_walk* itr_u)
{
mdb_cursor_close(itr_u->cur_u);
mdb_txn_abort(itr_u->txn_u);
}
/* u3_lmdb_read(): read [len_d] events starting at [eve_d].
*/
c3_o

View File

@ -22,6 +22,12 @@ struct _cd_save {
struct _u3_disk* log_u;
};
struct _u3_disk_walk {
u3_lmdb_walk itr_u;
u3_disk* log_u;
c3_o liv_o;
};
#undef VERBOSE_DISK
#undef DISK_TRACE_JAM
#undef DISK_TRACE_CUE
@ -149,34 +155,47 @@ _disk_commit_start(struct _cd_save* req_u)
_disk_commit_after_cb);
}
/* _disk_serialize_v1(): serialize events in format v1.
/* u3_disk_etch(): serialize an event for persistence. RETAIN [eve]
*/
static c3_w
_disk_serialize_v1(u3_fact* tac_u, c3_y** out_y)
size_t
u3_disk_etch(u3_disk* log_u,
u3_noun eve,
c3_l mug_l,
c3_y** out_y)
{
size_t len_i;
c3_y* dat_y;
#ifdef DISK_TRACE_JAM
u3t_event_trace("king disk jam", 'B');
u3t_event_trace("disk etch", 'B');
#endif
// XX check version number in log_u
// XX needs api redesign to limit allocations
//
{
u3_atom mat = u3qe_jam(tac_u->job);
u3_atom mat = u3qe_jam(eve);
c3_w len_w = u3r_met(3, mat);
c3_y* dat_y = c3_malloc(4 + len_w);
dat_y[0] = tac_u->mug_l & 0xff;
dat_y[1] = (tac_u->mug_l >> 8) & 0xff;
dat_y[2] = (tac_u->mug_l >> 16) & 0xff;
dat_y[3] = (tac_u->mug_l >> 24) & 0xff;
len_i = 4 + len_w;
dat_y = c3_malloc(len_i);
dat_y[0] = mug_l & 0xff;
dat_y[1] = (mug_l >> 8) & 0xff;
dat_y[2] = (mug_l >> 16) & 0xff;
dat_y[3] = (mug_l >> 24) & 0xff;
u3r_bytes(0, len_w, dat_y + 4, mat);
u3z(mat);
}
#ifdef DISK_TRACE_JAM
u3t_event_trace("king disk jam", 'E');
u3t_event_trace("disk etch", 'E');
#endif
u3z(mat);
*out_y = dat_y;
return len_w + 4;
}
*out_y = dat_y;
return len_i;
}
/* _disk_batch(): create a write batch
@ -200,7 +219,8 @@ _disk_batch(u3_disk* log_u, c3_d len_d)
for ( c3_d i_d = 0ULL; i_d < len_d; ++i_d) {
c3_assert( (req_u->eve_d + i_d) == tac_u->eve_d );
req_u->siz_i[i_d] = _disk_serialize_v1(tac_u, &req_u->byt_y[i_d]);
req_u->siz_i[i_d] = u3_disk_etch(log_u, tac_u->job,
tac_u->mug_l, &req_u->byt_y[i_d]);
tac_u = tac_u->nex_u;
}
@ -358,6 +378,41 @@ _disk_read_done_cb(uv_timer_t* tim_u)
_disk_read_close(red_u);
}
/* u3_disk_sift(): parse a persisted event buffer.
*/
c3_o
u3_disk_sift(u3_disk* log_u,
size_t len_i,
c3_y* dat_y,
c3_l* mug_l,
u3_noun* job)
{
if ( 4 >= len_i ) {
return c3n;
}
#ifdef DISK_TRACE_CUE
u3t_event_trace("disk sift", 'B');
#endif
// XX check version in log_u
//
*mug_l = dat_y[0]
^ (dat_y[1] << 8)
^ (dat_y[2] << 16)
^ (dat_y[3] << 24);
// XX u3m_soft?
//
*job = u3ke_cue(u3i_bytes(len_i - 4, dat_y + 4));
#ifdef DISK_TRACE_CUE
u3t_event_trace("disk sift", 'E');
#endif
return c3y;
}
/* _disk_read_one_cb(): lmdb read callback, invoked for each event in order
*/
static c3_o
@ -367,29 +422,13 @@ _disk_read_one_cb(void* ptr_v, c3_d eve_d, size_t val_i, void* val_p)
u3_disk* log_u = red_u->log_u;
u3_fact* tac_u;
if ( 4 >= val_i ) {
return c3n;
}
{
u3_noun job;
c3_y* dat_y = val_p;
c3_l mug_l = dat_y[0]
^ (dat_y[1] << 8)
^ (dat_y[2] << 16)
^ (dat_y[3] << 24);
c3_l mug_l;
#ifdef DISK_TRACE_CUE
u3t_event_trace("king disk cue", 'B');
#endif
// XX u3m_soft?
//
job = u3ke_cue(u3i_bytes(val_i - 4, dat_y + 4));
#ifdef DISK_TRACE_CUE
u3t_event_trace("king disk cue", 'E');
#endif
if ( c3n == u3_disk_sift(log_u, val_i, (c3_y*)val_p, &mug_l, &job) ) {
return c3n;
}
tac_u = u3_fact_init(eve_d, mug_l, job);
}
@ -461,6 +500,120 @@ u3_disk_read(u3_disk* log_u, c3_d eve_d, c3_d len_d)
uv_timer_start(&red_u->tim_u, _disk_read_start_cb, 0, 0);
}
struct _cd_list {
u3_disk* log_u;
u3_noun eve;
c3_l mug_l;
};
/* _disk_read_list_cb(): lmdb read callback, invoked for each event in order
*/
static c3_o
_disk_read_list_cb(void* ptr_v, c3_d eve_d, size_t val_i, void* val_p)
{
struct _cd_list* ven_u = ptr_v;
u3_disk* log_u = ven_u->log_u;
{
u3_noun job;
c3_l mug_l;
if ( c3n == u3_disk_sift(log_u, val_i, (c3_y*)val_p, &mug_l, &job) ) {
return c3n;
}
ven_u->mug_l = mug_l;
ven_u->eve = u3nc(job, ven_u->eve);
}
return c3y;
}
/* u3_disk_read_list(): synchronously read a cons list of events.
*/
u3_weak
u3_disk_read_list(u3_disk* log_u, c3_d eve_d, c3_d len_d, c3_l* mug_l)
{
struct _cd_list ven_u = { log_u, u3_nul, 0 };
if ( c3n == u3_lmdb_read(log_u->mdb_u, &ven_u,
eve_d, len_d, _disk_read_list_cb) )
{
return u3_none;
}
*mug_l = ven_u.mug_l;
return u3kb_flop(ven_u.eve);
}
/* u3_disk_walk_init(): init iterator.
*/
u3_disk_walk*
u3_disk_walk_init(u3_disk* log_u,
c3_d eve_d,
c3_d len_d)
{
u3_disk_walk* wok_u = c3_malloc(sizeof(*wok_u));
c3_d max_d = eve_d + len_d - 1;
wok_u->log_u = log_u;
wok_u->liv_o = u3_lmdb_walk_init(log_u->mdb_u,
&wok_u->itr_u,
eve_d,
c3_min(max_d, log_u->dun_d));
return wok_u;
}
/* u3_disk_walk_live(): check if live.
*/
c3_o
u3_disk_walk_live(u3_disk_walk* wok_u)
{
if ( wok_u->itr_u.nex_d > wok_u->itr_u.las_d ) {
wok_u->liv_o = c3n;
}
return wok_u->liv_o;
}
/* u3_disk_walk_step(): get next fact.
*/
c3_o
u3_disk_walk_step(u3_disk_walk* wok_u, u3_fact* tac_u)
{
u3_disk* log_u = wok_u->log_u;
size_t len_i;
void* buf_v;
tac_u->eve_d = wok_u->itr_u.nex_d;
if ( c3n == u3_lmdb_walk_next(&wok_u->itr_u, &len_i, &buf_v) ) {
fprintf(stderr, "disk: (%" PRIu64 "): read fail\r\n", tac_u->eve_d);
return wok_u->liv_o = c3n;
}
if ( c3n == u3_disk_sift(log_u, len_i,
(c3_y*)buf_v,
&tac_u->mug_l,
&tac_u->job) )
{
fprintf(stderr, "disk: (%" PRIu64 "): sift fail\r\n", tac_u->eve_d);
return wok_u->liv_o = c3n;
}
return c3y;
}
/* u3_disk_walk_done(): close iterator.
*/
void
u3_disk_walk_done(u3_disk_walk* wok_u)
{
u3_lmdb_walk_done(&wok_u->itr_u);
c3_free(wok_u);
}
/* _disk_save_meta(): serialize atom, save as metadata at [key_c].
*/
static c3_o

264
pkg/urbit/worker/mars.c Normal file
View File

@ -0,0 +1,264 @@
/* worker/mars.c
**
** the main loop of a mars process.
*/
#include "all.h"
#include <vere/vere.h>
#include <vere/mars.h>
/* _mars_step_trace(): initialize or rotate trace file.
*/
static void
_mars_step_trace(const c3_c* dir_c)
{
if ( u3C.wag_w & u3o_trace ) {
if ( u3_Host.tra_u.con_w == 0 && u3_Host.tra_u.fun_w == 0 ) {
u3t_trace_open(dir_c);
}
else if ( u3_Host.tra_u.con_w >= 100000 ) {
u3t_trace_close();
u3t_trace_open(dir_c);
}
}
}
/* _mars_poke_play(): replay an event.
*/
static u3_weak
_mars_poke_play(u3_mars* mar_u, c3_d eve_d, u3_noun job)
{
u3_noun vir;
if ( c3n == u3v_poke_sure(0, job, &vir) ) {
return vir;
}
u3z(vir);
return u3_none;
}
typedef enum {
_play_yes_e, // success
_play_mem_e, // %meme
_play_int_e, // %intr
_play_log_e, // event log fail
_play_mug_e, // mug mismatch
_play_bad_e // total failure
} _mars_play_e;
/* _mars_play_batch(): replay a batch of events.
*/
static _mars_play_e
_mars_play_batch(u3_mars* mar_u, c3_o mug_o, c3_w bat_w)
{
u3_disk* log_u = mar_u->log_u;
u3_disk_walk* wok_u = u3_disk_walk_init(log_u, mar_u->dun_d + 1, bat_w);
u3_fact tac_u;
u3_noun dud;
while ( c3y == u3_disk_walk_live(wok_u) ) {
if ( c3n == u3_disk_walk_step(wok_u, &tac_u) ) {
u3_disk_walk_done(wok_u);
return _play_log_e;
}
c3_assert( ++mar_u->sen_d == tac_u.eve_d );
if ( u3_none != (dud = _mars_poke_play(mar_u, tac_u.eve_d, tac_u.job)) ) {
c3_m mot_m;
mar_u->sen_d = mar_u->dun_d;
u3_disk_walk_done(wok_u);
c3_assert( c3y == u3r_safe_word(u3h(dud), &mot_m) );
switch ( mot_m ) {
case c3__meme: {
fprintf(stderr, "play (%" PRIu64 "): %%meme\r\n", tac_u.eve_d);
u3z(dud);
return _play_mem_e;
}
case c3__intr: {
fprintf(stderr, "play (%" PRIu64 "): %%intr\r\n", tac_u.eve_d);
u3z(dud);
return _play_int_e;
}
default: {
fprintf(stderr, "play (%" PRIu64 "): failed\r\n", tac_u.eve_d);
u3_pier_punt_goof("play", dud);
// XX say something uplifting
//
return _play_bad_e;
}
}
}
mar_u->mug_l = u3r_mug(u3A->roc);
if ( tac_u.mug_l && (mar_u->mug_l != tac_u.mug_l) ) {
fprintf(stderr, "play (%" PRIu64 "): mug mismatch "
"expected %08x, actual %08x\r\n",
tac_u.eve_d, tac_u.mug_l, mar_u->mug_l);
if ( c3y == mug_o ) {
mar_u->sen_d = mar_u->dun_d;
u3_disk_walk_done(wok_u);
return _play_mug_e;
}
}
mar_u->dun_d = mar_u->sen_d;
}
u3_disk_walk_done(wok_u);
return _play_yes_e;
}
static c3_o
_mars_do_boot(u3_disk* log_u, c3_d eve_d)
{
u3_weak eve;
c3_l mug_l;
if ( u3_none == (eve = u3_disk_read_list(log_u, 1, eve_d, &mug_l)) ) {
fprintf(stderr, "boot: read failed\r\n");
return c3n;
}
u3l_log("--------------- bootstrap starting ----------------\r\n");
u3l_log("boot: 1-%u\r\n", u3qb_lent(eve));
if ( c3n == u3v_boot(eve) ) {
return c3n;
}
u3l_log("--------------- bootstrap complete ----------------\r\n");
return c3y;
}
/* u3_mars_play(): replay logged events up to [eve_d].
*/
void
u3_mars_play(u3_mars* mar_u, c3_d eve_d)
{
u3_disk* log_u = mar_u->log_u;
if ( !eve_d ) {
eve_d = log_u->dun_d;
}
else if ( eve_d <= mar_u->dun_d ) {
u3l_log("mars: already computed %" PRIu64 "\r\n", eve_d);
u3l_log(" state=%" PRIu64 ", log=%" PRIu64 "\r\n",
mar_u->dun_d, log_u->dun_d);
return;
}
else {
eve_d = c3_min(eve_d, log_u->dun_d);
}
if ( !mar_u->dun_d ) {
c3_w lif_w;
if ( c3n == u3_disk_read_meta(log_u, 0, 0, &lif_w) ) {
fprintf(stderr, "mars: disk read meta fail\r\n");
// XX exit code, cb
//
exit(1);
}
if ( c3n == _mars_do_boot(mar_u->log_u, lif_w) ) {
fprintf(stderr, "mars: boot fail\r\n");
// XX exit code, cb
//
exit(1);;
}
mar_u->sen_d = mar_u->dun_d = lif_w;
}
if ( mar_u->dun_d == log_u->dun_d ) {
u3l_log("mars: nothing to do!\r\n");
return;
}
u3l_log("---------------- playback starting ----------------\r\n");
if ( (1ULL + eve_d) == log_u->dun_d ) {
u3l_log("play: event %" PRIu64 "\r\n", log_u->dun_d);
}
else if ( eve_d != log_u->dun_d ) {
u3l_log("play: events %" PRIu64 "-%" PRIu64 " of %" PRIu64 "\r\n",
(c3_d)(1ULL + mar_u->dun_d),
eve_d,
log_u->dun_d);
}
else {
u3l_log("play: events %" PRIu64 "-%" PRIu64 "\r\n",
(c3_d)(1ULL + mar_u->dun_d),
eve_d);
}
{
c3_d fir_d = mar_u->dun_d; // started at
c3_d mem_d = 0; // last event to meme
c3_w try_w = 0; // [mem_d] retry count
while ( mar_u->dun_d < eve_d ) {
_mars_step_trace(mar_u->dir_c);
// XX get batch from args
//
switch ( _mars_play_batch(mar_u, c3y, 1024) ) {
case _play_yes_e: {
u3l_log("play (%" PRIu64 "): done\r\n", mar_u->dun_d);
u3m_reclaim();
// XX save a snapshot every N events?
//
} break;
case _play_mem_e: {
if ( (mem_d == mar_u->dun_d) && (3 == ++try_w) ) {
fprintf(stderr, "play (%" PRIu64 "): failed\r\n", mar_u->dun_d + 1);
u3m_save();
// XX check loom size, suggest --loom X
// XX exit code, cb
//
exit(1);
}
mem_d = mar_u->dun_d;
// XX pack before meld?
//
if ( u3C.wag_w & u3o_auto_meld ) {
u3a_print_memory(stderr, "mars: meld: gained", u3u_meld());
}
else {
u3a_print_memory(stderr, "mars: pack: gained", u3m_pack());
}
} break;
// XX handle any specifically?
//
case _play_int_e:
case _play_log_e:
case _play_mug_e:
case _play_bad_e: {
fprintf(stderr, "play (%" PRIu64 "): failed\r\n", mar_u->dun_d + 1);
u3m_save();
// XX exit code, cb
//
exit(1);
}
}
}
}
u3l_log("---------------- playback complete ----------------\r\n");
u3m_save();
}

View File

@ -49,137 +49,6 @@
--
*/
/* _serf_space(): print n spaces.
*/
static void
_serf_space(FILE* fil_u, c3_w n)
{
for (; n > 0; n--)
(fprintf(fil_u," "));
}
/* _serf_print_memory(): print memory amount.
**
** Helper for _serf_prof(), just an un-captioned u3a_print_memory().
*/
static void
_serf_print_memory(FILE* fil_u, c3_w wor_w)
{
c3_w byt_w = (wor_w * 4);
c3_w gib_w = (byt_w / 1000000000);
c3_w mib_w = (byt_w % 1000000000) / 1000000;
c3_w kib_w = (byt_w % 1000000) / 1000;
c3_w bib_w = (byt_w % 1000);
if ( gib_w ) {
(fprintf(fil_u, "GB/%d.%03d.%03d.%03d\r\n",
gib_w, mib_w, kib_w, bib_w));
}
else if ( mib_w ) {
(fprintf(fil_u, "MB/%d.%03d.%03d\r\n", mib_w, kib_w, bib_w));
}
else if ( kib_w ) {
(fprintf(fil_u, "KB/%d.%03d\r\n", kib_w, bib_w));
}
else {
(fprintf(fil_u, "B/%d\r\n", bib_w));
}
}
/* _serf_prof(): print memory profile. RETAIN.
*/
c3_w
_serf_prof(FILE* fil_u, c3_w den, u3_noun mas)
{
c3_w tot_w = 0;
u3_noun h_mas, t_mas;
if ( c3n == u3r_cell(mas, &h_mas, &t_mas) ) {
_serf_space(fil_u, den);
fprintf(fil_u, "mistyped mass\r\n");
return tot_w;
}
else if ( _(u3du(h_mas)) ) {
_serf_space(fil_u, den);
fprintf(fil_u, "mistyped mass head\r\n");
{
c3_c* lab_c = u3m_pretty(h_mas);
fprintf(fil_u, "h_mas: %s", lab_c);
c3_free(lab_c);
}
return tot_w;
}
else {
_serf_space(fil_u, den);
{
c3_c* lab_c = u3m_pretty(h_mas);
fprintf(fil_u, "%s: ", lab_c);
c3_free(lab_c);
}
u3_noun it_mas, tt_mas;
if ( c3n == u3r_cell(t_mas, &it_mas, &tt_mas) ) {
fprintf(fil_u, "mistyped mass tail\r\n");
return tot_w;
}
else if ( c3y == it_mas ) {
tot_w += u3a_mark_noun(tt_mas);
_serf_print_memory(fil_u, tot_w);
#if 1
/* The basic issue here is that tt_mas is included in .sac
* (the whole profile), so they can't both be roots in the
* normal sense. When we mark .sac later on, we want tt_mas
* to appear unmarked, but its children should be already
* marked.
*/
if ( _(u3a_is_dog(tt_mas)) ) {
u3a_box* box_u = u3a_botox(u3a_to_ptr(tt_mas));
#ifdef U3_MEMORY_DEBUG
if ( 1 == box_u->eus_w ) {
box_u->eus_w = 0xffffffff;
}
else {
box_u->eus_w -= 1;
}
#else
if ( -1 == (c3_w)box_u->use_w ) {
box_u->use_w = 0x80000000;
}
else {
box_u->use_w += 1;
}
#endif
}
#endif
return tot_w;
}
else if ( c3n == it_mas ) {
fprintf(fil_u, "\r\n");
while ( _(u3du(tt_mas)) ) {
tot_w += _serf_prof(fil_u, den+2, u3h(tt_mas));
tt_mas = u3t(tt_mas);
}
_serf_space(fil_u, den);
fprintf(fil_u, "--");
_serf_print_memory(fil_u, tot_w);
return tot_w;
}
else {
_serf_space(fil_u, den);
fprintf(fil_u, "mistyped (strange) mass tail\r\n");
return tot_w;
}
}
}
/* _serf_grab(): garbage collect, checking for profiling. RETAIN.
*/
static void
@ -225,7 +94,7 @@ _serf_grab(u3_noun sac)
c3_assert( u3R == &(u3H->rod_u) );
fprintf(fil_u, "\r\n");
tot_w += u3a_maid(fil_u, "total userspace", _serf_prof(fil_u, 0, sac));
tot_w += u3a_maid(fil_u, "total userspace", u3a_prof(fil_u, 0, sac));
tot_w += u3m_mark(fil_u);
tot_w += u3a_maid(fil_u, "space profile", u3a_mark_noun(sac));
@ -981,7 +850,7 @@ u3_serf_live(u3_serf* sef_u, u3_noun com, u3_noun* ret)
}
else {
u3z(com);
u3u_meld();
u3a_print_memory(stderr, "serf: meld: gained", u3u_meld());
*ret = u3nc(c3__live, u3_nul);
return c3y;
}