updates the worker to track in-progress vs completed event numbers

This commit is contained in:
Joe Bryan 2019-04-26 11:21:22 -07:00
parent 45fff7f8da
commit b6b84b5369
2 changed files with 36 additions and 38 deletions

View File

@ -987,16 +987,14 @@ _pier_work_poke(void* vod_p,
goto error;
}
else {
// XXX: The wit_u pointer will almost always be 0 because of how the
// worker process manages the difference between u3V.evt_d vs
// u3A->ent_d. Either stop communicating the evt_d in the wire protocol
// or fix the worker to keep track of and communicate the correct event
// number.
c3_d evt_d = u3r_chub(0, p_jar);
c3_w pri_w = u3r_word(0, q_jar);
u3_writ* wit_u = _pier_writ_find(pir_u, evt_d);
// Only print this slog if the event is uncommitted.
// skip slog during replay
//
// XX also update the worker to skip *sending* the slog during replay
//
if ( u3_psat_pace != pir_u->sat_e ) {
_pier_work_slog(wit_u, pri_w, u3k(r_jar));
}

View File

@ -26,7 +26,8 @@
typedef struct _u3_worker {
c3_w len_w; // boot sequence length
c3_d evt_d; // last event processed
c3_d sen_d; // last event requested
c3_d dun_d; // last event processed
c3_l mug_l; // hash of state
c3_d key_d[4]; // disk key
u3_moat inn_u; // message input
@ -362,7 +363,7 @@ static void
_worker_send_complete(u3_noun vir)
{
_worker_send(u3nq(c3__done,
u3i_chubs(1, &u3V.evt_d),
u3i_chubs(1, &u3V.dun_d),
u3V.mug_l,
vir));
}
@ -372,7 +373,7 @@ _worker_send_complete(u3_noun vir)
static void
_worker_send_stdr(c3_c* str_c)
{
_worker_send(u3nt(c3__stdr, u3i_chubs(1, &u3V.evt_d), u3i_string(str_c)));
_worker_send(u3nt(c3__stdr, u3i_chubs(1, &u3V.sen_d), u3i_string(str_c)));
}
/* _worker_send_slog(): send hint output (hod is [priority tank]).
@ -380,7 +381,7 @@ _worker_send_stdr(c3_c* str_c)
static void
_worker_send_slog(u3_noun hod)
{
_worker_send(u3nt(c3__slog, u3i_chubs(1, &u3V.evt_d), hod));
_worker_send(u3nt(c3__slog, u3i_chubs(1, &u3V.sen_d), hod));
}
/* _worker_lame(): event failed, replace with error event.
@ -446,8 +447,9 @@ static void
_worker_sure(u3_noun ovo, u3_noun vir, u3_noun cor)
{
u3z(u3A->roc);
u3A->roc = cor;
u3V.mug_l = u3r_mug(u3A->roc);
u3A->roc = cor;
u3A->ent_d = u3V.dun_d;
u3V.mug_l = u3r_mug(u3A->roc);
u3_noun sac = u3_nul;
@ -506,7 +508,8 @@ _worker_work_live(c3_d evt_d, // event number
{
u3_noun now, ovo, gon;
c3_assert(evt_d == u3V.evt_d + 1ULL);
c3_assert(evt_d == u3V.dun_d + 1ULL);
if ( 0 != mug_l ) {
c3_assert(u3V.mug_l == mug_l);
}
@ -516,9 +519,7 @@ _worker_work_live(c3_d evt_d, // event number
u3z(u3A->now);
u3A->now = u3k(now);
// XX why is this set before u3v_poke?
//
u3A->ent_d = evt_d;
u3V.sen_d = evt_d;
#ifdef U3_EVENT_TIME_DEBUG
{
@ -557,6 +558,8 @@ _worker_work_live(c3_d evt_d, // event number
if ( u3_blip != u3h(gon) ) {
// event rejected
//
u3V.sen_d = u3V.dun_d;
u3_noun why, tan;
u3x_cell(gon, &why, &tan);
@ -568,9 +571,8 @@ _worker_work_live(c3_d evt_d, // event number
else {
// event accepted
//
// XX reconcile/dedupe with u3A->ent_d
//
u3V.evt_d = evt_d;
u3V.dun_d = u3V.sen_d;
// vir/(list ovum) list of effects
// cor/arvo arvo core
//
@ -584,7 +586,7 @@ _worker_work_live(c3_d evt_d, // event number
// reclaim memory from persistent caches on |reset
//
if ( 0 == (u3A->ent_d % 1000ULL) ) {
if ( 0 == (evt_d % 1000ULL) ) {
u3m_reclaim();
}
}
@ -613,12 +615,16 @@ _worker_work_boot(c3_d evt_d,
c3_l mug_l,
u3_noun job)
{
c3_assert(evt_d == u3V.evt_d + 1ULL);
// here we asset on u3V.sen_d, because u3V.dun_d isn't set until
// after u3V.sen_d == u3V.len_w (ie, after the lifecycle evaluation)
//
c3_assert(evt_d == u3V.sen_d + 1ULL);
if ( 0 != mug_l ) {
c3_assert(u3V.mug_l == mug_l);
}
u3V.evt_d = evt_d;
u3V.sen_d = evt_d;
u3A->roe = u3nc(job, u3A->roe);
@ -639,13 +645,13 @@ _worker_work_boot(c3_d evt_d,
exit(1);
}
u3A->roc = u3k(u3t(pru));
u3V.mug_l = u3r_mug(u3A->roc);
u3V.dun_d = evt_d;
u3A->ent_d = u3V.dun_d;
u3A->roc = u3k(u3t(pru));
u3V.mug_l = u3r_mug(u3A->roc);
u3l_log("work: (%" PRIu64 ")| core: %x\r\n", evt_d, u3V.mug_l);
// XX set u3A->evt_d ?
//
u3z(pru);
}
else {
@ -820,7 +826,7 @@ _worker_poke(void* vod_p, u3_noun mat)
evt_d = u3r_chub(0, evt);
u3z(jar);
c3_assert( evt_d == u3V.evt_d );
c3_assert( evt_d == u3V.dun_d );
return u3e_save();
}
@ -843,7 +849,7 @@ u3_worker_boot(void)
if ( u3_none != u3A->our ) {
u3V.mug_l = u3r_mug(u3A->roc);
nex_d = u3A->ent_d + 1ULL;
nex_d = u3V.dun_d + 1ULL;
dat = u3nc(u3_nul, u3nt(u3i_chubs(1, &nex_d),
u3V.mug_l,
u3nc(u3k(u3A->our), u3k(u3A->fak))));
@ -876,6 +882,9 @@ main(c3_i argc, c3_c* argv[])
c3_assert(4 == argc);
memset(&u3V, 0, sizeof(u3V));
memset(&u3_Host.tra_u, 0, sizeof(u3_Host.tra_u));
/* load passkey
*/
{
@ -898,19 +907,10 @@ main(c3_i argc, c3_c* argv[])
u3V.dir_c = strdup(dir_c);
}
/* clear tracing struct
*/
{
u3_Host.tra_u.nid_w = 0;
u3_Host.tra_u.fil_u = NULL;
u3_Host.tra_u.con_w = 0;
u3_Host.tra_u.fun_w = 0;
}
/* boot image
*/
{
u3V.evt_d = u3m_boot_new(dir_c);
u3V.sen_d = u3V.dun_d = u3m_boot_new(dir_c);
u3C.stderr_log_f = _worker_send_stdr;
u3C.slog_f = _worker_send_slog;
}