worker: adds compaction on memory pressure and bail:meme

Joe Bryan 2019-11-21 23:12:05 -08:00
parent 66d88ae5e6
commit 4e0e0b4b31


@@ -319,6 +319,62 @@ _worker_grab(u3_noun sac, u3_noun ovo, u3_noun vir)
}
}
/* _worker_static_grab(): garbage collect, checking for profiling. RETAIN.
*/
static void
_worker_static_grab(void)
{
c3_assert( u3R == &(u3H->rod_u) );
fprintf(stderr, "work: measuring memory:\r\n");
u3a_print_memory(stderr, "total marked", u3m_mark(stderr));
u3a_print_memory(stderr, "free lists", u3a_idle(u3R));
u3a_print_memory(stderr, "sweep", u3a_sweep());
fprintf(stderr, "\r\n");
fflush(stderr);
}
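
A note for orientation: u3m_mark(), u3a_idle(), and u3a_sweep() all appear to report sizes in 32-bit loom words, and u3a_print_memory() scales a word count for display. The stand-alone sketch below (print_words() is a hypothetical stand-in, not the real allocator routine) just makes the word-to-byte arithmetic explicit:

#include <stdio.h>
#include <stdint.h>

/* print_words(): hypothetical stand-in for u3a_print_memory(); takes a
** count of 32-bit loom words and prints it scaled to KB/MB/GB.
** illustration only, not the real allocator code.
*/
static void
print_words(FILE* fil_u, const char* cap_c, uint32_t wor_w)
{
  uint64_t byt_d = (uint64_t)wor_w * 4ULL;      /*  4 bytes per loom word  */

  if ( byt_d >> 30 ) {
    fprintf(fil_u, "%s: GB/%.2f\r\n", cap_c, byt_d / (double)(1ULL << 30));
  }
  else if ( byt_d >> 20 ) {
    fprintf(fil_u, "%s: MB/%.2f\r\n", cap_c, byt_d / (double)(1ULL << 20));
  }
  else {
    fprintf(fil_u, "%s: KB/%.2f\r\n", cap_c, byt_d / (double)(1ULL << 10));
  }
}

int
main(void)
{
  print_words(stderr, "free lists",   1u << 22);  /*  2^22 words, ~16 MiB   */
  print_words(stderr, "total marked", 1u << 27);  /*  2^27 words, ~512 MiB  */
  return 0;
}
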
/* _worker_pack(): deduplicate and compact memory.
*/
static void
_worker_pack(void)
{
_worker_static_grab();
u3l_log("work: compacting loom\r\n");
if ( c3n == u3m_rock_stay(u3V.dir_c, u3V.dun_d) ) {
u3l_log("work: unable to jam state\r\n");
return;
}
if ( c3n == u3e_hold() ) {
u3l_log("work: unable to backup checkpoint\r\n");
return;
}
u3m_wipe();
if ( c3n == u3m_rock_load(u3V.dir_c, u3V.dun_d) ) {
u3l_log("work: compaction failed, restoring checkpoint\r\n");
if ( c3n == u3e_fall() ) {
fprintf(stderr, "work: unable to restore checkpoint\r\n");
c3_assert(0);
}
}
if ( c3n == u3e_drop() ) {
u3l_log("work: warning: orphaned backup checkpoint file\r\n");
}
if ( c3n == u3m_rock_drop(u3V.dir_c, u3V.dun_d) ) {
u3l_log("work: warning: orphaned state file\r\n");
}
u3l_log("work: compacted loom\r\n");
_worker_static_grab();
}
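
The ordering in _worker_pack() carries the safety argument: nothing destructive happens until both a portable copy of the state (the rock) and a backup of the checkpoint exist, so any failure before u3m_wipe() can simply return, and a failed reload can still fall back to the held checkpoint. A self-contained sketch of that rollback structure, using hypothetical stand-ins (save_rock(), hold_checkpoint(), etc. are placeholders, not the u3 API):

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>

/*  hypothetical stand-ins for u3m_rock_stay/load/drop, u3e_hold/fall/drop,
**  and u3m_wipe; each returns true on success.
*/
static bool save_rock(void)       { return true; }   /*  jam state to disk     */
static bool hold_checkpoint(void) { return true; }   /*  back up the snapshot  */
static void wipe_loom(void)       { }                /*  reinitialize memory   */
static bool load_rock(void)       { return true; }   /*  cue state, compacted  */
static bool fall_checkpoint(void) { return true; }   /*  restore the snapshot  */
static bool drop_checkpoint(void) { return true; }   /*  remove the backup     */
static bool drop_rock(void)       { return true; }   /*  remove the state file */

/* compact(): sketch of the rollback ordering used by _worker_pack().
*/
static void
compact(void)
{
  if ( !save_rock() )       { return; }              /*  nothing lost yet        */
  if ( !hold_checkpoint() ) { return; }              /*  still fully recoverable */

  wipe_loom();                                       /*  destructive step        */

  if ( !load_rock() ) {                              /*  fall back to the backup */
    if ( !fall_checkpoint() ) {
      fprintf(stderr, "compact: cannot restore\r\n");
      abort();                                       /*  both copies unusable    */
    }
  }

  /*  cleanup is best-effort; leftover files only warrant a warning  */
  if ( !drop_checkpoint() ) { fprintf(stderr, "orphaned checkpoint\r\n"); }
  if ( !drop_rock() )       { fprintf(stderr, "orphaned rock\r\n"); }
}

int
main(void)
{
  compact();
  return 0;
}
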
/* _worker_fail(): failure stub.
*/
static void
@@ -376,10 +432,14 @@ _worker_send_slog(u3_noun hod)
/* _worker_lame(): event failed, replace with error event.
*/
static void
_worker_lame(c3_d evt_d, u3_noun now, u3_noun ovo, u3_noun why, u3_noun tan)
_worker_lame(u3_noun now, u3_noun ovo, u3_noun why, u3_noun tan)
{
u3_noun rep;
u3_noun wir, tag, cad;
c3_o pac_o = c3n;
c3_d evt_d = u3V.sen_d;
u3V.sen_d = u3V.dun_d;
u3x_trel(ovo, &wir, &tag, &cad);
@@ -425,22 +485,27 @@ _worker_lame(c3_d evt_d, u3_noun now, u3_noun ovo, u3_noun why, u3_noun tan)
rep = u3nc(u3k(wir), u3nt(c3__crud, u3k(tag), nat));
}
pac_o = _(c3__meme == why);
_worker_send_replace(evt_d, u3nc(now, rep));
u3z(ovo); u3z(why); u3z(tan);
// XX review, always pack on meme?
//
if ( c3y == pac_o ) {
_worker_pack();
}
}
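
Two details in the rewritten _worker_lame(): sen_d is rolled back to dun_d before the replacement event is built, and compaction is deferred until after the reply is sent, keyed off a %meme (out-of-memory) bail. The _() macro lifts the C comparison into a loobean; the snippet below is a simplified model of that convention (not the actual c3 headers):

#include <stdio.h>

/*  simplified model of the c3 loobean convention assumed above:
**  yes (c3y) is 0, no (c3n) is 1, and _() lifts a C boolean.
*/
typedef int c3_o;
#define c3y 0
#define c3n 1
#define _(b) ( (b) ? c3y : c3n )

int
main(void)
{
  int  bail_is_meme = 1;                 /*  stand-in for (c3__meme == why)  */
  c3_o pac_o        = _(bail_is_meme);   /*  c3y: compact after replying     */

  if ( c3y == pac_o ) {
    printf("pack the loom\n");
  }
  return 0;
}
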
/* _worker_sure(): event succeeded, report completion.
/* _worker_sure_feck(): event succeeded, send effects.
*/
static void
_worker_sure(u3_noun ovo, u3_noun vir, u3_noun cor)
_worker_sure_feck(u3_noun ovo, u3_noun vir, c3_w pre_w)
{
u3z(u3A->roc);
u3A->roc = cor;
u3A->ent_d = u3V.dun_d;
u3V.mug_l = u3r_mug(u3A->roc);
u3_noun sac = u3_nul;
c3_o pac_o = c3n;
c3_o rec_o = c3n;
// intercept |mass, observe |reset
//
@@ -472,7 +537,7 @@ _worker_sure(u3_noun ovo, u3_noun vir, u3_noun cor)
// reclaim memory from persistent caches on |reset
//
if ( c3__vega == u3h(fec) ) {
u3m_reclaim();
rec_o = c3y;
}
riv = u3t(riv);
@@ -480,12 +545,86 @@ _worker_sure(u3_noun ovo, u3_noun vir, u3_noun cor)
}
}
// after a successful event, we check for memory pressure.
//
// if we've exceeded either of two thresholds, we reclaim
// from our persistent caches, and notify the daemon
// (via a "fake" effect) that arvo should trim state
// (trusting that the daemon will enqueue an appropriate event).
// For future flexibility, the urgency of the notification is represented
// by a *decreasing* number: 0 is maximally urgent, 1 less so, &c.
//
// high-priority: 2^22 contiguous words remaining (~16 MB)
// low-priority: 2^27 contiguous words remaining (~536 MB)
// XX maybe use 2^23 (~34 MB) and 2^26 (~268 MB)?
//
{
u3_noun pri = u3_none;
c3_w pos_w = u3a_open(u3R);
c3_w low_w = (1 << 27);
c3_w hig_w = (1 << 22);
if ( (pre_w > low_w) && !(pos_w > low_w) ) {
// XX set flag(s) in u3V so we don't repeat endlessly?
// XX pack here too?
//
pac_o = c3y;
rec_o = c3y;
pri = 1;
}
else if ( (pre_w > hig_w) && !(pos_w > hig_w) ) {
// XX we should probably jam/cue our entire state at this point
//
pac_o = c3y;
rec_o = c3y;
pri = 0;
}
// reclaim memory from persistent caches periodically
//
// XX this is a hack to work around two things
// - bytecode caches grow rapidly and can't be simply capped
// - we don't make very effective use of our free lists
//
else {
rec_o = _(0 == (u3V.dun_d % 1000ULL));
}
// notify daemon of memory pressure via "fake" effect
//
if ( u3_none != pri ) {
u3_noun cad = u3nc(u3nt(u3_blip, c3__arvo, u3_nul),
u3nc(c3__trim, pri));
vir = u3nc(cad, vir);
}
}
if ( c3y == rec_o ) {
u3m_reclaim();
}
// XX this runs on replay too
//
_worker_grab(sac, ovo, vir);
_worker_send_complete(vir);
u3z(sac); u3z(ovo);
if ( c3y == pac_o ) {
_worker_pack();
}
}
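
The notification above fires only on the event that crosses a boundary: free space (in loom words, per u3a_open()) was above the line before the event and is not above it afterward, so the daemon hears about each crossing once rather than on every subsequent event. A self-contained sketch with made-up numbers, assuming 4-byte loom words:

#include <stdio.h>
#include <stdint.h>

/*  sketch of the threshold-crossing test in _worker_sure_feck():
**  a %trim notification fires only when free space drops across a line.
*/
int
main(void)
{
  uint32_t low_w = 1u << 27;            /*  2^27 free words = 512 MiB    */
  uint32_t hig_w = 1u << 22;            /*  2^22 free words =  16 MiB    */

  uint32_t pre_w = (1u << 27) + 100u;   /*  free words before the event  */
  uint32_t pos_w = (1u << 27) - 100u;   /*  free words after the event   */

  if ( (pre_w > low_w) && !(pos_w > low_w) ) {
    printf("crossed the low line: send %%trim 1\n");
  }
  else if ( (pre_w > hig_w) && !(pos_w > hig_w) ) {
    printf("crossed the high line: send %%trim 0\n");
  }
  else {
    printf("no boundary crossed: no notification\n");
  }
  return 0;
}
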
/* _worker_sure_core(): event succeeded, save state.
*/
static void
_worker_sure_core(u3_noun cor)
{
u3V.dun_d = u3V.sen_d;
u3z(u3A->roc);
u3A->roc = cor;
u3A->ent_d = u3V.dun_d;
u3V.mug_l = u3r_mug(u3A->roc);
}
/* _worker_work_live(): apply event.
@@ -539,7 +678,6 @@ _worker_work_live(c3_d evt_d, u3_noun job)
// event rejected
//
if ( u3_blip != u3h(gon) ) {
u3V.sen_d = u3V.dun_d;
// restore previous time
//
u3_noun nex = u3A->now;
@@ -551,13 +689,11 @@ _worker_work_live(c3_d evt_d, u3_noun job)
u3k(ovo); u3k(why); u3k(tan);
u3z(gon); u3z(job);
_worker_lame(evt_d, nex, ovo, why, tan);
_worker_lame(nex, ovo, why, tan);
}
// event accepted
//
else {
c3_o rec_o = c3n;
// vir/(list ovum) list of effects
// cor/arvo arvo core
//
@@ -567,66 +703,8 @@ _worker_work_live(c3_d evt_d, u3_noun job)
u3k(ovo); u3k(vir); u3k(cor);
u3z(gon); u3z(job); u3z(last_date);
u3V.dun_d = u3V.sen_d;
// after a successful event, we check for memory pressure.
//
// if we've exceeded either of two thresholds, we reclaim
// from our persistent caches, and notify the daemon
// (via a "fake" effect) that arvo should trim state
// (trusting that the daemon will enqueue an appropriate event).
// For future flexibility, the urgency of the notification is represented
// by a *decreasing* number: 0 is maximally urgent, 1 less so, &c.
//
// high-priority: 2^22 contiguous words remaining (~16 MB)
// low-priority: 2^27 contiguous words remaining (~536 MB)
// XX maybe use 2^23 (~34 MB) and 2^26 (~268 MB)?
//
// XX refactor: we should measure memory after losing the old kernel
//
{
u3_noun pri = u3_none;
c3_w pos_w = u3a_open(u3R);
c3_w low_w = (1 << 27);
c3_w hig_w = (1 << 22);
if ( (pre_w > low_w) && !(pos_w > low_w) ) {
// XX set flag in u3V so we don't repeat endlessly?
//
rec_o = c3y;
pri = 1;
}
else if ( (pre_w > hig_w) && !(pos_w > hig_w) ) {
// XX we should probably jam/cue our entire state at this point
//
rec_o = c3y;
pri = 0;
}
// reclaim memory from persistent caches periodically
//
// XX this is a hack to work around two things
// - bytecode caches grow rapidly and can't be simply capped
// - we don't make very effective use of our free lists
//
else {
rec_o = _(0 == (evt_d % 1000ULL));
}
// notify daemon of memory pressure via "fake" effect
//
if ( u3_none != pri ) {
u3_noun cad = u3nc(u3nt(u3_blip, c3__arvo, u3_nul),
u3nc(c3__trim, pri));
vir = u3nc(cad, vir);
}
}
_worker_sure(ovo, vir, cor);
if ( c3y == rec_o ) {
u3m_reclaim();
}
_worker_sure_core(cor);
_worker_sure_feck(ovo, vir, pre_w);
}
}
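
pre_w is passed to _worker_sure_feck() but captured outside the hunks shown; presumably it records the free loom words (via u3a_open()) before the event is computed, giving the post-event measurement a baseline. A rough sketch of the resulting control flow, with hypothetical stand-ins rather than the u3 API:

#include <stdint.h>
#include <stdbool.h>

/*  hypothetical stand-ins; names are placeholders, not the u3 API.
*/
static uint32_t open_words(void)             { return 1u << 28; }  /*  free loom words      */
static bool     compute_event(void)          { return true; }      /*  poke arvo            */
static void     replace_event(void)          { }                   /*  _worker_lame()       */
static void     save_state(void)             { }                   /*  _worker_sure_core()  */
static void     send_effects(uint32_t pre_w) { (void)pre_w; }      /*  _worker_sure_feck()  */

/* work_live(): sketch of the control flow in _worker_work_live().
*/
static void
work_live(void)
{
  uint32_t pre_w = open_words();   /*  measure free space before the event  */

  if ( !compute_event() ) {
    replace_event();               /*  rejected: reply with a crud event,
                                   **  possibly packing on %meme            */
  }
  else {
    save_state();                  /*  accepted: commit the new state...    */
    send_effects(pre_w);           /*  ...then effects, trim/pack checks    */
  }
}

int
main(void)
{
  work_live();
  return 0;
}
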
@@ -894,21 +972,6 @@ _worker_poke(void* vod_p, u3_noun mat)
}
}
/* _worker_static_grab(): garbage collect, checking for profiling. RETAIN.
*/
static void
_worker_static_grab(void)
{
c3_assert( u3R == &(u3H->rod_u) );
fprintf(stderr, "work: measuring memory:\r\n");
u3a_print_memory(stderr, "total marked", u3m_mark(stderr));
u3a_print_memory(stderr, "free lists", u3a_idle(u3R));
u3a_print_memory(stderr, "sweep", u3a_sweep());
fprintf(stderr, "\r\n");
fflush(stderr);
}
/* u3_worker_boot(): send startup message to manager.
*/
void