Merge pull request #5717 from urbit/jb/one-vere

vere: removes urbit-worker
This commit is contained in:
Joe Bryan 2022-04-22 21:58:45 -04:00 committed by GitHub
commit ab0381447c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 593 additions and 590 deletions

View File

@ -115,5 +115,5 @@ jobs:
CACHIX_CACHE: ares
CACHIX_AUTH_TOKEN: ${{ secrets.CACHIX_AUTH_TOKEN }}
- run: mingw32-make build/urbit build/urbit-worker
- run: mingw32-make build/urbit
- run: build/urbit -l -d -B ../../bin/solid.pill -F bus && curl -f --data '{"source":{"dojo":"+hood/exit"},"sink":{"app":"hood"}}' http://localhost:12321

View File

@ -126,7 +126,6 @@ let
contents = {
"${name}/urbit" = "${urbit}/bin/urbit";
"${name}/urbit-worker" = "${urbit}/bin/urbit-worker";
};
};

View File

@ -56,7 +56,6 @@ in stdenv.mkDerivation {
installPhase = ''
mkdir -p $out/bin
cp ./build/urbit $out/bin/urbit
cp ./build/urbit-worker $out/bin/urbit-worker
'';
dontDisableStatic = enableStatic;

View File

@ -24,7 +24,7 @@ all_srcs = $(common) $(daemon) $(worker)
test_exes = $(shell echo $(tests) | sed 's/tests\//.\/build\//g' | sed 's/\.c//g')
bench_exes = $(shell echo $(bench) | sed 's/bench\//.\/build\//g' | sed 's/\.c//g')
all_exes = $(test_exes) $(bench_exes) ./build/urbit ./build/urbit-worker
all_exes = $(test_exes) $(bench_exes) ./build/urbit
# -Werror promotes all warnings that are enabled into errors (this is on)
# -Wall issues all types of errors. This is off (for now)
@ -67,12 +67,7 @@ build/%_tests: $(common_objs) tests/%_tests.o
@mkdir -p ./build
@$(CC) $^ $(LDFLAGS) -o $@
build/urbit: $(common_objs) $(daemon_objs)
@echo CC -o $@
@mkdir -p ./build
@$(CC) $^ $(LDFLAGS) -o $@
build/urbit-worker: $(common_objs) $(worker_objs)
build/urbit: $(common_objs) $(daemon_objs) $(worker_objs)
@echo CC -o $@
@mkdir -p ./build
@$(CC) $^ $(LDFLAGS) -o $@

View File

@ -4,6 +4,8 @@
#define U3_GLOBAL
#define C3_GLOBAL
#include "all.h"
#include "rsignal.h"
#include <vere/serf.h>
#include "vere/vere.h"
#if !defined(U3_OS_mingw)
#include <sigsegv.h>
@ -19,6 +21,16 @@
#include "ca-bundle.h"
// serf module state
//
static u3_serf u3V; // one serf per process
static u3_moat inn_u; // input stream
static u3_mojo out_u; // output stream
static u3_cue_xeno* sil_u; // cue handle
#undef SERF_TRACE_JAM
#undef SERF_TRACE_CUE
/* Require unsigned char
*/
STATIC_ASSERT(( 0 == CHAR_MIN && UCHAR_MAX == CHAR_MAX ),
@ -515,6 +527,27 @@ _setup_ssl_curl(void* arg)
curl_easy_setopt(curl, CURLOPT_SSL_CTX_FUNCTION, _curl_ssl_ctx_cb);
}
/* _cw_usage(): print utility usage.
*/
static void
_cw_usage(c3_c* s)
{
fprintf(stderr,
"\nutilities:\n"
" %s cram <pier> jam state:\n"
" %s grab <pier> measure memory usage:\n"
" %s info <pier> print pier info:\n"
" %s meld <pier> deduplicate snapshot:\n"
" %s pack <pier> defragment snapshot:\n"
" %s queu <pier> <at-event> cue state:\n"
"\n run as a 'serf':\n"
" %s serf <pier> <key> <flags> <cache-size> <at-event>"
#ifdef U3_OS_mingw
" <ctrlc-handle>"
#endif
"\n",
s, s, s, s, s, s, s);
}
/* u3_ve_usage(): print usage and exit.
*/
@ -589,6 +622,7 @@ u3_ve_usage(c3_i argc, c3_c** argv)
for ( i=0; use_c[i]; i++ ) {
fprintf(stderr, use_c[i], argv[0]);
}
_cw_usage(argv[0]);
exit(1);
}
@ -656,30 +690,549 @@ _stop_on_boot_completed_cb()
u3_king_exit();
}
static c3_i
_debug_db_stats(const c3_c* dir_c)
/* _cw_serf_fail(): failure stub.
*/
static void
_cw_serf_fail(void* ptr_v, ssize_t err_i, const c3_c* err_c)
{
#if defined(U3_CPU_aarch64) && defined(U3_OS_linux)
const size_t siz_i = 64424509440;
#else
const size_t siz_i = 1099511627776;
#endif
c3_c* log_c = c3_malloc(10 + strlen(dir_c));
strcpy(log_c, dir_c);
strcat(log_c, "/.urb/log");
MDB_env* mdb_u = u3_lmdb_init(log_c, siz_i);
if ( mdb_u ) {
u3_lmdb_stat(mdb_u, stdout);
u3_lmdb_exit(mdb_u);
return 0;
if ( UV_EOF == err_i ) {
fprintf(stderr, "serf: pier unexpectedly shut down\r\n");
}
else {
return 1;
fprintf(stderr, "serf: pier error: %s\r\n", err_c);
}
exit(1);
}
/* _cw_serf_send(): send plea back to daemon.
*/
static void
_cw_serf_send(u3_noun pel)
{
c3_d len_d;
c3_y* byt_y;
#ifdef SERF_TRACE_JAM
u3t_event_trace("serf ipc jam", 'B');
#endif
u3s_jam_xeno(pel, &len_d, &byt_y);
#ifdef SERF_TRACE_JAM
u3t_event_trace("serf ipc jam", 'E');
#endif
u3_newt_send(&out_u, len_d, byt_y);
u3z(pel);
}
/* _cw_serf_send_slog(): send hint output (hod is [priority tank]).
*/
static void
_cw_serf_send_slog(u3_noun hod)
{
_cw_serf_send(u3nc(c3__slog, hod));
}
/* _cw_serf_send_stdr(): send stderr output (%flog)
*/
static void
_cw_serf_send_stdr(c3_c* str_c)
{
_cw_serf_send(u3nc(c3__flog, u3i_string(str_c)));
}
/* _cw_serf_step_trace(): initialize or rotate trace file.
*/
static void
_cw_serf_step_trace(void)
{
if ( u3C.wag_w & u3o_trace ) {
if ( u3_Host.tra_u.con_w == 0 && u3_Host.tra_u.fun_w == 0 ) {
u3t_trace_open(u3V.dir_c);
}
else if ( u3_Host.tra_u.con_w >= 100000 ) {
u3t_trace_close();
u3t_trace_open(u3V.dir_c);
}
}
}
/* _cw_serf_writ(): process a command from the king.
*/
static void
_cw_serf_writ(void* vod_p, c3_d len_d, c3_y* byt_y)
{
u3_weak jar;
u3_noun ret;
_cw_serf_step_trace();
#ifdef SERF_TRACE_CUE
u3t_event_trace("serf ipc cue", 'B');
#endif
jar = u3s_cue_xeno_with(sil_u, len_d, byt_y);
#ifdef SERF_TRACE_CUE
u3t_event_trace("serf ipc cue", 'E');
#endif
if ( (u3_none == jar)
|| (c3n == u3_serf_writ(&u3V, jar, &ret)) )
{
_cw_serf_fail(0, -1, "bad jar");
}
else {
_cw_serf_send(ret);
// all references must now be counted, and all roots recorded
//
u3_serf_post(&u3V);
}
}
/* _cw_serf_stdio(): fix up std io handles
*/
static void
_cw_serf_stdio(c3_i* inn_i, c3_i* out_i)
{
// the serf is spawned with [FD 0] = events and [FD 1] = effects
// we dup [FD 0 & 1] so we don't accidently use them for something else
// we replace [FD 0] (stdin) with a fd pointing to /dev/null
// we replace [FD 1] (stdout) with a dup of [FD 2] (stderr)
//
c3_i nul_i = c3_open(c3_dev_null, O_RDWR, 0);
*inn_i = dup(0);
*out_i = dup(1);
dup2(nul_i, 0);
dup2(2, 1);
close(nul_i);
// set stream I/O to unbuffered because it's now a pipe not a console
//
setvbuf(stdout, NULL, _IONBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
}
/* _cw_serf_stdio(): cleanup on serf exit.
*/
static void
_cw_serf_exit(void)
{
u3s_cue_xeno_done(sil_u);
u3t_trace_close();
}
/* _cw_init_io(): initialize i/o streams.
*/
static void
_cw_init_io(uv_loop_t* lup_u)
{
// mars is spawned with [FD 0] = events and [FD 1] = effects
// we dup [FD 0 & 1] so we don't accidently use them for something else
// we replace [FD 0] (stdin) with a fd pointing to /dev/null
// we replace [FD 1] (stdout) with a dup of [FD 2] (stderr)
//
c3_i nul_i = c3_open(c3_dev_null, O_RDWR, 0);
c3_i inn_i = dup(0);
c3_i out_i = dup(1);
dup2(nul_i, 0);
dup2(2, 1);
close(nul_i);
// set stream I/O to unbuffered because it's now a pipe not a console
//
setvbuf(stdout, NULL, _IONBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
// Ignore SIGPIPE signals.
//
#ifndef U3_OS_mingw
{
struct sigaction sig_s = {{0}};
sigemptyset(&(sig_s.sa_mask));
sig_s.sa_handler = SIG_IGN;
sigaction(SIGPIPE, &sig_s, 0);
}
#endif
// configure pipe to daemon process
//
{
c3_i err_i;
err_i = uv_timer_init(lup_u, &inn_u.tim_u);
c3_assert(!err_i);
err_i = uv_pipe_init(lup_u, &inn_u.pyp_u, 0);
c3_assert(!err_i);
uv_pipe_open(&inn_u.pyp_u, inn_i);
err_i = uv_pipe_init(lup_u, &out_u.pyp_u, 0);
c3_assert(!err_i);
uv_pipe_open(&out_u.pyp_u, out_i);
uv_stream_set_blocking((uv_stream_t*)&out_u.pyp_u, 1);
}
}
#ifdef U3_OS_mingw
/* _cw_intr_win_cb(): invoked when urth signals ctrl-c.
*/
static void
_cw_intr_win_cb(PVOID param, BOOLEAN timedOut)
{
rsignal_raise(SIGINT);
}
/* _cw_intr_win(): initialize ctrl-c handling.
*/
static void
_cw_intr_win(c3_c* han_c)
{
HANDLE h;
if ( 1 != sscanf(han_c, "%" PRIu64, &h) ) {
fprintf(stderr, "mars: ctrl-c event: bad handle %s: %s\r\n",
han_c, strerror(errno));
}
else {
if ( !RegisterWaitForSingleObject(&h, h, _cw_intr_win_cb,
NULL, INFINITE, 0) )
{
fprintf(stderr,
"mars: ctrl-c event: RegisterWaitForSingleObject(%u) failed (%d)\r\n",
h, GetLastError());
}
}
}
#endif
/* _cw_serf_commence(); initialize and run serf
*/
static void
_cw_serf_commence(c3_i argc, c3_c* argv[])
{
#ifdef U3_OS_mingw
if ( 8 > argc ) {
#else
if ( 7 > argc ) {
#endif
fprintf(stderr, "serf: missing args\n");
exit(1);
}
c3_d eve_d = 0;
uv_loop_t* lup_u = u3_Host.lup_u = uv_default_loop();
c3_c* dir_c = argv[2];
c3_c* key_c = argv[3]; // XX use passkey
c3_c* wag_c = argv[4];
c3_c* hap_c = argv[5];
c3_c* eve_c = argv[6];
#ifdef U3_OS_mingw
c3_c* han_c = argv[7];
_cw_intr_win(han_c);
#endif
_cw_init_io(lup_u);
memset(&u3V, 0, sizeof(u3V));
memset(&u3_Host.tra_u, 0, sizeof(u3_Host.tra_u));
// load passkey
//
// XX and then ... use passkey
//
{
sscanf(key_c, "%" PRIx64 ":%" PRIx64 ":%" PRIx64 ":%" PRIx64,
&u3V.key_d[0],
&u3V.key_d[1],
&u3V.key_d[2],
&u3V.key_d[3]);
}
// load runtime config
//
{
sscanf(wag_c, "%" SCNu32, &u3C.wag_w);
sscanf(hap_c, "%" SCNu32, &u3_Host.ops_u.hap_w);
if ( 1 != sscanf(eve_c, "%" PRIu64, &eve_d) ) {
fprintf(stderr, "serf: rock: invalid number '%s'\r\n", argv[4]);
}
}
sil_u = u3s_cue_xeno_init();
// set up writing
//
out_u.ptr_v = &u3V;
out_u.bal_f = _cw_serf_fail;
// set up reading
//
inn_u.ptr_v = &u3V;
inn_u.pok_f = _cw_serf_writ;
inn_u.bal_f = _cw_serf_fail;
// setup loom
//
{
u3V.dir_c = strdup(dir_c);
u3V.sen_d = u3V.dun_d = u3m_boot(dir_c);
if ( eve_d ) {
// XX need not be fatal, need a u3m_reboot equivalent
// XX can spuriously fail do to corrupt memory-image checkpoint,
// need a u3m_half_boot equivalent
// workaround is to delete/move the checkpoint in case of corruption
//
if ( c3n == u3u_uncram(u3V.dir_c, eve_d) ) {
fprintf(stderr, "serf (%" PRIu64 "): rock load failed\r\n", eve_d);
exit(1);
}
}
}
// set up logging
//
// XX must be after u3m_boot due to u3l_log
//
{
u3C.stderr_log_f = _cw_serf_send_stdr;
u3C.slog_f = _cw_serf_send_slog;
}
u3V.xit_f = _cw_serf_exit;
#if defined(SERF_TRACE_JAM) || defined(SERF_TRACE_CUE)
u3t_trace_open(u3V.dir_c);
#endif
// start serf
//
{
_cw_serf_send(u3_serf_init(&u3V));
}
// start reading
//
u3_newt_read_sync(&inn_u);
// enter loop
//
uv_run(lup_u, UV_RUN_DEFAULT);
u3m_stop();
}
/* _cw_disk_init(); open event log
*/
static u3_disk*
_cw_disk_init(c3_c* dir_c)
{
u3_disk_cb cb_u = {0};
u3_disk* log_u = u3_disk_init(dir_c, cb_u);
if ( !log_u ) {
fprintf(stderr, "unable to open event log\n");
exit(1);
}
return log_u;
}
/* _cw_info(); print pier info
*/
static void
_cw_info(c3_i argc, c3_c* argv[])
{
c3_assert( 3 <= argc );
c3_c* dir_c = argv[2];
c3_d eve_d = u3m_boot(dir_c);
u3_disk* log_u = _cw_disk_init(dir_c);
fprintf(stderr, "\r\nurbit: %s at event %" PRIu64 "\r\n", dir_c, eve_d);
u3_disk_slog(log_u);
printf("\n");
u3_lmdb_stat(log_u->mdb_u, stdout);
u3_disk_exit(log_u);
u3m_stop();
}
/* _cw_grab(); gc pier.
*/
static void
_cw_grab(c3_i argc, c3_c* argv[])
{
c3_assert( 3 <= argc );
c3_c* dir_c = argv[2];
u3m_boot(dir_c);
u3C.wag_w |= u3o_hashless;
u3_serf_grab();
u3m_stop();
}
/* _cw_cram(); jam persistent state (rock), and exit.
*/
static void
_cw_cram(c3_i argc, c3_c* argv[])
{
c3_assert( 3 <= argc );
c3_c* dir_c = argv[2];
c3_d eve_d = u3m_boot(dir_c);
u3_disk* log_u = _cw_disk_init(dir_c); // XX s/b try_aquire lock
c3_o ret_o;
fprintf(stderr, "urbit: cram: preparing\r\n");
if ( c3n == (ret_o = u3u_cram(dir_c, eve_d)) ) {
fprintf(stderr, "urbit: cram: unable to jam state\r\n");
}
else {
fprintf(stderr, "urbit: cram: rock saved at event %" PRIu64 "\r\n", eve_d);
}
// save even on failure, as we just did all the work of deduplication
//
u3e_save();
u3_disk_exit(log_u);
if ( c3n == ret_o ) {
exit(1);
}
u3m_stop();
}
/* _cw_queu(); cue rock, save, and exit.
*/
static void
_cw_queu(c3_i argc, c3_c* argv[])
{
c3_assert( 4 <= argc );
c3_c* dir_c = argv[2];
c3_c* eve_c = argv[3];
c3_d eve_d;
if ( 1 != sscanf(eve_c, "%" PRIu64 "", &eve_d) ) {
fprintf(stderr, "urbit: queu: invalid number '%s'\r\n", eve_c);
exit(1);
}
else {
u3_disk* log_u = _cw_disk_init(dir_c); // XX s/b try_aquire lock
fprintf(stderr, "urbit: queu: preparing\r\n");
u3m_boot(dir_c);
// XX can spuriously fail do to corrupt memory-image checkpoint,
// need a u3m_half_boot equivalent
// workaround is to delete/move the checkpoint in case of corruption
//
if ( c3n == u3u_uncram(dir_c, eve_d) ) {
fprintf(stderr, "urbit: queu: failed\r\n");
exit(1);
}
u3e_save();
u3_disk_exit(log_u);
fprintf(stderr, "urbit: queu: rock loaded at event %" PRIu64 "\r\n", eve_d);
u3m_stop();
}
}
/* _cw_uniq(); deduplicate persistent nouns
*/
static void
_cw_meld(c3_i argc, c3_c* argv[])
{
c3_assert( 3 <= argc );
c3_c* dir_c = argv[2];
u3_disk* log_u = _cw_disk_init(dir_c); // XX s/b try_aquire lock
c3_w pre_w;
u3C.wag_w |= u3o_hashless;
u3m_boot(dir_c);
pre_w = u3a_open(u3R);
u3u_meld();
u3a_print_memory(stderr, "urbit: meld: gained", (u3a_open(u3R) - pre_w));
u3e_save();
u3_disk_exit(log_u);
u3m_stop();
}
/* _cw_pack(); compact memory, save, and exit.
*/
static void
_cw_pack(c3_i argc, c3_c* argv[])
{
c3_assert( 3 <= argc );
c3_c* dir_c = argv[2];
u3_disk* log_u = _cw_disk_init(dir_c); // XX s/b try_aquire lock
u3m_boot(dir_c);
u3a_print_memory(stderr, "urbit: pack: gained", u3m_pack());
u3e_save();
u3_disk_exit(log_u);
u3m_stop();
}
/* _cw_utils(): "worker" utilities and "serf" entrypoint
*/
static c3_i
_cw_utils(c3_i argc, c3_c* argv[])
{
// utility commands and positional arguments, by analogy
//
// $@ ~ :: usage
// $% [%cram dir=@t] :: jam state
// [%grab dir=@t] :: gc
// [%info dir=@t] :: print
// [%meld dir=@t] :: deduplicate
// [%pack dir=@t] :: defragment
// [%queu dir=@t eve=@ud] :: cue state
// :: :: ipc:
// [%serf dir=@t key=@t wag=@t hap=@ud eve=@ud] :: compute
// ==
//
// NB: don't print to anything other than stderr;
// other streams may be used for ipc.
//
if ( (2 < argc) && 4 == strlen(argv[1]) ) {
c3_m mot_m;
{
c3_c* s = argv[1]; mot_m = c3_s4(s[0], s[1], s[2], s[3]);
}
switch ( mot_m ) {
case c3__cram: _cw_cram(argc, argv); return 1;
case c3__grab: _cw_grab(argc, argv); return 1;
case c3__info: _cw_info(argc, argv); return 1;
case c3__meld: _cw_meld(argc, argv); return 1;
case c3__pack: _cw_pack(argc, argv); return 1;
case c3__queu: _cw_queu(argc, argv); return 1;
case c3__serf: _cw_serf_commence(argc, argv); return 1;
}
}
return 0;
}
c3_i
@ -688,30 +1241,17 @@ main(c3_i argc,
{
// Parse options.
//
if ( c3n == _main_getopt(argc, argv) ) {
if ( (3 == argc)
&& (0 == strcmp("db-info", argv[1])) )
{
return _debug_db_stats(argv[2]);
}
if ( _cw_utils(argc, argv) ) {
return 0;
}
else if ( c3n == _main_getopt(argc, argv) ) {
u3_ve_usage(argc, argv);
return 1;
}
// Set `u3_Host.wrk_c` to the worker executable path.
c3_i urbit_exe_len = strlen(argv[0]);
c3_i worker_exe_len = 1 + urbit_exe_len + strlen("-worker");
u3_Host.wrk_c = c3_malloc(worker_exe_len);
#if defined(U3_OS_mingw)
if ( urbit_exe_len >= 4 && !strcmp(argv[0] + urbit_exe_len - 4, ".exe")) {
snprintf(u3_Host.wrk_c, worker_exe_len, "%.*s-worker.exe", urbit_exe_len - 4, argv[0]);
} else {
snprintf(u3_Host.wrk_c, worker_exe_len, "%s-worker", argv[0]);
if ( !u3_Host.wrk_c ) {
u3_Host.wrk_c = strdup(argv[0]);
}
#else
snprintf(u3_Host.wrk_c, worker_exe_len, "%s-worker", argv[0]);
#endif
if ( c3y == u3_Host.ops_u.dem ) {
// In daemon mode, run the urbit as a background process, but don't
@ -834,7 +1374,7 @@ main(c3_i argc,
}
}
#if defined(U3_OS_mingw)
#ifdef U3_OS_mingw
// Initialize event used to transmit Ctrl-C to worker process
//
{
@ -844,7 +1384,7 @@ main(c3_i argc,
exit(1);
}
}
#endif
#endif
// starting u3m configures OpenSSL memory functions, so we must do it
// before any OpenSSL allocations

View File

@ -492,6 +492,7 @@
# define c3__good c3_s4('g','o','o','d')
# define c3__gorm c3_s4('g','o','r','m')
# define c3__goto c3_s4('g','o','t','o')
# define c3__grab c3_s4('g','r','a','b')
# define c3__grad c3_s4('g','r','a','d')
# define c3__gram c3_s4('g','r','a','m')
# define c3__gran c3_s4('g','r','a','n')
@ -935,6 +936,7 @@
# define c3__quem c3_s4('q','u','e','m')
# define c3__ques c3_s4('q','u','e','s')
# define c3__quet c3_s4('q','u','e','t')
# define c3__queu c3_s4('q','u','e','u')
# define c3__quid c3_s4('q','u','i','d')
# define c3__quil c3_s4('q','u','i','l')
# define c3__quix c3_s4('q','u','i','x')
@ -990,6 +992,7 @@
# define c3__send c3_s4('s','e','n','d')
# define c3__seq c3_s3('s','e','q')
# define c3__serd c3_s4('s','e','r','d')
# define c3__serf c3_s4('s','e','r','f')
# define c3__set c3_s3('s','e','t')
# define c3__sfix c3_s4('s','f','i','x')
# define c3__sgbc c3_s4('s','g','b','c')

View File

@ -603,9 +603,9 @@ _king_sign_cb(uv_signal_t* sil_u, c3_i num_i)
u3l_log("\r\ninterrupt\r\n");
u3_term_ef_ctlc();
#if defined(U3_OS_mingw)
#ifdef U3_OS_mingw
PulseEvent(u3_Host.cev_u);
#endif
#endif
break;
}

View File

@ -1190,13 +1190,13 @@ u3_lord_init(c3_c* pax_c, c3_w wag_w, c3_d key_d[4], u3_lord_cb cb_u)
arg_c[6] = "0";
}
#if defined(U3_OS_mingw)
sprintf(cev_c, "%u", u3_Host.cev_u);
#ifdef U3_OS_mingw
sprintf(cev_c, "%" PRIu64, u3_Host.cev_u);
arg_c[7] = cev_c;
arg_c[8] = 0;
#else
#else
arg_c[7] = 0;
#endif
#endif
uv_pipe_init(u3L, &god_u->inn_u.pyp_u, 0);
uv_timer_init(u3L, &god_u->out_u.tim_u);

View File

@ -1,533 +0,0 @@
/* worker/main.c
**
** the main loop of a serf process.
*/
#include "all.h"
#include "rsignal.h"
#include <vere/vere.h>
#include <vere/serf.h>
#include "ur/hashcons.h"
static u3_serf u3V; // one serf per process
static u3_moat inn_u; // input stream
static u3_mojo out_u; // output stream
static u3_cue_xeno* sil_u; // cue handle
#undef SERF_TRACE_JAM
#undef SERF_TRACE_CUE
/* _cw_serf_fail(): failure stub.
*/
static void
_cw_serf_fail(void* ptr_v, ssize_t err_i, const c3_c* err_c)
{
if ( UV_EOF == err_i ) {
fprintf(stderr, "serf: pier unexpectedly shut down\r\n");
}
else {
fprintf(stderr, "serf: pier error: %s\r\n", err_c);
}
exit(1);
}
/* _cw_serf_send(): send plea back to daemon.
*/
static void
_cw_serf_send(u3_noun pel)
{
c3_d len_d;
c3_y* byt_y;
#ifdef SERF_TRACE_JAM
u3t_event_trace("serf ipc jam", 'B');
#endif
u3s_jam_xeno(pel, &len_d, &byt_y);
#ifdef SERF_TRACE_JAM
u3t_event_trace("serf ipc jam", 'E');
#endif
u3_newt_send(&out_u, len_d, byt_y);
u3z(pel);
}
/* _cw_serf_send_slog(): send hint output (hod is [priority tank]).
*/
static void
_cw_serf_send_slog(u3_noun hod)
{
_cw_serf_send(u3nc(c3__slog, hod));
}
/* _cw_serf_send_stdr(): send stderr output (%flog)
*/
static void
_cw_serf_send_stdr(c3_c* str_c)
{
_cw_serf_send(u3nc(c3__flog, u3i_string(str_c)));
}
/* _cw_serf_step_trace(): initialize or rotate trace file.
*/
static void
_cw_serf_step_trace(void)
{
if ( u3C.wag_w & u3o_trace ) {
if ( u3_Host.tra_u.con_w == 0 && u3_Host.tra_u.fun_w == 0 ) {
u3t_trace_open(u3V.dir_c);
}
else if ( u3_Host.tra_u.con_w >= 100000 ) {
u3t_trace_close();
u3t_trace_open(u3V.dir_c);
}
}
}
/* _cw_serf_writ(): process a command from the king.
*/
static void
_cw_serf_writ(void* vod_p, c3_d len_d, c3_y* byt_y)
{
u3_weak jar;
u3_noun ret;
_cw_serf_step_trace();
#ifdef SERF_TRACE_CUE
u3t_event_trace("serf ipc cue", 'B');
#endif
jar = u3s_cue_xeno_with(sil_u, len_d, byt_y);
#ifdef SERF_TRACE_CUE
u3t_event_trace("serf ipc cue", 'E');
#endif
if ( (u3_none == jar)
|| (c3n == u3_serf_writ(&u3V, jar, &ret)) )
{
_cw_serf_fail(0, -1, "bad jar");
}
else {
_cw_serf_send(ret);
// all references must now be counted, and all roots recorded
//
u3_serf_post(&u3V);
}
}
/* _cw_serf_stdio(): fix up std io handles
*/
static void
_cw_serf_stdio(c3_i* inn_i, c3_i* out_i)
{
// the serf is spawned with [FD 0] = events and [FD 1] = effects
// we dup [FD 0 & 1] so we don't accidently use them for something else
// we replace [FD 0] (stdin) with a fd pointing to /dev/null
// we replace [FD 1] (stdout) with a dup of [FD 2] (stderr)
//
c3_i nul_i = c3_open(c3_dev_null, O_RDWR, 0);
*inn_i = dup(0);
*out_i = dup(1);
dup2(nul_i, 0);
dup2(2, 1);
close(nul_i);
// set stream I/O to unbuffered because it's now a pipe not a console
//
setvbuf(stdout, NULL, _IONBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
}
/* _cw_serf_stdio(): cleanup on serf exit.
*/
static void
_cw_serf_exit(void)
{
u3s_cue_xeno_done(sil_u);
u3t_trace_close();
}
#if defined(U3_OS_mingw)
/* _mingw_ctrlc_cb(): invoked when the lord signals the Ctrl-C event
*/
static void
_mingw_ctrlc_cb(PVOID param, BOOLEAN timedOut)
{
rsignal_raise(SIGINT);
}
#endif
/* _cw_serf_commence(); initialize and run serf
*/
static void
_cw_serf_commence(c3_i argc, c3_c* argv[])
{
c3_i inn_i, out_i;
_cw_serf_stdio(&inn_i, &out_i);
#if defined(U3_OS_mingw)
c3_assert( 8 == argc );
// Initialize serf's end of Ctrl-C handling
//
{
HANDLE h;
if ( 1 != sscanf(argv[7], "%u", &h) ) {
fprintf(stderr, "serf: Ctrl-C event: bad handle %s: %s\r\n", argv[7], strerror(errno));
} else
if ( !RegisterWaitForSingleObject(&h, h, _mingw_ctrlc_cb, NULL, INFINITE, 0) ) {
fprintf(stderr, "serf: Ctrl-C event: RegisterWaitForSingleObject(%u) failed (%d)\r\n", h, GetLastError());
}
}
#else
c3_assert( 7 == argc );
#endif
uv_loop_t* lup_u = uv_default_loop();
c3_c* dir_c = argv[2];
c3_c* key_c = argv[3];
c3_c* wag_c = argv[4];
c3_c* hap_c = argv[5];
c3_d eve_d = 0;
if ( 1 != sscanf(argv[6], "%" PRIu64 "", &eve_d) ) {
fprintf(stderr, "serf: rock: invalid number '%s'\r\n", argv[4]);
}
memset(&u3V, 0, sizeof(u3V));
memset(&u3_Host.tra_u, 0, sizeof(u3_Host.tra_u));
// load passkey
//
// XX and then ... use passkey
//
{
sscanf(key_c, "%" PRIx64 ":%" PRIx64 ":%" PRIx64 ":%" PRIx64 "",
&u3V.key_d[0],
&u3V.key_d[1],
&u3V.key_d[2],
&u3V.key_d[3]);
}
// load runtime config
//
{
sscanf(wag_c, "%" SCNu32, &u3C.wag_w);
sscanf(hap_c, "%" SCNu32, &u3_Host.ops_u.hap_w);
}
// Ignore SIGPIPE signals.
//
#ifndef U3_OS_mingw
{
struct sigaction sig_s = {{0}};
sigemptyset(&(sig_s.sa_mask));
sig_s.sa_handler = SIG_IGN;
sigaction(SIGPIPE, &sig_s, 0);
}
#endif
// configure pipe to daemon process
//
{
c3_i err_i;
err_i = uv_timer_init(lup_u, &inn_u.tim_u);
c3_assert(!err_i);
err_i = uv_pipe_init(lup_u, &inn_u.pyp_u, 0);
c3_assert(!err_i);
uv_pipe_open(&inn_u.pyp_u, inn_i);
err_i = uv_pipe_init(lup_u, &out_u.pyp_u, 0);
c3_assert(!err_i);
uv_pipe_open(&out_u.pyp_u, out_i);
uv_stream_set_blocking((uv_stream_t*)&out_u.pyp_u, 1);
}
sil_u = u3s_cue_xeno_init();
// set up writing
//
out_u.ptr_v = &u3V;
out_u.bal_f = _cw_serf_fail;
// set up reading
//
inn_u.ptr_v = &u3V;
inn_u.pok_f = _cw_serf_writ;
inn_u.bal_f = _cw_serf_fail;
// setup loom
//
{
u3V.dir_c = strdup(dir_c);
u3V.sen_d = u3V.dun_d = u3m_boot(dir_c);
if ( eve_d ) {
// XX need not be fatal, need a u3m_reboot equivalent
// XX can spuriously fail do to corrupt memory-image checkpoint,
// need a u3m_half_boot equivalent
// workaround is to delete/move the checkpoint in case of corruption
//
if ( c3n == u3u_uncram(u3V.dir_c, eve_d) ) {
fprintf(stderr, "serf (%" PRIu64 "): rock load failed\r\n", eve_d);
exit(1);
}
}
}
// set up logging
//
// XX must be after u3m_boot due to u3l_log
//
{
u3C.stderr_log_f = _cw_serf_send_stdr;
u3C.slog_f = _cw_serf_send_slog;
}
u3V.xit_f = _cw_serf_exit;
#if defined(SERF_TRACE_JAM) || defined(SERF_TRACE_CUE)
u3t_trace_open(u3V.dir_c);
#endif
// start serf
//
{
_cw_serf_send(u3_serf_init(&u3V));
}
// start reading
//
u3_newt_read_sync(&inn_u);
// enter loop
//
uv_run(lup_u, UV_RUN_DEFAULT);
u3m_stop();
}
/* _cw_info(); print pier info
*/
static void
_cw_info(c3_i argc, c3_c* argv[])
{
c3_assert( 3 <= argc );
c3_c* dir_c = argv[2];
c3_d eve_d = u3m_boot(dir_c);
fprintf(stderr, "urbit-worker: %s at event %" PRIu64 "\r\n", dir_c, eve_d);
u3m_stop();
}
/* _cw_grab(); gc pier.
*/
static void
_cw_grab(c3_i argc, c3_c* argv[])
{
c3_assert( 3 <= argc );
c3_c* dir_c = argv[2];
u3m_boot(dir_c);
u3C.wag_w |= u3o_hashless;
u3_serf_grab();
u3m_stop();
}
/* _cw_cram(); jam persistent state (rock), and exit.
*/
static void
_cw_cram(c3_i argc, c3_c* argv[])
{
c3_assert( 3 <= argc );
c3_c* dir_c = argv[2];
c3_d eve_d = u3m_boot(dir_c);
c3_o ret_o;
fprintf(stderr, "urbit-worker: cram: preparing\r\n");
if ( c3n == (ret_o = u3u_cram(dir_c, eve_d)) ) {
fprintf(stderr, "urbit-worker: cram: unable to jam state\r\n");
}
else {
fprintf(stderr, "urbit-worker: cram: rock saved at event %" PRIu64 "\r\n", eve_d);
}
// save even on failure, as we just did all the work of deduplication
//
u3e_save();
if ( c3n == ret_o ) {
exit(1);
}
u3m_stop();
}
/* _cw_queu(); cue rock, save, and exit.
*/
static void
_cw_queu(c3_i argc, c3_c* argv[])
{
c3_assert( 4 <= argc );
c3_c* dir_c = argv[2];
c3_c* eve_c = argv[3];
c3_d eve_d;
if ( 1 != sscanf(eve_c, "%" PRIu64 "", &eve_d) ) {
fprintf(stderr, "urbit-worker: queu: invalid number '%s'\r\n", eve_c);
exit(1);
}
else {
fprintf(stderr, "urbit-worker: queu: preparing\r\n");
memset(&u3V, 0, sizeof(u3V));
u3V.dir_c = strdup(dir_c);
u3V.sen_d = u3V.dun_d = u3m_boot(dir_c);
// XX can spuriously fail do to corrupt memory-image checkpoint,
// need a u3m_half_boot equivalent
// workaround is to delete/move the checkpoint in case of corruption
//
if ( c3n == u3u_uncram(dir_c, eve_d) ) {
fprintf(stderr, "urbit-worker: queu: failed\r\n");
exit(1);
}
u3e_save();
fprintf(stderr, "urbit-worker: queu: rock loaded at event %" PRIu64 "\r\n", eve_d);
u3m_stop();
}
}
/* _cw_uniq(); deduplicate persistent nouns
*/
static void
_cw_meld(c3_i argc, c3_c* argv[])
{
c3_assert( 3 <= argc );
c3_c* dir_c = argv[2];
u3C.wag_w |= u3o_hashless;
u3m_boot(dir_c);
u3_serf_grab();
u3u_meld();
u3_serf_grab();
u3e_save();
}
/* _cw_pack(); compact memory, save, and exit.
*/
static void
_cw_pack(c3_i argc, c3_c* argv[])
{
c3_assert( 3 <= argc );
c3_c* dir_c = argv[2];
u3m_boot(dir_c);
u3a_print_memory(stderr, "urbit-worker: pack: gained", u3m_pack());
u3e_save();
u3m_stop();
}
/* _cw_usage(): print urbit-worker usage.
*/
static void
_cw_usage(c3_i argc, c3_c* argv[])
{
fprintf(stderr,
"\rurbit-worker usage:\n"
" print pier info:\n"
" %s info <pier>\n\n"
" gc persistent state:\n"
" %s grab <pier>\n\n"
" compact persistent state:\n"
" %s pack <pier>\n\n"
" deduplicate persistent state:\n"
" %s meld <pier>\n\n"
" jam persistent state:\n"
" %s cram <pier>\n\n"
" cue persistent state:\n"
" %s queu <pier> <at-event>\n\n"
" run as a 'serf':\n"
" %s serf <pier> <key> <flags> <cache-size> <at-event>"
#if defined(U3_OS_mingw)
" <ctrlc-handle>"
#endif
"\n",
argv[0], argv[0], argv[0], argv[0], argv[0], argv[0], argv[0]);
}
/* main(): main() when run as urbit-worker
*/
c3_i
main(c3_i argc, c3_c* argv[])
{
// urbit-worker commands and positional arguments, by analogy
//
// $@ ~ ;; usage
// $% [%cram dir=@t]
// [%queu dir=@t eve=@ud]
// [%pack dir=@t]
// [%serf dir=@t key=@t wag=@t hap=@ud eve=@ud]
// ==
//
// NB: don't print to anything other than stderr;
// other streams may have special requirements (in the case of "serf")
//
if ( 2 > argc ) {
_cw_usage(argc, argv);
exit(1);
}
else {
if ( 0 == strcmp("serf", argv[1]) ) {
_cw_serf_commence(argc, argv);
}
else if ( 0 == strcmp("info", argv[1]) ) {
_cw_info(argc, argv);
}
else if ( 0 == strcmp("grab", argv[1]) ) {
_cw_grab(argc, argv);
}
else if ( 0 == strcmp("cram", argv[1]) ) {
_cw_cram(argc, argv);
}
else if ( 0 == strcmp("queu", argv[1]) ) {
_cw_queu(argc, argv);
}
else if ( 0 == strcmp("meld", argv[1]) ) {
_cw_meld(argc, argv);
}
else if ( 0 == strcmp("pack", argv[1]) ) {
_cw_pack(argc, argv);
}
else {
fprintf(stderr, "unknown command '%s'\r\n", argv[1]);
_cw_usage(argc, argv);
exit(1);
}
}
return 0;
}