enforces snapshot/commit ordering constraints

This commit is contained in:
Joe Bryan 2019-04-05 10:47:53 -07:00
parent 4fb35aaee3
commit 92ece2d22a
4 changed files with 107 additions and 34 deletions

View File

@ -351,7 +351,8 @@
typedef struct _u3_save { typedef struct _u3_save {
uv_timer_t tim_u; // checkpoint timer uv_timer_t tim_u; // checkpoint timer
uv_signal_t sil_u; // child signal uv_signal_t sil_u; // child signal
c3_d ent_d; // event number c3_d req_d; // requested at evt_d
c3_d dun_d; // completed at evt_d
c3_w pid_w; // pid of checkpoint process c3_w pid_w; // pid of checkpoint process
} u3_save; } u3_save;
@ -664,7 +665,8 @@
u3_psat_init = 0, // initialized u3_psat_init = 0, // initialized
u3_psat_boot = 1, // booting u3_psat_boot = 1, // booting
u3_psat_pace = 2, // replaying u3_psat_pace = 2, // replaying
u3_psat_play = 3 // full operation u3_psat_play = 3, // full operation
u3_psat_done = 4 // shutting down
} u3_psat; } u3_psat;
/* u3_pier: ship controller. /* u3_pier: ship controller.
@ -1264,10 +1266,10 @@
void void
u3_pier_work(u3_pier* pir_u, u3_noun pax, u3_noun fav); u3_pier_work(u3_pier* pir_u, u3_noun pax, u3_noun fav);
/* u3_pier_work_save(): tell worker to save checkpoint. /* u3_pier_snap(): request checkpoint.
*/ */
void void
u3_pier_work_save(u3_pier* pir_u); u3_pier_snap(u3_pier* pir_u);
/* u3_pier_stub(): get the One Pier for unreconstructed code. /* u3_pier_stub(): get the One Pier for unreconstructed code.
*/ */

View File

@ -89,7 +89,7 @@
[%exit p=@] [%exit p=@]
:: save snapshot to disk :: save snapshot to disk
:: ::
:: p: number of old snaps to save (XX not respected) :: p: event number
:: ::
[%save p=@] [%save p=@]
:: execute event :: execute event
@ -684,15 +684,19 @@ _serf_poke(void* vod_p, u3_noun mat)
} }
case c3__save: { case c3__save: {
u3_noun sap; u3_noun evt;
c3_d evt_d;
if ( (c3n == u3r_cell(jar, 0, &sap)) || if ( (c3n == u3r_cell(jar, 0, &evt)) ||
(c3n == u3ud(sap)) ) (c3n == u3ud(evt)) )
{ {
goto error; goto error;
} }
u3z(jar); evt_d = u3r_chub(0, evt);
u3z(evt);
c3_assert( evt_d == u3V.evt_d );
return u3e_save(); return u3e_save();
} }

View File

@ -66,7 +66,9 @@
static void _pier_apply(u3_pier* pir_u); static void _pier_apply(u3_pier* pir_u);
static void _pier_boot_complete(u3_pier* pir_u); static void _pier_boot_complete(u3_pier* pir_u);
static void _pier_exit_done(u3_pier* pir_u);
static void _pier_loop_exit(u3_pier* pir_u); static void _pier_loop_exit(u3_pier* pir_u);
static void _pier_work_save(u3_pier* pir_u);
/* _pier_disk_bail(): bail from disk i/o. /* _pier_disk_bail(): bail from disk i/o.
*/ */
@ -328,6 +330,18 @@ _pier_work_release(u3_writ* wit_u)
if ( wit_u->evt_d == pir_u->but_d ) { if ( wit_u->evt_d == pir_u->but_d ) {
_pier_boot_complete(pir_u); _pier_boot_complete(pir_u);
} }
// take snapshot, if requested (and awaiting the commit of this event)
//
{
u3_save* sav_u = pir_u->sav_u;
if ( (sav_u->req_d > sav_u->dun_d) &&
(wit_u->evt_d == sav_u->req_d) )
{
_pier_work_save(pir_u);
}
}
} }
/* _pier_work_build(): build atomic action. /* _pier_work_build(): build atomic action.
@ -360,24 +374,60 @@ _pier_work_send(u3_writ* wit_u)
u3_newt_write(&god_u->inn_u, u3k(wit_u->mat), wit_u); u3_newt_write(&god_u->inn_u, u3k(wit_u->mat), wit_u);
} }
/* /* _pier_work_save(): tell worker to save checkpoint.
u3_pier_work_save(): tell worker to save checkpoint.
If there are no unsnapshotted events, then do nothing, otherwise, send
`[%save ~]` to the slave.
XX Should we wait on some report of success before we update
`pir_u->sav_u->ent_d`?
*/ */
void static void
u3_pier_work_save(u3_pier* pir_u) _pier_work_save(u3_pier* pir_u)
{ {
u3_lord* god_u = pir_u->god_u; u3_lord* god_u = pir_u->god_u;
u3_disk* log_u = pir_u->log_u;
u3_save* sav_u = pir_u->sav_u; u3_save* sav_u = pir_u->sav_u;
if ( god_u->dun_d > sav_u->ent_d ) { c3_assert( god_u->dun_d == sav_u->req_d );
u3_newt_write(&god_u->inn_u, u3ke_jam(u3nc(c3__save, 0)), 0); c3_assert( log_u->com_d >= god_u->dun_d );
sav_u->ent_d = god_u->dun_d;
{
u3_noun mat = u3ke_jam(u3nc(c3__save, u3i_chubs(1, &god_u->dun_d)));
u3_newt_write(&god_u->inn_u, mat, 0);
// XX wait on some report of success before updating?
//
sav_u->dun_d = sav_u->req_d;
}
// if we're gracefully shutting down, do so now
//
if ( u3_psat_done == pir_u->sat_e ) {
_pier_exit_done(pir_u);
}
}
/* u3_pier_snap(): request snapshot
*/
void
u3_pier_snap(u3_pier* pir_u)
{
u3_lord* god_u = pir_u->god_u;
u3_disk* log_u = pir_u->log_u;
u3_save* sav_u = pir_u->sav_u;
c3_d top_d = c3_max(god_u->sen_d, god_u->dun_d);
// no-op if there are no un-snapshot'ed events
//
if ( top_d > sav_u->dun_d ) {
sav_u->req_d = top_d;
// save eagerly if all computed events are already committed
//
if ( log_u->com_d >= top_d ) {
_pier_work_save(pir_u);
}
}
// if we're gracefully shutting down, do so now
//
else if ( u3_psat_done == pir_u->sat_e ) {
_pier_exit_done(pir_u);
} }
} }
@ -462,6 +512,7 @@ _pier_apply(u3_pier* pir_u)
{ {
u3_disk* log_u = pir_u->log_u; u3_disk* log_u = pir_u->log_u;
u3_lord* god_u = pir_u->god_u; u3_lord* god_u = pir_u->god_u;
u3_save* sav_u = pir_u->sav_u;
if ( (0 == log_u) || if ( (0 == log_u) ||
(0 == god_u) || (0 == god_u) ||
@ -479,11 +530,12 @@ start:
*/ */
wit_u = pir_u->ext_u; wit_u = pir_u->ext_u;
while ( wit_u ) { while ( wit_u ) {
/* if writ is (a) next in line to compute, and (b) worker is inactive, /* if writ is (a) next in line to compute, (b) worker is inactive,
** request computation ** and (c) a snapshot has not been requested, request computation
*/ */
if ( (wit_u->evt_d == (1 + god_u->sen_d)) && if ( (wit_u->evt_d == (1 + god_u->sen_d)) &&
(god_u->sen_d == god_u->dun_d) ) (god_u->sen_d == god_u->dun_d) &&
(sav_u->dun_d == sav_u->req_d) )
{ {
_pier_work_compute(wit_u); _pier_work_compute(wit_u);
act_o = c3y; act_o = c3y;
@ -1559,19 +1611,33 @@ u3_pier_discover(u3_pier* pir_u,
_pier_apply(pir_u); _pier_apply(pir_u);
} }
/* _pier_exit_done(): synchronously shutting down
*/
static void
_pier_exit_done(u3_pier* pir_u)
{
fprintf(stderr, "pier: exit\r\n");
_pier_work_shutdown(pir_u);
_pier_loop_exit(pir_u);
// XX uninstall pier from u3K.tab_u, dispose
// XX no can do
//
uv_stop(u3L);
}
/* u3_pier_exit(): trigger a gentle shutdown. /* u3_pier_exit(): trigger a gentle shutdown.
*/ */
void void
u3_pier_exit(u3_pier* pir_u) u3_pier_exit(u3_pier* pir_u)
{ {
fprintf(stderr, "pier: exit\r\n"); pir_u->sat_e = u3_psat_done;
u3_pier_work_save(pir_u);
_pier_work_shutdown(pir_u);
_pier_loop_exit(pir_u);
// XX no can do // XX must wait for callback confirming
// //
uv_stop(u3L); u3_pier_snap(pir_u);
} }
/* u3_pier_send(): modern send with target and path. /* u3_pier_send(): modern send with target and path.
@ -1759,7 +1825,7 @@ static void
_pier_boot_complete(u3_pier* pir_u) _pier_boot_complete(u3_pier* pir_u)
{ {
if ( u3_psat_init != pir_u->sat_e ) { if ( u3_psat_init != pir_u->sat_e ) {
u3_pier_work_save(pir_u); u3_pier_snap(pir_u);
} }
if ( u3_psat_boot == pir_u->sat_e ) { if ( u3_psat_boot == pir_u->sat_e ) {

View File

@ -17,7 +17,7 @@ static void
_save_time_cb(uv_timer_t* tim_u) _save_time_cb(uv_timer_t* tim_u)
{ {
u3_pier *pir_u = tim_u->data; u3_pier *pir_u = tim_u->data;
u3_pier_work_save(pir_u); u3_pier_snap(pir_u);
} }
/* u3_save_ef_chld(): report save termination. /* u3_save_ef_chld(): report save termination.
@ -49,7 +49,8 @@ u3_save_io_init(u3_pier *pir_u)
{ {
u3_save* sav_u = pir_u->sav_u; u3_save* sav_u = pir_u->sav_u;
sav_u->ent_d = 0; sav_u->req_d = 0;
sav_u->dun_d = 0;
sav_u->pid_w = 0; sav_u->pid_w = 0;
sav_u->tim_u.data = pir_u; sav_u->tim_u.data = pir_u;