diff --git a/pkg/urbit/include/vere/vere.h b/pkg/urbit/include/vere/vere.h index 94d323813c..94cfb80da6 100644 --- a/pkg/urbit/include/vere/vere.h +++ b/pkg/urbit/include/vere/vere.h @@ -351,7 +351,8 @@ typedef struct _u3_save { uv_timer_t tim_u; // checkpoint timer uv_signal_t sil_u; // child signal - c3_d ent_d; // event number + c3_d req_d; // requested at evt_d + c3_d dun_d; // completed at evt_d c3_w pid_w; // pid of checkpoint process } u3_save; @@ -664,7 +665,8 @@ u3_psat_init = 0, // initialized u3_psat_boot = 1, // booting u3_psat_pace = 2, // replaying - u3_psat_play = 3 // full operation + u3_psat_play = 3, // full operation + u3_psat_done = 4 // shutting down } u3_psat; /* u3_pier: ship controller. @@ -1264,10 +1266,10 @@ void u3_pier_work(u3_pier* pir_u, u3_noun pax, u3_noun fav); - /* u3_pier_work_save(): tell worker to save checkpoint. + /* u3_pier_snap(): request checkpoint. */ void - u3_pier_work_save(u3_pier* pir_u); + u3_pier_snap(u3_pier* pir_u); /* u3_pier_stub(): get the One Pier for unreconstructed code. */ diff --git a/pkg/urbit/serf/main.c b/pkg/urbit/serf/main.c index cc72df4575..9f83537665 100644 --- a/pkg/urbit/serf/main.c +++ b/pkg/urbit/serf/main.c @@ -89,7 +89,7 @@ [%exit p=@] :: save snapshot to disk :: - :: p: number of old snaps to save (XX not respected) + :: p: event number :: [%save p=@] :: execute event @@ -684,15 +684,19 @@ _serf_poke(void* vod_p, u3_noun mat) } case c3__save: { - u3_noun sap; + u3_noun evt; + c3_d evt_d; - if ( (c3n == u3r_cell(jar, 0, &sap)) || - (c3n == u3ud(sap)) ) + if ( (c3n == u3r_cell(jar, 0, &evt)) || + (c3n == u3ud(evt)) ) { goto error; } - u3z(jar); + evt_d = u3r_chub(0, evt); + u3z(evt); + + c3_assert( evt_d == u3V.evt_d ); return u3e_save(); } diff --git a/pkg/urbit/vere/pier.c b/pkg/urbit/vere/pier.c index f48e446278..3f0f613a1c 100644 --- a/pkg/urbit/vere/pier.c +++ b/pkg/urbit/vere/pier.c @@ -66,7 +66,9 @@ static void _pier_apply(u3_pier* pir_u); static void _pier_boot_complete(u3_pier* pir_u); +static void _pier_exit_done(u3_pier* pir_u); static void _pier_loop_exit(u3_pier* pir_u); +static void _pier_work_save(u3_pier* pir_u); /* _pier_disk_bail(): bail from disk i/o. */ @@ -328,6 +330,18 @@ _pier_work_release(u3_writ* wit_u) if ( wit_u->evt_d == pir_u->but_d ) { _pier_boot_complete(pir_u); } + + // take snapshot, if requested (and awaiting the commit of this event) + // + { + u3_save* sav_u = pir_u->sav_u; + + if ( (sav_u->req_d > sav_u->dun_d) && + (wit_u->evt_d == sav_u->req_d) ) + { + _pier_work_save(pir_u); + } + } } /* _pier_work_build(): build atomic action. @@ -360,24 +374,60 @@ _pier_work_send(u3_writ* wit_u) u3_newt_write(&god_u->inn_u, u3k(wit_u->mat), wit_u); } -/* - u3_pier_work_save(): tell worker to save checkpoint. - - If there are no unsnapshotted events, then do nothing, otherwise, send - `[%save ~]` to the slave. - - XX Should we wait on some report of success before we update - `pir_u->sav_u->ent_d`? +/* _pier_work_save(): tell worker to save checkpoint. */ -void -u3_pier_work_save(u3_pier* pir_u) +static void +_pier_work_save(u3_pier* pir_u) { u3_lord* god_u = pir_u->god_u; + u3_disk* log_u = pir_u->log_u; u3_save* sav_u = pir_u->sav_u; - if ( god_u->dun_d > sav_u->ent_d ) { - u3_newt_write(&god_u->inn_u, u3ke_jam(u3nc(c3__save, 0)), 0); - sav_u->ent_d = god_u->dun_d; + c3_assert( god_u->dun_d == sav_u->req_d ); + c3_assert( log_u->com_d >= god_u->dun_d ); + + { + u3_noun mat = u3ke_jam(u3nc(c3__save, u3i_chubs(1, &god_u->dun_d))); + u3_newt_write(&god_u->inn_u, mat, 0); + + // XX wait on some report of success before updating? + // + sav_u->dun_d = sav_u->req_d; + } + + // if we're gracefully shutting down, do so now + // + if ( u3_psat_done == pir_u->sat_e ) { + _pier_exit_done(pir_u); + } +} + +/* u3_pier_snap(): request snapshot +*/ +void +u3_pier_snap(u3_pier* pir_u) +{ + u3_lord* god_u = pir_u->god_u; + u3_disk* log_u = pir_u->log_u; + u3_save* sav_u = pir_u->sav_u; + + c3_d top_d = c3_max(god_u->sen_d, god_u->dun_d); + + // no-op if there are no un-snapshot'ed events + // + if ( top_d > sav_u->dun_d ) { + sav_u->req_d = top_d; + + // save eagerly if all computed events are already committed + // + if ( log_u->com_d >= top_d ) { + _pier_work_save(pir_u); + } + } + // if we're gracefully shutting down, do so now + // + else if ( u3_psat_done == pir_u->sat_e ) { + _pier_exit_done(pir_u); } } @@ -462,6 +512,7 @@ _pier_apply(u3_pier* pir_u) { u3_disk* log_u = pir_u->log_u; u3_lord* god_u = pir_u->god_u; + u3_save* sav_u = pir_u->sav_u; if ( (0 == log_u) || (0 == god_u) || @@ -479,11 +530,12 @@ start: */ wit_u = pir_u->ext_u; while ( wit_u ) { - /* if writ is (a) next in line to compute, and (b) worker is inactive, - ** request computation + /* if writ is (a) next in line to compute, (b) worker is inactive, + ** and (c) a snapshot has not been requested, request computation */ if ( (wit_u->evt_d == (1 + god_u->sen_d)) && - (god_u->sen_d == god_u->dun_d) ) + (god_u->sen_d == god_u->dun_d) && + (sav_u->dun_d == sav_u->req_d) ) { _pier_work_compute(wit_u); act_o = c3y; @@ -1559,19 +1611,33 @@ u3_pier_discover(u3_pier* pir_u, _pier_apply(pir_u); } +/* _pier_exit_done(): synchronously shutting down +*/ +static void +_pier_exit_done(u3_pier* pir_u) +{ + fprintf(stderr, "pier: exit\r\n"); + + _pier_work_shutdown(pir_u); + _pier_loop_exit(pir_u); + + // XX uninstall pier from u3K.tab_u, dispose + + // XX no can do + // + uv_stop(u3L); +} + /* u3_pier_exit(): trigger a gentle shutdown. */ void u3_pier_exit(u3_pier* pir_u) { - fprintf(stderr, "pier: exit\r\n"); - u3_pier_work_save(pir_u); - _pier_work_shutdown(pir_u); - _pier_loop_exit(pir_u); + pir_u->sat_e = u3_psat_done; - // XX no can do + // XX must wait for callback confirming // - uv_stop(u3L); + u3_pier_snap(pir_u); } /* u3_pier_send(): modern send with target and path. @@ -1759,7 +1825,7 @@ static void _pier_boot_complete(u3_pier* pir_u) { if ( u3_psat_init != pir_u->sat_e ) { - u3_pier_work_save(pir_u); + u3_pier_snap(pir_u); } if ( u3_psat_boot == pir_u->sat_e ) { diff --git a/pkg/urbit/vere/save.c b/pkg/urbit/vere/save.c index d1e98c9ce5..480e1a5f29 100644 --- a/pkg/urbit/vere/save.c +++ b/pkg/urbit/vere/save.c @@ -17,7 +17,7 @@ static void _save_time_cb(uv_timer_t* tim_u) { u3_pier *pir_u = tim_u->data; - u3_pier_work_save(pir_u); + u3_pier_snap(pir_u); } /* u3_save_ef_chld(): report save termination. @@ -49,7 +49,8 @@ u3_save_io_init(u3_pier *pir_u) { u3_save* sav_u = pir_u->sav_u; - sav_u->ent_d = 0; + sav_u->req_d = 0; + sav_u->dun_d = 0; sav_u->pid_w = 0; sav_u->tim_u.data = pir_u;