serf: adds subcommands to urbit-worker

This commit is contained in:
Joe Bryan 2020-07-21 16:53:45 -07:00
parent 7d243771c0
commit a580f96416
5 changed files with 223 additions and 54 deletions

View File

@ -305,7 +305,7 @@ start (Config exePax pierPath flags onSlog onStdr onDead) = do
config = show (compileFlags flags)
rock = "0" -- XX support loading from rock
cache = "50000" -- XX support memo-cache size
args = [pierPath, diskKey, config, cache, rock]
args = ["serf", pierPath, diskKey, config, cache, rock]
pSpec = (proc exePax args) { std_in = CreatePipe
, std_out = CreatePipe
, std_err = CreatePipe

View File

@ -58,3 +58,8 @@
*/
void
u3_serf_post(u3_serf* sef_u);
/* u3_serf_grab(): garbage collect.
*/
void
u3_serf_grab(void);

View File

@ -1066,7 +1066,7 @@ u3_lord_init(c3_c* pax_c, c3_w wag_w, c3_d key_d[4], u3_lord_cb cb_u)
// spawn new process and connect to it
//
{
c3_c* arg_c[7];
c3_c* arg_c[8];
c3_c key_c[256];
c3_c wag_c[11];
c3_c hap_c[11];
@ -1083,21 +1083,22 @@ u3_lord_init(c3_c* pax_c, c3_w wag_w, c3_d key_d[4], u3_lord_cb cb_u)
sprintf(hap_c, "%u", u3_Host.ops_u.hap_w);
arg_c[0] = god_u->bin_c; // executable
arg_c[1] = god_u->pax_c; // path to checkpoint directory
arg_c[2] = key_c; // disk key
arg_c[3] = wag_c; // runtime config
arg_c[4] = hap_c; // hash table size
arg_c[1] = "serf"; // protocol
arg_c[2] = god_u->pax_c; // path to checkpoint directory
arg_c[3] = key_c; // disk key
arg_c[4] = wag_c; // runtime config
arg_c[5] = hap_c; // hash table size
if ( u3_Host.ops_u.roc_c ) {
// XX validate
//
arg_c[5] = u3_Host.ops_u.roc_c;
arg_c[6] = u3_Host.ops_u.roc_c;
}
else {
arg_c[5] = "0";
arg_c[6] = "0";
}
arg_c[6] = 0;
arg_c[7] = 0;
uv_pipe_init(u3L, &god_u->inn_u.pyp_u, 0);
uv_timer_init(u3L, &god_u->out_u.tim_u);

View File

@ -26,51 +26,51 @@ static u3_serf u3V; // one serf per process
static u3_moat inn_u; // input stream
static u3_mojo out_u; // output stream
/* _newt_fail(): failure stub.
/* _cw_serf_fail(): failure stub.
*/
static void
_newt_fail(void* vod_p, const c3_c* wut_c)
_cw_serf_fail(void* vod_p, const c3_c* wut_c)
{
fprintf(stderr, "serf: fail: %s\r\n", wut_c);
exit(1);
}
/* _newt_send(): send plea back to daemon.
/* _cw_serf_send(): send plea back to daemon.
*/
static void
_newt_send(u3_noun pel)
_cw_serf_send(u3_noun pel)
{
u3_newt_write(&out_u, u3ke_jam(pel));
}
/* _newt_send_slog(): send hint output (hod is [priority tank]).
/* _cw_serf_send_slog(): send hint output (hod is [priority tank]).
*/
static void
_newt_send_slog(u3_noun hod)
_cw_serf_send_slog(u3_noun hod)
{
_newt_send(u3nc(c3__slog, hod));
_cw_serf_send(u3nc(c3__slog, hod));
}
/* _newt_send_stdr(): send stderr output
/* _cw_serf_send_stdr(): send stderr output
*/
static void
_newt_send_stdr(c3_c* str_c)
_cw_serf_send_stdr(c3_c* str_c)
{
_newt_send_slog(u3nc(0, u3i_string(str_c)));
_cw_serf_send_slog(u3nc(0, u3i_string(str_c)));
}
/* _newt_writ():
/* _cw_serf_writ():
*/
static void
_newt_writ(void* vod_p, u3_noun mat)
_cw_serf_writ(void* vod_p, u3_noun mat)
{
u3_noun ret;
if ( c3n == u3_serf_writ(&u3V, u3ke_cue(mat), &ret) ) {
_newt_fail(0, "bad jar");
_cw_serf_fail(0, "bad jar");
}
else {
_newt_send(ret);
_cw_serf_send(ret);
// all references must now be counted, and all roots recorded
//
@ -78,10 +78,10 @@ _newt_writ(void* vod_p, u3_noun mat)
}
}
/* main(): main() when run as urbit-worker
/* _cw_serf_stdio(): fix up std io handles
*/
c3_i
main(c3_i argc, c3_c* argv[])
static void
_cw_serf_stdio(c3_i* inn_i, c3_i* out_i)
{
// the serf is spawned with [FD 0] = events and [FD 1] = effects
// we dup [FD 0 & 1] so we don't accidently use them for something else
@ -89,22 +89,34 @@ main(c3_i argc, c3_c* argv[])
// we replace [FD 1] (stdout) with a dup of [FD 2] (stderr)
//
c3_i nul_i = open("/dev/null", O_RDWR, 0);
c3_i inn_i = dup(0);
c3_i out_i = dup(1);
*inn_i = dup(0);
*out_i = dup(1);
dup2(nul_i, 0);
dup2(2, 1);
close(nul_i);
c3_assert( 6 == argc );
close(nul_i);
}
/* _cw_serf_commence(); initialize and run serf
*/
static void
_cw_serf_commence(c3_i argc, c3_c* argv[])
{
c3_i inn_i, out_i;
_cw_serf_stdio(&inn_i, &out_i);
c3_assert( 7 == argc );
uv_loop_t* lup_u = uv_default_loop();
c3_c* dir_c = argv[1];
c3_c* key_c = argv[2];
c3_c* wag_c = argv[3];
c3_c* hap_c = argv[4];
c3_c* dir_c = argv[2];
c3_c* key_c = argv[3];
c3_c* wag_c = argv[4];
c3_c* hap_c = argv[5];
c3_d eve_d = 0;
if ( 1 != sscanf(argv[5], "%" PRIu64 "", &eve_d) ) {
if ( 1 != sscanf(argv[6], "%" PRIu64 "", &eve_d) ) {
fprintf(stderr, "serf: rock: invalid number '%s'\r\n", argv[4]);
}
@ -160,13 +172,13 @@ main(c3_i argc, c3_c* argv[])
// set up writing
//
out_u.ptr_v = &u3V;
out_u.bal_f = _newt_fail;
out_u.bal_f = _cw_serf_fail;
// set up reading
//
inn_u.ptr_v = &u3V;
inn_u.pok_f = _newt_writ;
inn_u.bal_f = _newt_fail;
inn_u.pok_f = _cw_serf_writ;
inn_u.bal_f = _cw_serf_fail;
// setup loom
//
@ -184,20 +196,14 @@ main(c3_i argc, c3_c* argv[])
// XX must be after u3m_boot due to u3l_log
//
{
u3C.stderr_log_f = _newt_send_stdr;
u3C.slog_f = _newt_send_slog;
}
if (u3_Host.ops_u.hap_w == 1337) {
u3a_compact();
u3e_save();
return 0;
u3C.stderr_log_f = _cw_serf_send_stdr;
u3C.slog_f = _cw_serf_send_slog;
}
// start serf
//
{
_newt_send(u3_serf_init(&u3V));
_cw_serf_send(u3_serf_init(&u3V));
}
// start reading
@ -207,6 +213,163 @@ main(c3_i argc, c3_c* argv[])
// enter loop
//
uv_run(lup_u, UV_RUN_DEFAULT);
}
/* _cw_info(); print pier info
*/
static void
_cw_info(c3_i argc, c3_c* argv[])
{
c3_assert( 3 <= argc );
c3_c* dir_c = argv[2];
c3_d eve_d = u3m_boot(dir_c);
fprintf(stderr, "urbit-worker: %s at event %" PRIu64 "\r\n", dir_c, eve_d);
}
/* _cw_grab(); gc pier.
*/
static void
_cw_grab(c3_i argc, c3_c* argv[])
{
c3_assert( 3 <= argc );
c3_c* dir_c = argv[2];
u3m_boot(dir_c);
u3_serf_grab();
}
/* _cw_cram(); jam persistent state (rock), and exit.
*/
static void
_cw_cram(c3_i argc, c3_c* argv[])
{
c3_assert( 3 <= argc );
c3_c* dir_c = argv[2];
c3_d eve_d = u3m_boot(dir_c);
fprintf(stderr, "urbit-worker: cram: preparing\r\n");
if ( c3n == u3m_rock_stay(dir_c, eve_d) ) {
fprintf(stderr, "urbit-worker: cram: unable to jam state\r\n");
exit(1);
}
fprintf(stderr, "urbit-worker: cram: rock saved at event %" PRIu64 "\r\n", eve_d);
}
/* _cw_queu(); cue rock, save, and exit.
*/
static void
_cw_queu(c3_i argc, c3_c* argv[])
{
c3_assert( 4 <= argc );
c3_c* dir_c = argv[2];
c3_c* eve_c = argv[3];
c3_d eve_d;
if ( 1 != sscanf(eve_c, "%" PRIu64 "", &eve_d) ) {
fprintf(stderr, "urbit-worker: queu: invalid number '%s'\r\n", eve_c);
exit(1);
}
else {
fprintf(stderr, "urbit-worker: queu: preparing\r\n");
memset(&u3V, 0, sizeof(u3V));
u3V.dir_c = strdup(dir_c);
u3V.sen_d = u3V.dun_d = u3m_boot(dir_c);
u3_serf_uncram(&u3V, eve_d);
u3e_save();
fprintf(stderr, "urbit-worker: queu: rock loaded at event %" PRIu64 "\r\n", eve_d);
}
}
/* _cw_pack(); compact memory, save, and exit.
*/
static void
_cw_pack(c3_i argc, c3_c* argv[])
{
c3_assert( 3 <= argc );
c3_c* dir_c = argv[2];
u3m_boot(dir_c);
u3a_compact();
u3e_save();
}
/* _cw_usage(): print urbit-worker usage.
*/
static void
_cw_usage(c3_i argc, c3_c* argv[])
{
fprintf(stderr,
"\rurbit-worker usage:\n"
" print pier info:\n"
" %s info <pier>\n\n"
" gc persistent state:\n"
" %s grab <pier>\n\n"
" compact persistent state:\n"
" %s pack <pier>\n\n"
" jam persistent state:\n"
" %s cram <pier>\n\n"
" cue persistent state:\n"
" %s queu <pier> <at-event>\n\n"
" run as a 'serf':\n"
" %s serf <pier> <key> <flags> <cache-size> <at-event>\n",
argv[0], argv[0], argv[0], argv[0], argv[0], argv[0]);
}
/* main(): main() when run as urbit-worker
*/
c3_i
main(c3_i argc, c3_c* argv[])
{
// urbit-worker commands and positional arguments, by analogy
//
// $@ ~ ;; usage
// $% [%cram dir=@t]
// [%queu dir=@t eve=@ud]
// [%pack dir=@t]
// [%serf dir=@t key=@t wag=@t hap=@ud eve=@ud]
// ==
//
// NB: don't print to anything other than stderr;
// other streams may have special requirements (in the case of "serf")
//
if ( 2 > argc ) {
_cw_usage(argc, argv);
exit(1);
}
else {
if ( 0 == strcmp("serf", argv[1]) ) {
_cw_serf_commence(argc, argv);
}
else if ( 0 == strcmp("info", argv[1]) ) {
_cw_info(argc, argv);
}
else if ( 0 == strcmp("grab", argv[1]) ) {
_cw_grab(argc, argv);
}
else if ( 0 == strcmp("cram", argv[1]) ) {
_cw_cram(argc, argv);
}
else if ( 0 == strcmp("queu", argv[1]) ) {
_cw_queu(argc, argv);
}
else if ( 0 == strcmp("pack", argv[1]) ) {
_cw_pack(argc, argv);
}
else {
fprintf(stderr, "unknown command '%s'\r\n", argv[1]);
_cw_usage(argc, argv);
exit(1);
}
}
return 0;
}

View File

@ -260,10 +260,10 @@ _serf_grab(u3_serf* sef_u)
}
}
/* _serf_static_grab(): garbage collect, checking for profiling. RETAIN.
/* u3_serf_grab(): garbage collect.
*/
static void
_serf_static_grab(void)
void
u3_serf_grab(void)
{
c3_assert( u3R == &(u3H->rod_u) );
@ -280,7 +280,7 @@ _serf_static_grab(void)
static void
_serf_cram(u3_serf* sef_u)
{
_serf_static_grab();
u3_serf_grab();
u3l_log("serf (%" PRIu64 "): compacting loom\r\n", sef_u->dun_d);
@ -293,7 +293,7 @@ _serf_cram(u3_serf* sef_u)
u3l_log("serf (%" PRIu64 "): compacted loom\r\n", sef_u->dun_d);
_serf_static_grab();
u3_serf_grab();
}
/* u3_serf_post(): update serf state post-writ.
@ -966,7 +966,7 @@ u3_serf_live(u3_serf* sef_u, u3_noun com, u3_noun* ret)
return c3n;
}
_serf_static_grab();
u3_serf_grab();
*ret = u3nc(c3__live, u3_nul);
return c3y;
@ -1197,7 +1197,7 @@ u3_serf_init(u3_serf* sef_u)
// if ( !(pen_w > (1 << 28)) ) {
// fprintf(stderr, "\r\n");
// u3a_print_memory(stderr, "serf: contiguous free space", pen_w);
// _serf_static_grab();
// u3_serf_grab();
// }
// }