From 69eeb6309efdeea8c56d06e600aebd0044255244 Mon Sep 17 00:00:00 2001 From: Joe Bryan Date: Tue, 21 Jul 2020 14:09:47 -0700 Subject: [PATCH 1/4] vere: fixes driver exit in u3_pier_bail() --- pkg/urbit/vere/pier.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/urbit/vere/pier.c b/pkg/urbit/vere/pier.c index ec7668990..fb2c220a9 100644 --- a/pkg/urbit/vere/pier.c +++ b/pkg/urbit/vere/pier.c @@ -1666,7 +1666,9 @@ u3_pier_bail(u3_pier* pir_u) // exig i/o drivers // - if ( pir_u->wok_u ) { + if ( (u3_psat_work == pir_u->sat_e) + && pir_u->wok_u ) + { _pier_work_close(pir_u->wok_u); pir_u->wok_u = 0; } From 2994e0e44e9e43faf2c2e67418e5785a6bcdbb34 Mon Sep 17 00:00:00 2001 From: Joe Bryan Date: Tue, 21 Jul 2020 14:08:49 -0700 Subject: [PATCH 2/4] vere: wires |pack to new compaction; renames old |pack to |cram --- pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs | 16 +++++---- pkg/urbit/include/c/motes.h | 1 + pkg/urbit/include/vere/serf.h | 4 +-- pkg/urbit/include/vere/vere.h | 15 ++++---- pkg/urbit/vere/lord.c | 27 ++++++++++---- pkg/urbit/vere/pier.c | 30 ++++++++-------- pkg/urbit/worker/main.c | 2 +- pkg/urbit/worker/serf.c | 37 +++++++++++++------- 8 files changed, 81 insertions(+), 51 deletions(-) diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs index f8d8b4d47..418284b58 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs @@ -19,9 +19,10 @@ +$ gang (unit (set ship)) +$ writ $% $: %live - $% [%exit cod=@] + $% [%cram eve=@] + [%exit cod=@] [%save eve=@] - [%pack eve=@] + [%pack ~] == == [%peek mil=@ now=@da lyc=gang pat=path] [%play eve=@ lit=(list ?((pair @da ovum) *))] @@ -107,7 +108,8 @@ data Serf = Serf data Live = LExit Atom -- exit status code | LSave EventId - | LPack EventId + | LCram EventId + | LPack () deriving (Show) data Play @@ -260,9 +262,9 @@ sendSnapshotRequest serf eve = do sendWrit serf (WLive $ LSave eve) recvLive serf -sendCompactionRequest :: Serf -> EventId -> IO () -sendCompactionRequest serf eve = do - sendWrit serf (WLive $ LPack eve) +sendCompactionRequest :: Serf -> IO () +sendCompactionRequest serf = do + sendWrit serf (WLive $ LPack ()) recvLive serf sendScryRequest :: Serf -> Wen -> Gang -> Path -> IO (Maybe (Term, Noun)) @@ -410,7 +412,7 @@ snapshot serf = withSerfLockIO serf $ \ss -> do -} compact :: Serf -> IO () compact serf = withSerfLockIO serf $ \ss -> do - sendCompactionRequest serf (ssLast ss) + sendCompactionRequest serf pure (ss, ()) {-| diff --git a/pkg/urbit/include/c/motes.h b/pkg/urbit/include/c/motes.h index 516b2d268..8f9f04013 100644 --- a/pkg/urbit/include/c/motes.h +++ b/pkg/urbit/include/c/motes.h @@ -248,6 +248,7 @@ # define c3__cow c3_s3('c','o','w') # define c3__cpu c3_s3('c','p','u') # define c3__crad c3_s4('c','r','a','d') +# define c3__cram c3_s4('c','r','a','m') # define c3__crap c3_s4('c','r','a','p') # define c3__cret c3_s4('c','r','e','t') # define c3__crib c3_s4('c','r','i','b') diff --git a/pkg/urbit/include/vere/serf.h b/pkg/urbit/include/vere/serf.h index b1a1fdde3..b931a761a 100644 --- a/pkg/urbit/include/vere/serf.h +++ b/pkg/urbit/include/vere/serf.h @@ -24,10 +24,10 @@ u3_noun u3_serf_init(u3_serf* sef_u); - /* u3_serf_unpack(): initialize from rock at [eve_d]. + /* u3_serf_uncram(): initialize from rock at [eve_d]. */ void - u3_serf_unpack(u3_serf* sef_u, c3_d eve_d); + u3_serf_uncram(u3_serf* sef_u, c3_d eve_d); /* u3_serf_writ(): apply writ [wit], producing plea [*pel] on c3y. */ diff --git a/pkg/urbit/include/vere/vere.h b/pkg/urbit/include/vere/vere.h index d86f21cd4..de8379d3b 100644 --- a/pkg/urbit/include/vere/vere.h +++ b/pkg/urbit/include/vere/vere.h @@ -398,8 +398,9 @@ u3_writ_peek = 1, u3_writ_play = 2, u3_writ_save = 3, - u3_writ_pack = 4, - u3_writ_exit = 5 + u3_writ_cram = 4, + u3_writ_pack = 5, + u3_writ_exit = 6 } u3_writ_type; /* u3_writ: ipc message from king to serf @@ -433,7 +434,7 @@ void (*work_done_f)(void*, u3_ovum*, u3_fact*, u3_gift*); void (*work_bail_f)(void*, u3_ovum*, u3_noun lud); void (*save_f)(void*); - void (*pack_f)(void*); + void (*cram_f)(void*); void (*bail_f)(void*); void (*exit_f)(void*); } u3_lord_cb; @@ -942,10 +943,10 @@ c3_o u3_lord_save(u3_lord* god_u); - /* u3_lord_pack(): save portable state. + /* u3_lord_cram(): save portable state. */ c3_o - u3_lord_pack(u3_lord* god_u); + u3_lord_cram(u3_lord* god_u); /* u3_lord_work(): attempt work. */ @@ -1215,10 +1216,10 @@ c3_o u3_pier_save(u3_pier* pir_u); - /* u3_pier_pack(): save a portable snapshot. + /* u3_pier_cram(): save a portable snapshot. */ c3_o - u3_pier_pack(u3_pier* pir_u); + u3_pier_cram(u3_pier* pir_u); /* u3_pier_info(): print status info. */ diff --git a/pkg/urbit/vere/lord.c b/pkg/urbit/vere/lord.c index ac1be4b8a..73d9e41af 100644 --- a/pkg/urbit/vere/lord.c +++ b/pkg/urbit/vere/lord.c @@ -25,9 +25,10 @@ :: +$ writ $% $: %live - $% [%exit cod=@] + $% [%cram eve=@] + [%exit cod=@] [%save eve=@] - [%pack eve=@] + [%pack ~] == == [%peek mil=@ now=@da lyc=gang pat=path] [%play eve=@ lit=(list ?((pair @da ovum) *))] @@ -108,6 +109,7 @@ _lord_writ_free(u3_writ* wit_u) } break; case u3_writ_save: + case u3_writ_cram: case u3_writ_pack: case u3_writ_exit: { } break; @@ -196,6 +198,7 @@ _lord_writ_str(u3_writ_type typ_e) case u3_writ_peek: return "peek"; case u3_writ_play: return "play"; case u3_writ_save: return "save"; + case u3_writ_cram: return "cram"; case u3_writ_pack: return "pack"; case u3_writ_exit: return "exit"; } @@ -258,8 +261,14 @@ _lord_plea_live(u3_lord* god_u, u3_noun dat) god_u->cb_u.save_f(god_u->cb_u.ptr_v); } break; + case u3_writ_cram: { + god_u->cb_u.cram_f(god_u->cb_u.ptr_v); + } break; + case u3_writ_pack: { - god_u->cb_u.pack_f(god_u->cb_u.ptr_v); + // XX wire into cb + // + u3l_log("pier: pack complete\n"); } break; } @@ -738,8 +747,12 @@ _lord_writ_jam(u3_lord* god_u, u3_writ* wit_u) msg = u3nt(c3__live, c3__save, u3i_chubs(1, &god_u->eve_d)); } break; + case u3_writ_cram: { + msg = u3nt(c3__live, c3__cram, u3i_chubs(1, &god_u->eve_d)); + } break; + case u3_writ_pack: { - msg = u3nt(c3__live, c3__pack, u3i_chubs(1, &god_u->eve_d)); + msg = u3nt(c3__live, c3__pack, u3_nul); } break; case u3_writ_exit: { @@ -938,17 +951,17 @@ u3_lord_save(u3_lord* god_u) } } -/* u3_lord_pack(): save portable state. +/* u3_lord_cram(): save portable state. */ c3_o -u3_lord_pack(u3_lord* god_u) +u3_lord_cram(u3_lord* god_u) { if ( god_u->dep_w ) { return c3n; } else { u3_writ* wit_u = _lord_writ_new(god_u); - wit_u->typ_e = u3_writ_pack; + wit_u->typ_e = u3_writ_cram; _lord_writ_plan(god_u, wit_u); return c3y; } diff --git a/pkg/urbit/vere/pier.c b/pkg/urbit/vere/pier.c index fb2c220a9..9c66b5315 100644 --- a/pkg/urbit/vere/pier.c +++ b/pkg/urbit/vere/pier.c @@ -597,8 +597,8 @@ _pier_play(u3_play* pay_u) // XX temporary hack // - u3l_log("pier: replay barrier reached, packing\r\n"); - u3_pier_pack(pir_u); + u3l_log("pier: replay barrier reached, cramming\r\n"); + u3_pier_cram(pir_u); } else if ( pay_u->eve_d == log_u->dun_d ) { _pier_work_init(pir_u); @@ -868,21 +868,21 @@ _pier_on_lord_save(void* ptr_v) // _pier_next(pir_u); } -/* _pier_on_lord_pack(): worker state-export complete (portable snapshot). +/* _pier_on_lord_cram(): worker state-export complete (portable snapshot). */ static void -_pier_on_lord_pack(void* ptr_v) +_pier_on_lord_cram(void* ptr_v) { u3_pier* pir_u = ptr_v; #ifdef VERBOSE_PIER - fprintf(stderr, "pier: (%" PRIu64 "): lord: pack\r\n", pir_u->god_u->eve_d); + fprintf(stderr, "pier: (%" PRIu64 "): lord: cram\r\n", pir_u->god_u->eve_d); #endif // XX temporary hack // if ( u3_psat_play == pir_u->sat_e ) { - u3l_log("pier: pack complete, shutting down\r\n"); + u3l_log("pier: cram complete, shutting down\r\n"); u3_pier_bail(pir_u); exit(0); } @@ -1189,7 +1189,7 @@ _pier_init(c3_w wag_w, c3_c* pax_c) .work_done_f = _pier_on_lord_work_done, .work_bail_f = _pier_on_lord_work_bail, .save_f = _pier_on_lord_save, - .pack_f = _pier_on_lord_pack, + .cram_f = _pier_on_lord_cram, .bail_f = _pier_on_lord_bail, .exit_f = _pier_on_lord_exit }; @@ -1469,33 +1469,33 @@ u3_pier_save(u3_pier* pir_u) } static void -_pier_pack_cb(void* ptr_v, c3_d eve_d) +_pier_cram_cb(void* ptr_v, c3_d eve_d) { u3_pier* pir_u = ptr_v; #ifdef VERBOSE_PIER - fprintf(stderr, "pier: (%" PRIu64 "): snap: send at %" PRIu64 "\r\n", pir_u->god_u->eve_d, eve_d); + fprintf(stderr, "pier: (%" PRIu64 "): cram: send at %" PRIu64 "\r\n", pir_u->god_u->eve_d, eve_d); #endif - u3_lord_pack(pir_u->god_u); + u3_lord_cram(pir_u->god_u); } -/* u3_pier_pack(): save a portable snapshot. +/* u3_pier_cram(): save a portable snapshot. */ c3_o -u3_pier_pack(u3_pier* pir_u) +u3_pier_cram(u3_pier* pir_u) { #ifdef VERBOSE_PIER - fprintf(stderr, "pier: (%" PRIu64 "): snap: plan\r\n", pir_u->god_u->eve_d); + fprintf(stderr, "pier: (%" PRIu64 "): cram: plan\r\n", pir_u->god_u->eve_d); #endif if ( u3_psat_play == pir_u->sat_e ) { - u3_lord_pack(pir_u->god_u); + u3_lord_cram(pir_u->god_u); return c3y; } if ( u3_psat_work == pir_u->sat_e ) { - _pier_wall_plan(pir_u, 0, pir_u, _pier_pack_cb); + _pier_wall_plan(pir_u, 0, pir_u, _pier_cram_cb); return c3y; } diff --git a/pkg/urbit/worker/main.c b/pkg/urbit/worker/main.c index e5d47c96d..c7895a8cd 100644 --- a/pkg/urbit/worker/main.c +++ b/pkg/urbit/worker/main.c @@ -175,7 +175,7 @@ main(c3_i argc, c3_c* argv[]) u3V.sen_d = u3V.dun_d = u3m_boot(dir_c); if ( eve_d ) { - u3_serf_unpack(&u3V, eve_d); + u3_serf_uncram(&u3V, eve_d); } } diff --git a/pkg/urbit/worker/serf.c b/pkg/urbit/worker/serf.c index e18bae794..315f7e05f 100644 --- a/pkg/urbit/worker/serf.c +++ b/pkg/urbit/worker/serf.c @@ -31,9 +31,10 @@ :: +$ writ $% $: %live - $% [%exit cod=@] + $% [%cram eve=@] + [%exit cod=@] [%save eve=@] - [%pack eve=@] + [%pack ~] == == [%peek mil=@ now=@da lyc=gang pat=path] [%play eve=@ lit=(list ?((pair @da ovum) *))] @@ -274,10 +275,10 @@ _serf_static_grab(void) fflush(stderr); } -/* _serf_pack(): deduplicate and compact memory +/* _serf_cram(): deduplicate and compact memory. ORPHANED */ static void -_serf_pack(u3_serf* sef_u) +_serf_cram(u3_serf* sef_u) { _serf_static_grab(); @@ -288,7 +289,7 @@ _serf_pack(u3_serf* sef_u) return; } - u3_serf_unpack(sef_u, sef_u->dun_d); + u3_serf_uncram(sef_u, sef_u->dun_d); u3l_log("serf (%" PRIu64 "): compacted loom\r\n", sef_u->dun_d); @@ -313,7 +314,7 @@ u3_serf_post(u3_serf* sef_u) } if ( c3y == sef_u->pac_o ) { - _serf_pack(sef_u); + u3a_compact(); sef_u->pac_o = c3n; } } @@ -907,7 +908,6 @@ c3_o u3_serf_live(u3_serf* sef_u, u3_noun com, u3_noun* ret) { u3_noun tag, dat; - c3_o ret_o; // refcounts around snapshots require special handling // @@ -938,9 +938,9 @@ u3_serf_live(u3_serf* sef_u, u3_noun com, u3_noun* ret) return c3y; } - // NB: the %pack $writ only saves the rock, it doesn't load it + // NB: the %cram $writ only saves the rock, it doesn't load it // - case c3__pack: { + case c3__cram: { c3_d eve_d; if ( c3n == u3r_safe_chub(dat, &eve_d) ) { @@ -951,7 +951,7 @@ u3_serf_live(u3_serf* sef_u, u3_noun com, u3_noun* ret) u3z(com); if( eve_d != sef_u->dun_d ) { - fprintf(stderr, "serf (%" PRIu64 "): pack failed: %" PRIu64 "\r\n", + fprintf(stderr, "serf (%" PRIu64 "): cram failed: %" PRIu64 "\r\n", sef_u->dun_d, eve_d); return c3n; @@ -970,6 +970,19 @@ u3_serf_live(u3_serf* sef_u, u3_noun com, u3_noun* ret) return c3y; } + case c3__pack: { + if ( u3_nul != dat ) { + u3z(com); + return c3n; + } + else { + u3z(com); + u3a_compact(); + *ret = u3nc(c3__live, u3_nul); + return c3y; + } + } + case c3__save: { c3_d eve_d; @@ -1095,10 +1108,10 @@ _serf_ripe(u3_serf* sef_u) return u3nc(u3i_chubs(1, &sef_u->dun_d), sef_u->mug_l); } -/* u3_serf_unpack(): initialize from rock at [eve_d]. +/* u3_serf_uncram(): initialize from rock at [eve_d]. */ void -u3_serf_unpack(u3_serf* sef_u, c3_d eve_d) +u3_serf_uncram(u3_serf* sef_u, c3_d eve_d) { c3_o roc_o; c3_c nam_c[8193]; From 7d243771c0a1cc5afbbe73c0b162418670735815 Mon Sep 17 00:00:00 2001 From: Joe Bryan Date: Tue, 21 Jul 2020 14:23:21 -0700 Subject: [PATCH 3/4] serf: restores autopack in response to memory pressure --- pkg/urbit/worker/serf.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/urbit/worker/serf.c b/pkg/urbit/worker/serf.c index 315f7e05f..55f12da8c 100644 --- a/pkg/urbit/worker/serf.c +++ b/pkg/urbit/worker/serf.c @@ -393,10 +393,12 @@ _serf_sure_feck(u3_serf* sef_u, c3_w pre_w, u3_noun vir) if ( (pre_w > low_w) && !(pos_w > low_w) ) { // XX set flag(s) in u3V so we don't repeat endlessly? // + pac_o = c3y; rec_o = c3y; pri = 1; } else if ( (pre_w > hig_w) && !(pos_w > hig_w) ) { + pac_o = c3y; rec_o = c3y; pri = 0; } From a580f964166cd41bd618529a5982b8d84c2ac4e6 Mon Sep 17 00:00:00 2001 From: Joe Bryan Date: Tue, 21 Jul 2020 16:53:45 -0700 Subject: [PATCH 4/4] serf: adds subcommands to urbit-worker --- pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs | 2 +- pkg/urbit/include/vere/serf.h | 5 + pkg/urbit/vere/lord.c | 17 +- pkg/urbit/worker/main.c | 239 ++++++++++++++++--- pkg/urbit/worker/serf.c | 14 +- 5 files changed, 223 insertions(+), 54 deletions(-) diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs index 418284b58..3b0d5b140 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs @@ -305,7 +305,7 @@ start (Config exePax pierPath flags onSlog onStdr onDead) = do config = show (compileFlags flags) rock = "0" -- XX support loading from rock cache = "50000" -- XX support memo-cache size - args = [pierPath, diskKey, config, cache, rock] + args = ["serf", pierPath, diskKey, config, cache, rock] pSpec = (proc exePax args) { std_in = CreatePipe , std_out = CreatePipe , std_err = CreatePipe diff --git a/pkg/urbit/include/vere/serf.h b/pkg/urbit/include/vere/serf.h index b931a761a..12647dbb6 100644 --- a/pkg/urbit/include/vere/serf.h +++ b/pkg/urbit/include/vere/serf.h @@ -58,3 +58,8 @@ */ void u3_serf_post(u3_serf* sef_u); + + /* u3_serf_grab(): garbage collect. + */ + void + u3_serf_grab(void); diff --git a/pkg/urbit/vere/lord.c b/pkg/urbit/vere/lord.c index 73d9e41af..fef5584c5 100644 --- a/pkg/urbit/vere/lord.c +++ b/pkg/urbit/vere/lord.c @@ -1066,7 +1066,7 @@ u3_lord_init(c3_c* pax_c, c3_w wag_w, c3_d key_d[4], u3_lord_cb cb_u) // spawn new process and connect to it // { - c3_c* arg_c[7]; + c3_c* arg_c[8]; c3_c key_c[256]; c3_c wag_c[11]; c3_c hap_c[11]; @@ -1083,21 +1083,22 @@ u3_lord_init(c3_c* pax_c, c3_w wag_w, c3_d key_d[4], u3_lord_cb cb_u) sprintf(hap_c, "%u", u3_Host.ops_u.hap_w); arg_c[0] = god_u->bin_c; // executable - arg_c[1] = god_u->pax_c; // path to checkpoint directory - arg_c[2] = key_c; // disk key - arg_c[3] = wag_c; // runtime config - arg_c[4] = hap_c; // hash table size + arg_c[1] = "serf"; // protocol + arg_c[2] = god_u->pax_c; // path to checkpoint directory + arg_c[3] = key_c; // disk key + arg_c[4] = wag_c; // runtime config + arg_c[5] = hap_c; // hash table size if ( u3_Host.ops_u.roc_c ) { // XX validate // - arg_c[5] = u3_Host.ops_u.roc_c; + arg_c[6] = u3_Host.ops_u.roc_c; } else { - arg_c[5] = "0"; + arg_c[6] = "0"; } - arg_c[6] = 0; + arg_c[7] = 0; uv_pipe_init(u3L, &god_u->inn_u.pyp_u, 0); uv_timer_init(u3L, &god_u->out_u.tim_u); diff --git a/pkg/urbit/worker/main.c b/pkg/urbit/worker/main.c index c7895a8cd..60558aff1 100644 --- a/pkg/urbit/worker/main.c +++ b/pkg/urbit/worker/main.c @@ -26,51 +26,51 @@ static u3_serf u3V; // one serf per process static u3_moat inn_u; // input stream static u3_mojo out_u; // output stream -/* _newt_fail(): failure stub. +/* _cw_serf_fail(): failure stub. */ static void -_newt_fail(void* vod_p, const c3_c* wut_c) +_cw_serf_fail(void* vod_p, const c3_c* wut_c) { fprintf(stderr, "serf: fail: %s\r\n", wut_c); exit(1); } -/* _newt_send(): send plea back to daemon. +/* _cw_serf_send(): send plea back to daemon. */ static void -_newt_send(u3_noun pel) +_cw_serf_send(u3_noun pel) { u3_newt_write(&out_u, u3ke_jam(pel)); } -/* _newt_send_slog(): send hint output (hod is [priority tank]). +/* _cw_serf_send_slog(): send hint output (hod is [priority tank]). */ static void -_newt_send_slog(u3_noun hod) +_cw_serf_send_slog(u3_noun hod) { - _newt_send(u3nc(c3__slog, hod)); + _cw_serf_send(u3nc(c3__slog, hod)); } -/* _newt_send_stdr(): send stderr output +/* _cw_serf_send_stdr(): send stderr output */ static void -_newt_send_stdr(c3_c* str_c) +_cw_serf_send_stdr(c3_c* str_c) { - _newt_send_slog(u3nc(0, u3i_string(str_c))); + _cw_serf_send_slog(u3nc(0, u3i_string(str_c))); } -/* _newt_writ(): +/* _cw_serf_writ(): */ static void -_newt_writ(void* vod_p, u3_noun mat) +_cw_serf_writ(void* vod_p, u3_noun mat) { u3_noun ret; if ( c3n == u3_serf_writ(&u3V, u3ke_cue(mat), &ret) ) { - _newt_fail(0, "bad jar"); + _cw_serf_fail(0, "bad jar"); } else { - _newt_send(ret); + _cw_serf_send(ret); // all references must now be counted, and all roots recorded // @@ -78,10 +78,10 @@ _newt_writ(void* vod_p, u3_noun mat) } } -/* main(): main() when run as urbit-worker +/* _cw_serf_stdio(): fix up std io handles */ -c3_i -main(c3_i argc, c3_c* argv[]) +static void +_cw_serf_stdio(c3_i* inn_i, c3_i* out_i) { // the serf is spawned with [FD 0] = events and [FD 1] = effects // we dup [FD 0 & 1] so we don't accidently use them for something else @@ -89,22 +89,34 @@ main(c3_i argc, c3_c* argv[]) // we replace [FD 1] (stdout) with a dup of [FD 2] (stderr) // c3_i nul_i = open("/dev/null", O_RDWR, 0); - c3_i inn_i = dup(0); - c3_i out_i = dup(1); + + *inn_i = dup(0); + *out_i = dup(1); + dup2(nul_i, 0); dup2(2, 1); - close(nul_i); - c3_assert( 6 == argc ); + close(nul_i); +} + +/* _cw_serf_commence(); initialize and run serf +*/ +static void +_cw_serf_commence(c3_i argc, c3_c* argv[]) +{ + c3_i inn_i, out_i; + _cw_serf_stdio(&inn_i, &out_i); + + c3_assert( 7 == argc ); uv_loop_t* lup_u = uv_default_loop(); - c3_c* dir_c = argv[1]; - c3_c* key_c = argv[2]; - c3_c* wag_c = argv[3]; - c3_c* hap_c = argv[4]; + c3_c* dir_c = argv[2]; + c3_c* key_c = argv[3]; + c3_c* wag_c = argv[4]; + c3_c* hap_c = argv[5]; c3_d eve_d = 0; - if ( 1 != sscanf(argv[5], "%" PRIu64 "", &eve_d) ) { + if ( 1 != sscanf(argv[6], "%" PRIu64 "", &eve_d) ) { fprintf(stderr, "serf: rock: invalid number '%s'\r\n", argv[4]); } @@ -160,13 +172,13 @@ main(c3_i argc, c3_c* argv[]) // set up writing // out_u.ptr_v = &u3V; - out_u.bal_f = _newt_fail; + out_u.bal_f = _cw_serf_fail; // set up reading // inn_u.ptr_v = &u3V; - inn_u.pok_f = _newt_writ; - inn_u.bal_f = _newt_fail; + inn_u.pok_f = _cw_serf_writ; + inn_u.bal_f = _cw_serf_fail; // setup loom // @@ -184,20 +196,14 @@ main(c3_i argc, c3_c* argv[]) // XX must be after u3m_boot due to u3l_log // { - u3C.stderr_log_f = _newt_send_stdr; - u3C.slog_f = _newt_send_slog; - } - - if (u3_Host.ops_u.hap_w == 1337) { - u3a_compact(); - u3e_save(); - return 0; + u3C.stderr_log_f = _cw_serf_send_stdr; + u3C.slog_f = _cw_serf_send_slog; } // start serf // { - _newt_send(u3_serf_init(&u3V)); + _cw_serf_send(u3_serf_init(&u3V)); } // start reading @@ -207,6 +213,163 @@ main(c3_i argc, c3_c* argv[]) // enter loop // uv_run(lup_u, UV_RUN_DEFAULT); +} + +/* _cw_info(); print pier info +*/ +static void +_cw_info(c3_i argc, c3_c* argv[]) +{ + c3_assert( 3 <= argc ); + + c3_c* dir_c = argv[2]; + c3_d eve_d = u3m_boot(dir_c); + + fprintf(stderr, "urbit-worker: %s at event %" PRIu64 "\r\n", dir_c, eve_d); +} + +/* _cw_grab(); gc pier. +*/ +static void +_cw_grab(c3_i argc, c3_c* argv[]) +{ + c3_assert( 3 <= argc ); + + c3_c* dir_c = argv[2]; + u3m_boot(dir_c); + u3_serf_grab(); +} + +/* _cw_cram(); jam persistent state (rock), and exit. +*/ +static void +_cw_cram(c3_i argc, c3_c* argv[]) +{ + c3_assert( 3 <= argc ); + + c3_c* dir_c = argv[2]; + c3_d eve_d = u3m_boot(dir_c); + + fprintf(stderr, "urbit-worker: cram: preparing\r\n"); + + if ( c3n == u3m_rock_stay(dir_c, eve_d) ) { + fprintf(stderr, "urbit-worker: cram: unable to jam state\r\n"); + exit(1); + } + + fprintf(stderr, "urbit-worker: cram: rock saved at event %" PRIu64 "\r\n", eve_d); +} + +/* _cw_queu(); cue rock, save, and exit. +*/ +static void +_cw_queu(c3_i argc, c3_c* argv[]) +{ + c3_assert( 4 <= argc ); + + c3_c* dir_c = argv[2]; + c3_c* eve_c = argv[3]; + c3_d eve_d; + + if ( 1 != sscanf(eve_c, "%" PRIu64 "", &eve_d) ) { + fprintf(stderr, "urbit-worker: queu: invalid number '%s'\r\n", eve_c); + exit(1); + } + else { + fprintf(stderr, "urbit-worker: queu: preparing\r\n"); + + memset(&u3V, 0, sizeof(u3V)); + u3V.dir_c = strdup(dir_c); + u3V.sen_d = u3V.dun_d = u3m_boot(dir_c); + u3_serf_uncram(&u3V, eve_d); + u3e_save(); + + fprintf(stderr, "urbit-worker: queu: rock loaded at event %" PRIu64 "\r\n", eve_d); + } +} + +/* _cw_pack(); compact memory, save, and exit. +*/ +static void +_cw_pack(c3_i argc, c3_c* argv[]) +{ + c3_assert( 3 <= argc ); + + c3_c* dir_c = argv[2]; + + u3m_boot(dir_c); + u3a_compact(); + u3e_save(); +} + +/* _cw_usage(): print urbit-worker usage. +*/ +static void +_cw_usage(c3_i argc, c3_c* argv[]) +{ + fprintf(stderr, + "\rurbit-worker usage:\n" + " print pier info:\n" + " %s info \n\n" + " gc persistent state:\n" + " %s grab \n\n" + " compact persistent state:\n" + " %s pack \n\n" + " jam persistent state:\n" + " %s cram \n\n" + " cue persistent state:\n" + " %s queu \n\n" + " run as a 'serf':\n" + " %s serf \n", + argv[0], argv[0], argv[0], argv[0], argv[0], argv[0]); +} + +/* main(): main() when run as urbit-worker +*/ +c3_i +main(c3_i argc, c3_c* argv[]) +{ + // urbit-worker commands and positional arguments, by analogy + // + // $@ ~ ;; usage + // $% [%cram dir=@t] + // [%queu dir=@t eve=@ud] + // [%pack dir=@t] + // [%serf dir=@t key=@t wag=@t hap=@ud eve=@ud] + // == + // + // NB: don't print to anything other than stderr; + // other streams may have special requirements (in the case of "serf") + // + if ( 2 > argc ) { + _cw_usage(argc, argv); + exit(1); + } + else { + if ( 0 == strcmp("serf", argv[1]) ) { + _cw_serf_commence(argc, argv); + } + else if ( 0 == strcmp("info", argv[1]) ) { + _cw_info(argc, argv); + } + else if ( 0 == strcmp("grab", argv[1]) ) { + _cw_grab(argc, argv); + } + else if ( 0 == strcmp("cram", argv[1]) ) { + _cw_cram(argc, argv); + } + else if ( 0 == strcmp("queu", argv[1]) ) { + _cw_queu(argc, argv); + } + else if ( 0 == strcmp("pack", argv[1]) ) { + _cw_pack(argc, argv); + } + else { + fprintf(stderr, "unknown command '%s'\r\n", argv[1]); + _cw_usage(argc, argv); + exit(1); + } + } return 0; } diff --git a/pkg/urbit/worker/serf.c b/pkg/urbit/worker/serf.c index 55f12da8c..8dd1ddb80 100644 --- a/pkg/urbit/worker/serf.c +++ b/pkg/urbit/worker/serf.c @@ -260,10 +260,10 @@ _serf_grab(u3_serf* sef_u) } } -/* _serf_static_grab(): garbage collect, checking for profiling. RETAIN. +/* u3_serf_grab(): garbage collect. */ -static void -_serf_static_grab(void) +void +u3_serf_grab(void) { c3_assert( u3R == &(u3H->rod_u) ); @@ -280,7 +280,7 @@ _serf_static_grab(void) static void _serf_cram(u3_serf* sef_u) { - _serf_static_grab(); + u3_serf_grab(); u3l_log("serf (%" PRIu64 "): compacting loom\r\n", sef_u->dun_d); @@ -293,7 +293,7 @@ _serf_cram(u3_serf* sef_u) u3l_log("serf (%" PRIu64 "): compacted loom\r\n", sef_u->dun_d); - _serf_static_grab(); + u3_serf_grab(); } /* u3_serf_post(): update serf state post-writ. @@ -966,7 +966,7 @@ u3_serf_live(u3_serf* sef_u, u3_noun com, u3_noun* ret) return c3n; } - _serf_static_grab(); + u3_serf_grab(); *ret = u3nc(c3__live, u3_nul); return c3y; @@ -1197,7 +1197,7 @@ u3_serf_init(u3_serf* sef_u) // if ( !(pen_w > (1 << 28)) ) { // fprintf(stderr, "\r\n"); // u3a_print_memory(stderr, "serf: contiguous free space", pen_w); - // _serf_static_grab(); + // u3_serf_grab(); // } // }