Merge pull request #3006 from urbit/ipc-redux-vere

u3/serf: ipc-redux: refactoring and bugfixes
This commit is contained in:
Joe Bryan 2020-06-11 17:08:50 -07:00 committed by GitHub
commit 9bf98f7b6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 868 additions and 714 deletions

View File

@ -84,6 +84,11 @@
# define u3nt(a, b, c) u3i_trel(a, b, c)
# define u3nq(a, b, c, d) u3i_qual(a, b, c, d)
/* u3nl(), u3_none-terminated varargs list
*/
# define u3nl u3i_list
/* u3du(), u3ud(): noun/cell test.
*/
# define u3du(som) (u3r_du(som))

View File

@ -1,60 +1,67 @@
/* include/n/i.h
/* include/noun/imprison.h
**
** This file is in the public domain.
*/
/* General constructors.
*/
/* u3i_words():
**
** Copy [a] words from [b] into an atom.
*/
u3_noun
u3i_words(c3_w a_w,
const c3_w* b_w);
/* u3i_bytes():
**
** Copy `a` bytes from `b` to an LSB first atom.
/* u3i_bytes(): Copy [a] bytes from [b] to an LSB first atom.
*/
u3_noun
u3i_bytes(c3_w a_w,
const c3_y* b_y);
/* u3i_mp():
**
** Copy the GMP integer `a` into an atom, and clear it.
/* u3i_words(): Copy [a] words from [b] into an atom.
*/
u3_noun
u3i_words(c3_w a_w,
const c3_w* b_w);
/* u3i_chubs(): Copy [a] chubs from [b] into an atom.
*/
u3_atom
u3i_chubs(c3_w a_w,
const c3_d* b_d);
/* u3i_mp(): Copy the GMP integer [a] into an atom, and clear it.
*/
u3_noun
u3i_mp(mpz_t a_mp);
/* u3i_vint():
**
** Create `a + 1`.
/* u3i_vint(): increment [a].
*/
u3_noun
u3i_vint(u3_noun a);
/* u3i_cell():
**
** Produce the cell `[a b]`.
/* u3i_cell(): Produce the cell `[a b]`.
*/
u3_noun
u3i_cell(u3_noun a, u3_noun b);
/* u3i_trel():
**
** Produce the triple `[a b c]`.
/* u3i_trel(): Produce the triple `[a b c]`.
*/
u3_noun
u3i_trel(u3_noun a, u3_noun b, u3_noun c);
/* u3i_qual():
**
** Produce the cell `[a b c d]`.
/* u3i_qual(): Produce the cell `[a b c d]`.
*/
u3_noun
u3i_qual(u3_noun a, u3_noun b, u3_noun c, u3_noun d);
/* u3i_string(): Produce an LSB-first atom from the C string [a].
*/
u3_noun
u3i_string(const c3_c* a_c);
/* u3i_tape(): from a C string, to a list of bytes.
*/
u3_atom
u3i_tape(const c3_c* txt_c);
/* u3i_list(): list from `u3_none`-terminated varargs.
*/
u3_noun
u3i_list(u3_weak som, ...);
/* u3i_edit():
**
** Mutate `big` at axis `axe` with new value `som`
@ -63,13 +70,6 @@
u3_noun
u3i_edit(u3_noun big, u3_noun axe, u3_noun som);
/* u3i_string():
**
** Produce an LSB-first atom from the C string `a`.
*/
u3_noun
u3i_string(const c3_c* a_c);
/* u3i_molt():
**
** Mutate `som` with a 0-terminated list of axis, noun pairs.
@ -77,18 +77,3 @@
*/
u3_noun
u3i_molt(u3_noun som, ...);
/* u3i_chubs():
**
** Construct `a` double-words from `b`, LSD first, as an atom.
*/
u3_atom
u3i_chubs(c3_w a_w,
const c3_d* b_d);
/* u3i_tape(): from a C string, to a list of bytes.
*/
u3_atom
u3i_tape(const c3_c* txt_c);

View File

@ -12,7 +12,7 @@
c3_d dun_d; // last event processed
c3_l mug_l; // hash of state
c3_o pac_o; // pack kernel
c3_o rec_o; // reclaim cash
c3_o rec_o; // reclaim cache
c3_o mut_o; // mutated kerne
u3_noun sac; // space measurementl
} u3_serf;
@ -24,6 +24,11 @@
u3_noun
u3_serf_init(u3_serf* sef_u);
/* u3_serf_unpack(): initialize from rock at [eve_d].
*/
void
u3_serf_unpack(u3_serf* sef_u, c3_d eve_d);
/* u3_serf_writ(): apply writ [wit], producing plea [*pel] on c3y.
*/
c3_o

View File

@ -46,15 +46,6 @@
*/
typedef void (*u3_moor_bail)(void*, const c3_c* err_c);
/* u3_mess: blob message in process.
*/
typedef struct _u3_mess {
c3_d len_d; // blob length in bytes
c3_d has_d; // currently held
struct _u3_meat* meq_u; // exit of message queue
struct _u3_meat* qem_u; // entry of message queue
} u3_mess;
/* u3_meat: blob message block.
*/
typedef struct _u3_meat {
@ -63,35 +54,61 @@
c3_y hun_y[0];
} u3_meat;
/* u3_mess_type: in-process message type.
*/
typedef enum {
u3_mess_head = 0, // awaiting header
u3_mess_tail = 1 // awaiting body
} u3_mess_type;
/* u3_mess: blob message in process.
*/
typedef struct _u3_mess {
u3_mess_type sat_e; // msg type
union { //
struct { // awaiting header:
c3_y len_y[8]; // header bytes
c3_y has_y; // length
} hed_u; //
struct { // awaiting body
u3_meat* met_u; // partial message
c3_d has_d; // length
} tal_u; //
};
} u3_mess;
/* u3_moat: inbound message stream.
*/
typedef struct _u3_moat {
uv_pipe_t pyp_u; // input stream
u3_moor_bail bal_f; // error response function
void* vod_p; // callback pointer
void* ptr_v; // callback pointer
u3_moor_poke pok_f; // action function
struct _u3_mess* mes_u; // message in progress
c3_d len_d; // length of stray bytes
c3_y* rag_y; // stray bytes
u3_mess mes_u; // message in progress
uv_timer_t tim_u; // queue timer
u3_meat* ent_u; // entry of message queue
u3_meat* ext_u; // exit of message queue
} u3_moat;
/* u3_mojo: outbound message stream.
*/
typedef struct _u3_mojo {
uv_pipe_t pyp_u; // output stream
uv_pipe_t pyp_u; // output stream
u3_moor_bail bal_f; // error response function
void* ptr_v; // callback pointer
} u3_mojo;
/* u3_moor: two-way message stream, linked list */
typedef struct _u3_moor {
uv_pipe_t pyp_u;
u3_moor_bail bal_f;
void* vod_p;
u3_moor_poke pok_f;
struct _u3_mess* mes_u;
c3_d len_d;
c3_y* rag_y;
struct _u3_moor* nex_u;
uv_pipe_t pyp_u; // duplex stream
u3_moor_bail bal_f; // error response function
void* ptr_v; // callback pointer
u3_moor_poke pok_f; // action function
u3_mess mes_u; // message in progress
uv_timer_t tim_u; // queue timer
u3_meat* ent_u; // entry of message queue
u3_meat* ext_u; // exit of message queue
struct _u3_moor* nex_u; // next in list
} u3_moor;
/* u3_dent: directory entry.
@ -1057,24 +1074,22 @@
/** Stream messages.
**/
/* u3_newt_encode(): encode an atom to a length-prefixed byte buffer
*/
c3_y*
u3_newt_encode(u3_atom mat, c3_w* len_w);
/* u3_newt_decode(): decode a (partial) length-prefixed byte buffer
*/
void
u3_newt_decode(u3_moat* mot_u, c3_y* buf_y, c3_w len_w);
u3_newt_decode(u3_moat* mot_u, c3_y* buf_y, c3_d len_d);
/* u3_newt_write(): write atom to stream; free atom.
*/
void
u3_newt_write(u3_mojo* moj_u,
u3_atom mat,
void* vod_p);
u3_newt_write(u3_mojo* moj_u, u3_atom mat);
/* u3_newt_read(): activate reading on input stream.
/* u3_newt_read_sync(): start reading; multiple msgs synchronous.
*/
void
u3_newt_read_sync(u3_moat* mot_u);
/* u3_newt_read(): start reading; each msg asynchronous.
*/
void
u3_newt_read(u3_moat* mot_u);
@ -1118,6 +1133,11 @@
c3_o
u3_pier_save(u3_pier* pir_u);
/* u3_pier_pack(): save a portable snapshot.
*/
c3_o
u3_pier_pack(u3_pier* pir_u);
/* u3_pier_stub(): get the One Pier for unreconstructed code.
*/
u3_pier*

View File

@ -1,90 +1,25 @@
/* g/i.c
/* noun/imprison.c
**
*/
#include "all.h"
/* u3i_words():
**
** Copy [a] words from [b] into an atom.
*/
u3_noun
u3i_words(c3_w a_w,
const c3_w* b_w)
{
/* Strip trailing zeroes.
*/
while ( a_w && !b_w[a_w - 1] ) {
a_w--;
}
/* Check for cat.
*/
if ( !a_w ) {
return 0;
}
else if ( (a_w == 1) && !(b_w[0] >> 31) ) {
return b_w[0];
}
/* Allocate, fill, return.
*/
{
c3_w* nov_w = u3a_walloc(a_w + c3_wiseof(u3a_atom));
u3a_atom* nov_u = (void*)nov_w;
nov_u->mug_w = 0;
nov_u->len_w = a_w;
/* Fill the words.
*/
{
c3_w i_w;
for ( i_w=0; i_w < a_w; i_w++ ) {
nov_u->buf_w[i_w] = b_w[i_w];
}
}
return u3a_to_pug(u3a_outa(nov_w));
}
}
/* u3i_chubs():
**
** Construct `a` double-words from `b`, LSD first, as an atom.
*/
u3_atom
u3i_chubs(c3_w a_w,
const c3_d* b_d)
{
c3_w *b_w = c3_malloc(a_w * 8);
c3_w i_w;
u3_atom p;
for ( i_w = 0; i_w < a_w; i_w++ ) {
b_w[(2 * i_w)] = b_d[i_w] & 0xffffffffULL;
b_w[(2 * i_w) + 1] = b_d[i_w] >> 32ULL;
}
p = u3i_words((a_w * 2), b_w);
c3_free(b_w);
return p;
}
/* u3i_bytes():
**
** Copy `a` bytes from `b` to an LSB first atom.
/* u3i_bytes(): Copy [a] bytes from [b] to an LSB first atom.
*/
u3_noun
u3i_bytes(c3_w a_w,
const c3_y* b_y)
const c3_y* b_y)
{
/* Strip trailing zeroes.
*/
u3_noun pro;
u3t_on(mal_o);
// Strip trailing zeroes.
//
while ( a_w && !b_y[a_w - 1] ) {
a_w--;
}
/* Check for cat.
*/
// Check for cat.
//
if ( a_w <= 4 ) {
if ( !a_w ) {
return 0;
@ -103,18 +38,18 @@ u3i_bytes(c3_w a_w,
}
}
/* Allocate, fill, return.
*/
// Allocate, fill, return.
//
{
c3_w len_w = (a_w + 3) >> 2;
c3_w* nov_w = u3a_walloc((len_w + c3_wiseof(u3a_atom)));
c3_w len_w = (a_w + 3) >> 2;
c3_w* nov_w = u3a_walloc((len_w + c3_wiseof(u3a_atom)));
u3a_atom* nov_u = (void*)nov_w;
nov_u->mug_w = 0;
nov_u->len_w = len_w;
/* Clear the words.
*/
// Clear the words.
//
{
c3_w i_w;
@ -123,8 +58,8 @@ u3i_bytes(c3_w a_w,
}
}
/* Fill the bytes.
*/
// Fill the bytes.
//
{
c3_w i_w;
@ -132,13 +67,137 @@ u3i_bytes(c3_w a_w,
nov_u->buf_w[i_w >> 2] |= (b_y[i_w] << ((i_w & 3) * 8));
}
}
return u3a_to_pug(u3a_outa(nov_w));
pro = u3a_to_pug(u3a_outa(nov_w));
}
u3t_off(mal_o);
return pro;
}
/* u3i_mp():
**
** Copy the GMP integer `a` into an atom, and clear it.
/* u3i_words(): Copy [a] words from [b] into an atom.
*/
u3_noun
u3i_words(c3_w a_w,
const c3_w* b_w)
{
u3_noun pro;
u3t_on(mal_o);
// Strip trailing zeroes.
//
while ( a_w && !b_w[a_w - 1] ) {
a_w--;
}
// Check for cat.
//
if ( !a_w ) {
return 0;
}
else if ( (a_w == 1) && !(b_w[0] >> 31) ) {
return b_w[0];
}
// Allocate, fill, return.
//
{
c3_w* nov_w = u3a_walloc(a_w + c3_wiseof(u3a_atom));
u3a_atom* nov_u = (void*)nov_w;
nov_u->mug_w = 0;
nov_u->len_w = a_w;
// Fill the words.
//
{
c3_w i_w;
for ( i_w=0; i_w < a_w; i_w++ ) {
nov_u->buf_w[i_w] = b_w[i_w];
}
}
pro = u3a_to_pug(u3a_outa(nov_w));
}
u3t_off(mal_o);
return pro;
}
/* u3i_chubs(): Copy [a] chubs from [b] into an atom.
*/
u3_atom
u3i_chubs(c3_w a_w,
const c3_d* b_d)
{
u3_noun pro;
u3t_on(mal_o);
// Strip trailing zeroes.
//
while ( a_w && !b_d[a_w - 1] ) {
a_w--;
}
// Check for cat.
//
if ( !a_w ) {
return 0;
}
else if ( (1 == a_w) && !(b_d[0] >> 31) ) {
return (c3_w)b_d[0];
}
// Allocate, fill, return.
//
{
c3_w len_w = 2 * a_w;
if ( !(b_d[a_w - 1] >> 32) ) {
len_w--;
}
c3_w* nov_w = u3a_walloc(len_w + c3_wiseof(u3a_atom));
u3a_atom* nov_u = (void*)nov_w;
nov_u->mug_w = 0;
nov_u->len_w = len_w;
// Fill the words.
//
{
c3_w i_w, x_w, max_w = a_w - 1;
c3_d i_d;
for ( i_w = 0; i_w < max_w; i_w++ ) {
i_d = b_d[i_w];
x_w = 2 * i_w;
nov_u->buf_w[x_w] = i_d & 0xffffffffULL;
x_w++;
nov_u->buf_w[x_w] = i_d >> 32;
}
{
i_d = b_d[i_w];
x_w = 2 * i_w;
nov_u->buf_w[x_w] = i_d & 0xffffffffULL;
x_w++;
}
if ( x_w < len_w ) {
nov_u->buf_w[x_w] = i_d >> 32;
}
}
pro = u3a_to_pug(u3a_outa(nov_w));
}
u3t_off(mal_o);
return pro;
}
/* u3i_mp(): Copy the GMP integer [a] into an atom, and clear it.
*/
u3_noun
u3i_mp(mpz_t a_mp)
@ -152,9 +211,7 @@ u3i_mp(mpz_t a_mp)
return u3a_malt(buz_w);
}
/* u3i_vint():
**
** Create `a + 1`.
/* u3i_vint(): increment [a].
*/
u3_noun
u3i_vint(u3_noun a)
@ -176,67 +233,41 @@ u3i_vint(u3_noun a)
mpz_t a_mp;
u3r_mp(a_mp, a);
u3a_lose(a);
u3z(a);
mpz_add_ui(a_mp, a_mp, 1);
return u3i_mp(a_mp);
}
}
c3_w BAD;
/* u3i_cell():
**
** Produce the cell `[a b]`.
/* u3i_cell(): Produce the cell `[a b]`.
*/
u3_noun
u3i_cell(u3_noun a, u3_noun b)
{
u3_noun pro;
u3t_on(mal_o);
#ifdef U3_CPU_DEBUG
u3R->pro.cel_d++;
#endif
{
// c3_w* nov_w = u3a_walloc(c3_wiseof(u3a_cell));
c3_w* nov_w = u3a_celloc();
c3_w* nov_w = u3a_celloc();
u3a_cell* nov_u = (void *)nov_w;
u3_noun pro;
nov_u->mug_w = 0;
nov_u->hed = a;
nov_u->tel = b;
pro = u3a_to_pom(u3a_outa(nov_w));
#if 0
if ( (0x730e66cc == u3r_mug(pro)) &&
(c3__tssg == u3h(u3t(u3t(pro)))) ) {
static c3_w xuc_w;
u3l_log("BAD %x %p\r\n", pro, u3a_to_ptr(a));
BAD = pro;
if ( xuc_w == 1 ) u3m_bail(c3__exit);
xuc_w++;
}
#endif
#if 1
u3t_off(mal_o);
return pro;
#else
if ( !FOO ) return u3a_to_pom(u3a_outa(nov_w));
else {
u3_noun pro = u3a_to_pom(u3a_outa(nov_w));
u3m_p("leaked", pro);
u3l_log("pro %u, %x\r\n", pro, u3r_mug(pro));
abort();
}
#endif
}
u3t_off(mal_o);
return pro;
}
/* u3i_trel():
**
** Produce the triple `[a b c]`.
/* u3i_trel(): Produce the triple `[a b c]`.
*/
u3_noun
u3i_trel(u3_noun a, u3_noun b, u3_noun c)
@ -244,9 +275,7 @@ u3i_trel(u3_noun a, u3_noun b, u3_noun c)
return u3i_cell(a, u3i_cell(b, c));
}
/* u3i_qual():
**
** Produce the cell `[a b c d]`.
/* u3i_qual(): Produce the cell `[a b c d]`.
*/
u3_noun
u3i_qual(u3_noun a, u3_noun b, u3_noun c, u3_noun d)
@ -254,6 +283,57 @@ u3i_qual(u3_noun a, u3_noun b, u3_noun c, u3_noun d)
return u3i_cell(a, u3i_trel(b, c, d));
}
/* u3i_string(): Produce an LSB-first atom from the C string [a].
*/
u3_noun
u3i_string(const c3_c* a_c)
{
return u3i_bytes(strlen(a_c), (c3_y *)a_c);
}
/* u3i_tape(): from a C string, to a list of bytes.
*/
u3_atom
u3i_tape(const c3_c* txt_c)
{
if ( !*txt_c ) {
return u3_nul;
} else return u3i_cell(*txt_c, u3i_tape(txt_c + 1));
}
/* u3i_list(): list from `u3_none`-terminated varargs.
*/
u3_noun
u3i_list(u3_weak som, ...)
{
u3_noun lit = u3_nul;
va_list ap;
if ( u3_none == som ) {
return lit;
}
else {
lit = u3nc(som, lit);
}
{
u3_noun tem;
va_start(ap, som);
while ( 1 ) {
if ( u3_none == (tem = va_arg(ap, u3_weak)) ) {
break;
}
else {
lit = u3nc(tem, lit);
}
}
va_end(ap);
}
return u3kb_flop(lit);
}
static u3_noun
_edit_cat(u3_noun big, c3_l axe_l, u3_noun som)
{
@ -398,48 +478,6 @@ u3i_edit(u3_noun big, u3_noun axe, u3_noun som)
}
}
/* u3i_string():
**
** Produce an LSB-first atom from the C string `a`.
*/
u3_noun
u3i_string(const c3_c* a_c)
{
return u3i_bytes(strlen(a_c), (c3_y *)a_c);
}
/* u3i_tape(): from a C string, to a list of bytes.
*/
u3_atom
u3i_tape(const c3_c* txt_c)
{
if ( !*txt_c ) {
return u3_nul;
} else return u3i_cell(*txt_c, u3i_tape(txt_c + 1));
}
/* u3i_decimal():
**
** Parse `a` as a list of decimal digits.
*/
u3_atom
u3i_decimal(u3_noun a);
/* u3i_heximal():
**
** Parse `a` as a list of hex digits.
*/
u3_noun
u3i_heximal(u3_noun a);
/* u3i_list():
**
** Generate a null-terminated list, with `u3_none` as terminator.
*/
u3_noun
u3i_list(u3_weak one, ...);
/* u3i_molt():
**
** Mutate `som` with a 0-terminated list of axis, noun pairs.
@ -476,7 +514,7 @@ u3i_list(u3_weak one, ...);
struct _molt_pair* pms_m) // transfer
{
if ( len_w == 0 ) {
return u3a_gain(som);
return u3k(som);
}
else if ( (len_w == 1) && (1 == pms_m[0].axe_w) ) {
return pms_m[0].som;
@ -503,8 +541,8 @@ u3i_molt(u3_noun som, ...)
struct _molt_pair* pms_m;
u3_noun pro;
/* Count.
*/
// Count.
//
len_w = 0;
{
va_start(ap, som);
@ -521,8 +559,8 @@ u3i_molt(u3_noun som, ...)
c3_assert( 0 != len_w );
pms_m = alloca(len_w * sizeof(struct _molt_pair));
/* Install.
*/
// Install.
//
{
c3_w i_w;
@ -534,10 +572,9 @@ u3i_molt(u3_noun som, ...)
va_end(ap);
}
/* Apply.
*/
// Apply.
//
pro = _molt_apply(som, len_w, pms_m);
u3a_lose(som);
u3z(som);
return pro;
}

View File

@ -1387,7 +1387,7 @@ u3r_mug_chub(c3_d num_d)
c3_w buf_w[2];
buf_w[0] = (c3_w)(num_d & 0xffffffffULL);
buf_w[1] = (c3_w)(num_d >> 32ULL);
buf_w[1] = (c3_w)(num_d >> 32);
return u3r_mug_words(buf_w, 2);
}
@ -1405,14 +1405,30 @@ u3r_mug_string(const c3_c *a_c)
c3_w
u3r_mug_words(const c3_w* key_w, c3_w len_w)
{
c3_w byt_w = 0;
c3_w wor_w;
c3_w byt_w;
while ( 0 < len_w ) {
wor_w = key_w[--len_w];
byt_w += _(u3a_is_cat(wor_w)) ? u3r_met(3, wor_w) : 4;
// ignore trailing zeros
//
while ( len_w && !key_w[len_w - 1] ) {
len_w--;
}
// calculate byte-width a la u3r_met(3, ...)
//
if ( !len_w ) {
byt_w = 0;
}
else {
c3_w gal_w = len_w - 1;
c3_w daz_w = key_w[gal_w];
byt_w = (gal_w << 2)
+ ((daz_w >> 24) ? 4 : (daz_w >> 16) ? 3 : (daz_w >> 8) ? 2 : 1);
}
// XX: assumes little-endian
//
return u3r_mug_bytes((c3_y*)key_w, byt_w);
}
@ -1422,8 +1438,7 @@ c3_w
u3r_mug_both(c3_w lef_w, c3_w rit_w)
{
c3_w ham_w = lef_w ^ (0x7fffffff ^ rit_w);
return u3r_mug_words(&ham_w, (0 == ham_w) ? 0 : 1);
return u3r_mug_words(&ham_w, 1);
}
/* u3r_mug_cell(): Compute the mug of the cell `[hed tel]`.
@ -1531,7 +1546,7 @@ u3r_mug(u3_noun veb)
// veb is a direct atom, mug is not memoized
//
if ( _(u3a_is_cat(veb)) ) {
mug_w = u3r_mug_bytes((c3_y*)&veb, u3r_met(3, veb));
mug_w = u3r_mug_words(&veb, 1);
goto retreat;
}
// veb is indirect, a pointer into the loom
@ -1549,7 +1564,7 @@ u3r_mug(u3_noun veb)
//
else if ( _(u3a_is_atom(veb)) ) {
u3a_atom* vat_u = (u3a_atom*)veb_u;
mug_w = u3r_mug_bytes((c3_y*)vat_u->buf_w, u3r_met(3, veb));
mug_w = u3r_mug_words(vat_u->buf_w, vat_u->len_w);
vat_u->mug_w = mug_w;
goto retreat;
}

View File

@ -94,6 +94,87 @@ _test_mug(void)
c3_free(str_w);
}
{
c3_w som_w[4];
u3_noun som;
{
som_w[0] = 0;
som_w[1] = 0;
som_w[2] = 0;
som_w[3] = 1;
som = u3i_words(4, som_w);
if ( 0x519bd45c != u3r_mug(som) ) {
fprintf(stderr, "fail (j) (1)\r\n");
exit(1);
}
if ( 0x519bd45c != u3r_mug_words(som_w, 4) ) {
fprintf(stderr, "fail (j) (2)\r\n");
exit(1);
}
u3z(som);
}
{
som_w[0] = 0;
som_w[1] = 1;
som_w[2] = 0;
som_w[3] = 1;
som = u3i_words(4, som_w);
if ( 0x540eb8a9 != u3r_mug(som) ) {
fprintf(stderr, "fail (k) (1)\r\n");
exit(1);
}
if ( 0x540eb8a9 != u3r_mug_words(som_w, 4) ) {
fprintf(stderr, "fail (k) (2)\r\n");
exit(1);
}
u3z(som);
}
{
som_w[0] = 1;
som_w[1] = 1;
som_w[2] = 0;
som_w[3] = 1;
som = u3i_words(4, som_w);
if ( 0x319d28f9 != u3r_mug(som) ) {
fprintf(stderr, "fail (l) (1)\r\n");
exit(1);
}
if ( 0x319d28f9 != u3r_mug_words(som_w, 4) ) {
fprintf(stderr, "fail (l) (2)\r\n");
exit(1);
}
u3z(som);
}
{
som_w[0] = 0;
som_w[1] = 0;
som_w[2] = 0;
som_w[3] = 0xffff;
som = u3i_words(4, som_w);
if ( 0x5230a260 != u3r_mug(som) ) {
fprintf(stderr, "fail (m) (1)\r\n");
exit(1);
}
if ( 0x5230a260 != u3r_mug_words(som_w, 4) ) {
fprintf(stderr, "fail (m) (2)\r\n");
exit(1);
}
u3z(som);
}
}
fprintf(stderr, "test_mug: ok\n");
}

View File

@ -10,20 +10,43 @@ _setup(void)
u3m_pave(c3y, c3n);
}
static c3_w pok_w;
static c3_w bal_w;
static void
_moat_poke_cb(void* vod_p, u3_atom a)
/* _newt_encode(): synchronous serialization into a single buffer, for test purposes
*/
static c3_y*
_newt_encode(u3_atom mat, c3_w* len_w)
{
pok_w++;
u3z(a);
c3_w met_w = u3r_met(3, mat);
c3_y* buf_y;
*len_w = 8 + met_w;
buf_y = c3_malloc(*len_w);
// write header; c3_d is futureproofing
//
buf_y[0] = ((met_w >> 0) & 0xff);
buf_y[1] = ((met_w >> 8) & 0xff);
buf_y[2] = ((met_w >> 16) & 0xff);
buf_y[3] = ((met_w >> 24) & 0xff);
buf_y[4] = buf_y[5] = buf_y[6] = buf_y[7] = 0;
u3r_bytes(0, met_w, buf_y + 8, mat);
u3z(mat);
return buf_y;
}
static void
_moat_bail_cb(void* vod_p, const c3_c* err_c)
static c3_w
_moat_length(u3_moat* mot_u)
{
bal_w++;
u3_meat* met_u = mot_u->ext_u;
c3_w len_w = 0;
while ( met_u ) {
met_u = met_u->nex_u;
len_w++;
}
return len_w;
}
/* _test_newt_smol(): various scenarios with small messages
@ -39,19 +62,16 @@ _test_newt_smol(void)
c3_y* buf_y;
memset(&mot_u, 0, sizeof(u3_moat));
mot_u.pok_f = _moat_poke_cb;
mot_u.bal_f = _moat_bail_cb;
// one message one buffer
//
{
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = u3_newt_encode(u3k(a), &len_w);
buf_y = _newt_encode(u3k(a), &len_w);
u3_newt_decode(&mot_u, buf_y, len_w);
if ( 1 != pok_w ) {
if ( 1 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt smol fail (a)\n");
exit(1);
}
@ -60,10 +80,9 @@ _test_newt_smol(void)
// two messages one buffer
//
{
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = u3_newt_encode(u3k(a), &len_w);
buf_y = _newt_encode(u3k(a), &len_w);
buf_y = c3_realloc(buf_y, 2 * len_w);
memcpy(buf_y + len_w, buf_y, len_w);
@ -71,7 +90,7 @@ _test_newt_smol(void)
u3_newt_decode(&mot_u, buf_y, len_w);
if ( 2 != pok_w ) {
if ( 2 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt smol fail (b)\n");
exit(1);
}
@ -81,24 +100,24 @@ _test_newt_smol(void)
//
{
c3_y* end_y;
pok_w = 0;
bal_w = 0;
buf_y = u3_newt_encode(u3k(a), &len_w);
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = _newt_encode(u3k(a), &len_w);
end_y = c3_malloc(1);
end_y[0] = buf_y[len_w - 1];
u3_newt_decode(&mot_u, buf_y, len_w - 1);
if ( 0 != pok_w ) {
if ( 0 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt smol fail (c)\n");
exit(1);
}
u3_newt_decode(&mot_u, end_y, 1);
if ( 1 != pok_w ) {
if ( 1 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt smol fail (d)\n");
exit(1);
}
@ -110,10 +129,9 @@ _test_newt_smol(void)
c3_y* haf_y;
c3_w haf_w, dub_w;
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = u3_newt_encode(u3k(a), &len_w);
buf_y = _newt_encode(u3k(a), &len_w);
dub_w = 2 * len_w;
haf_w = len_w / 2;
@ -130,14 +148,14 @@ _test_newt_smol(void)
u3_newt_decode(&mot_u, buf_y, dub_w - haf_w);
if ( 1 != pok_w ) {
if ( 1 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt smol fail (e)\n");
exit(1);
}
u3_newt_decode(&mot_u, haf_y, haf_w);
if ( 2 != pok_w ) {
if ( 2 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt smol fail (f)\n");
exit(1);
}
@ -159,19 +177,16 @@ _test_newt_vast(void)
c3_y* buf_y;
memset(&mot_u, 0, sizeof(u3_moat));
mot_u.pok_f = _moat_poke_cb;
mot_u.bal_f = _moat_bail_cb;
// one message one buffer
//
{
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = u3_newt_encode(u3k(a), &len_w);
buf_y = _newt_encode(u3k(a), &len_w);
u3_newt_decode(&mot_u, buf_y, len_w);
if ( 1 != pok_w ) {
if ( 1 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt vast fail (a)\n");
exit(1);
}
@ -180,10 +195,9 @@ _test_newt_vast(void)
// two messages one buffer
//
{
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = u3_newt_encode(u3k(a), &len_w);
buf_y = _newt_encode(u3k(a), &len_w);
buf_y = c3_realloc(buf_y, 2 * len_w);
memcpy(buf_y + len_w, buf_y, len_w);
@ -191,7 +205,7 @@ _test_newt_vast(void)
u3_newt_decode(&mot_u, buf_y, len_w);
if ( 2 != pok_w ) {
if ( 2 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt vast fail (b)\n");
exit(1);
}
@ -200,10 +214,9 @@ _test_newt_vast(void)
// one message many buffers
//
{
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = u3_newt_encode(u3k(a), &len_w);
buf_y = _newt_encode(u3k(a), &len_w);
{
c3_y* cop_y = c3_malloc(len_w);
@ -216,7 +229,7 @@ _test_newt_vast(void)
c3_y* end_y = c3_malloc(1);
end_y[0] = cop_y[haf_w];
if ( 0 != pok_w ) {
if ( 0 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt vast fail (c) %u\n", haf_w);
exit(1);
}
@ -228,7 +241,7 @@ _test_newt_vast(void)
c3_free(cop_y);
}
if ( 1 != pok_w ) {
if ( 1 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt vast fail (d)\n");
exit(1);
}
@ -240,10 +253,9 @@ _test_newt_vast(void)
c3_y* haf_y;
c3_w haf_w, dub_w;
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = u3_newt_encode(u3k(a), &len_w);
buf_y = _newt_encode(u3k(a), &len_w);
dub_w = 2 * len_w;
haf_w = len_w / 2;
@ -260,14 +272,14 @@ _test_newt_vast(void)
u3_newt_decode(&mot_u, buf_y, dub_w - haf_w);
if ( 1 != pok_w ) {
if ( 1 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt vast fail (e)\n");
exit(1);
}
u3_newt_decode(&mot_u, haf_y, haf_w);
if ( 2 != pok_w ) {
if ( 2 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt vast fail (f)\n");
exit(1);
}
@ -278,10 +290,9 @@ _test_newt_vast(void)
{
c3_w dub_w;
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = u3_newt_encode(u3k(a), &len_w);
buf_y = _newt_encode(u3k(a), &len_w);
dub_w = 2 * len_w;
@ -301,7 +312,7 @@ _test_newt_vast(void)
c3_y* end_y = c3_malloc(1);
end_y[0] = cop_y[haf_w];
if ( 1 != pok_w ) {
if ( 1 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt vast fail (g) %u\n", haf_w);
exit(1);
}
@ -313,7 +324,7 @@ _test_newt_vast(void)
c3_free(cop_y);
}
if ( 2 != pok_w ) {
if ( 2 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt vast fail (h)\n");
exit(1);
}

View File

@ -413,18 +413,19 @@ _daemon_socket_connect(uv_stream_t *sock, int status)
if ( u3K.cli_u == 0 ) {
u3K.cli_u = c3_malloc(sizeof(u3_moor));
mor_u = u3K.cli_u;
mor_u->vod_p = 0;
mor_u->ptr_v = 0;
mor_u->nex_u = 0;
}
else {
for (mor_u = u3K.cli_u; mor_u->nex_u; mor_u = mor_u->nex_u);
mor_u->nex_u = c3_malloc(sizeof(u3_moor));
mor_u->nex_u->vod_p = mor_u;
mor_u->nex_u->ptr_v = mor_u;
mor_u = mor_u->nex_u;
mor_u->nex_u = 0;
}
uv_timer_init(u3L, &mor_u->tim_u);
uv_pipe_init(u3L, &mor_u->pyp_u, 0);
mor_u->pok_f = _daemon_fate;
mor_u->bal_f = _daemon_bail;
@ -841,7 +842,7 @@ _boothack_cb(uv_connect_t* con_u, c3_i sas_i)
else {
u3_noun dom = u3nc(c3__doom, _boothack_doom());
u3_atom mat = u3ke_jam(dom);
u3_newt_write(moj_u, mat, 0);
u3_newt_write(moj_u, mat);
c3_free(con_u);
@ -866,6 +867,7 @@ _daemon_loop_init()
u3_moor* mor_u = c3_malloc(sizeof(u3_moor));
uv_connect_t* con_u = c3_malloc(sizeof(uv_connect_t));
con_u->data = mor_u;
uv_timer_init(u3L, &mor_u->tim_u);
uv_pipe_init(u3L, &mor_u->pyp_u, 0);
uv_pipe_connect(con_u, &mor_u->pyp_u, u3K.soc_c, _boothack_cb);
}

View File

@ -657,9 +657,8 @@ _lord_writ_jam(u3_lord* god_u, u3_writ* wit_u)
static void
_lord_writ_send(u3_lord* god_u, u3_writ* wit_u)
{
_lord_writ_jam(god_u, wit_u);
u3_newt_write(&god_u->inn_u, wit_u->mat, 0);
u3_newt_write(&god_u->inn_u, wit_u->mat);
wit_u->mat = 0;
// ignore subprocess error on shutdown
@ -915,6 +914,7 @@ u3_lord_init(c3_c* pax_c, c3_w wag_w, c3_d key_d[4], u3_lord_cb cb_u)
arg_c[5] = 0;
uv_pipe_init(u3L, &god_u->inn_u.pyp_u, 0);
uv_timer_init(u3L, &god_u->out_u.tim_u);
uv_pipe_init(u3L, &god_u->out_u.pyp_u, 0);
god_u->cod_u[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
@ -943,12 +943,13 @@ u3_lord_init(c3_c* pax_c, c3_w wag_w, c3_d key_d[4], u3_lord_cb cb_u)
// start reading from proc
//
{
god_u->out_u.vod_p = god_u;
god_u->out_u.ptr_v = god_u;
god_u->out_u.pok_f = _lord_plea;
god_u->out_u.bal_f = _lord_bail;
// XX distinguish from out_u.bal_f ?
//
god_u->inn_u.ptr_v = god_u;
god_u->inn_u.bal_f = _lord_bail;
u3_newt_read(&god_u->out_u);

View File

@ -31,343 +31,351 @@
#include "all.h"
#include "vere/vere.h"
/* _newt_gain_meat(): add a block to an existing message
/* _newt_mess_head(): await next msg header.
*/
static void
_newt_gain_meat(u3_moat* mot_u)
_newt_mess_head(u3_mess* mes_u)
{
c3_assert( 0 != mot_u->mes_u );
// create block
//
u3_meat* met_u = c3_malloc(mot_u->len_d + (c3_d) sizeof(u3_meat));
met_u->nex_u = 0;
met_u->len_d = mot_u->len_d;
memcpy(met_u->hun_y, mot_u->rag_y, mot_u->len_d);
// enqueue block
//
if ( !mot_u->mes_u->meq_u ) {
mot_u->mes_u->meq_u = mot_u->mes_u->qem_u = met_u;
}
else {
mot_u->mes_u->qem_u->nex_u = met_u;
mot_u->mes_u->qem_u = met_u;
}
mot_u->mes_u->has_d += met_u->len_d;
// free consumed stray bytes
//
c3_free(mot_u->rag_y);
mot_u->len_d = 0;
mot_u->rag_y = 0;
mes_u->sat_e = u3_mess_head;
mes_u->hed_u.has_y = 0;
}
/* _newt_gain_mess(): begin parsing a new message
/* _newt_mess_tail(): await msg body.
*/
static void
_newt_gain_mess(u3_moat* mot_u)
_newt_mess_tail(u3_mess* mes_u, c3_d len_d)
{
c3_assert( 8ULL <= mot_u->len_d );
c3_assert( 0 == mot_u->mes_u );
u3_meat* met_u = c3_malloc(len_d + sizeof(*met_u));
met_u->nex_u = 0;
met_u->len_d = len_d;
c3_d nel_d = 0ULL;
mes_u->sat_e = u3_mess_tail;
mes_u->tal_u.has_d = 0;
mes_u->tal_u.met_u = met_u;
}
nel_d |= ((c3_d) mot_u->rag_y[0]) << 0ULL;
nel_d |= ((c3_d) mot_u->rag_y[1]) << 8ULL;
nel_d |= ((c3_d) mot_u->rag_y[2]) << 16ULL;
nel_d |= ((c3_d) mot_u->rag_y[3]) << 24ULL;
nel_d |= ((c3_d) mot_u->rag_y[4]) << 32ULL;
nel_d |= ((c3_d) mot_u->rag_y[5]) << 40ULL;
nel_d |= ((c3_d) mot_u->rag_y[6]) << 48ULL;
nel_d |= ((c3_d) mot_u->rag_y[7]) << 56ULL;
c3_assert( 0ULL != nel_d );
// very likely to be a bad write, we can't jam anything this big
//
if ( 0xFFFFFFFFULL < nel_d ) {
u3l_log("newt: %d warn: large read %" PRIu64 "\r\n",
getpid(),
nel_d);
}
mot_u->len_d -= 8ULL;
mot_u->mes_u = c3_malloc(sizeof(u3_mess));
mot_u->mes_u->len_d = nel_d;
mot_u->mes_u->has_d = 0;
mot_u->mes_u->meq_u = mot_u->mes_u->qem_u = 0;
if ( 0ULL == mot_u->len_d ) {
c3_free(mot_u->rag_y);
mot_u->rag_y = 0;
/* _newt_meat_plan(): enqueue complete msg.
*/
static void
_newt_meat_plan(u3_moat* mot_u, u3_meat* met_u)
{
if ( mot_u->ent_u ) {
mot_u->ent_u->nex_u = met_u;
mot_u->ent_u = met_u;
}
else {
// remove consumed length from stray bytes
//
c3_y* buf_y = c3_malloc(mot_u->len_d);
memcpy(buf_y, mot_u->rag_y + 8, mot_u->len_d);
c3_free(mot_u->rag_y);
mot_u->rag_y = buf_y;
mot_u->ent_u = mot_u->ext_u = met_u;
}
}
/* _newt_poke_mess(): pass message to [mot_u] callback
/* _newt_meat_poke(): deliver completed msg.
*/
static void
_newt_poke_mess(u3_moat* mot_u)
_newt_meat_poke(u3_moat* mot_u, u3_meat* met_u)
{
c3_assert( 0 != mot_u->mes_u );
c3_assert( mot_u->mes_u->has_d >= mot_u->mes_u->len_d );
u3_noun mat = u3i_bytes((c3_w)met_u->len_d, met_u->hun_y);
mot_u->pok_f(mot_u->ptr_v, mat);
c3_free(met_u);
}
c3_d len_d = mot_u->mes_u->len_d;
c3_y* buf_y = c3_malloc(len_d);
c3_d pat_d = 0;
u3_meat* met_u;
/* _newt_meat_next_sync(): deliver completed msgs, synchronously.
*/
static void
_newt_meat_next_sync(u3_moat* mot_u)
{
u3_meat* met_u = mot_u->ext_u;
// we should have just cleared this
//
c3_assert(!mot_u->rag_y);
c3_assert(!mot_u->len_d);
while ( met_u ) {
u3_meat* nex_u = met_u->nex_u;
_newt_meat_poke(mot_u, met_u);
met_u = nex_u;
}
// collect queue blocks, cleaning them up; return any spare meat
// to the rag.
//
{
met_u = mot_u->mes_u->meq_u;
mot_u->ent_u = mot_u->ext_u = 0;
}
while ( met_u && (pat_d < len_d) ) {
u3_meat* nex_u = met_u->nex_u;
c3_d end_d = (pat_d + met_u->len_d);
c3_d eat_d;
c3_d rem_d;
static void
_newt_meat_next_cb(uv_timer_t* tim_u);
eat_d = c3_min(len_d, end_d) - pat_d;
memcpy(buf_y + pat_d, met_u->hun_y, eat_d);
pat_d += eat_d;
/* _newt_meat_next(): deliver completed msgs, asynchronously.
*/
static void
_newt_meat_next(u3_moat* mot_u)
{
u3_meat* met_u = mot_u->ext_u;
rem_d = (met_u->len_d - eat_d);
if ( rem_d ) {
mot_u->rag_y = c3_malloc(rem_d);
memcpy(mot_u->rag_y, met_u->hun_y + eat_d, rem_d);
mot_u->len_d = rem_d;
if ( met_u ) {
mot_u->ext_u = met_u->nex_u;
// one: unless we got a bad length, this has to be the last
// block in the message.
//
// two: bad data on a newt channel can cause us to assert.
// that's actually the right thing for a private channel.
///
c3_assert(0 == nex_u);
}
c3_free(met_u);
met_u = nex_u;
if ( mot_u->ext_u ) {
uv_timer_start(&mot_u->tim_u, _newt_meat_next_cb, 0, 0);
}
else {
mot_u->ent_u = 0;
}
c3_assert(pat_d == len_d);
// clear the message
//
c3_free(mot_u->mes_u);
mot_u->mes_u = 0;
_newt_meat_poke(mot_u, met_u);
}
}
// build and send the object
//
{
u3_noun mat = u3i_bytes((c3_w)len_d, buf_y);
mot_u->pok_f(mot_u->vod_p, mat);
}
c3_free(buf_y);
/* _newt_meat_next_cb(): handle next msg after timer.
*/
static void
_newt_meat_next_cb(uv_timer_t* tim_u)
{
u3_moat* mot_u = tim_u->data;
_newt_meat_next(mot_u);
}
/* u3_newt_decode(): decode a (partial) length-prefixed byte buffer
*/
void
u3_newt_decode(u3_moat* mot_u, c3_y* buf_y, c3_w len_w)
u3_newt_decode(u3_moat* mot_u, c3_y* buf_y, c3_d len_d)
{
// grow read buffer by `len_d` bytes
u3_mess* mes_u = &mot_u->mes_u;
while ( len_d ) {
switch( mes_u->sat_e ) {
// read up to 8 length bytes as needed
//
case u3_mess_head: {
c3_y* len_y = mes_u->hed_u.len_y;
c3_y has_y = mes_u->hed_u.has_y;
c3_y ned_y = 8 - has_y;
c3_y cop_y = c3_min(ned_y, len_d);
memcpy(len_y + has_y, buf_y, cop_y);
buf_y += cop_y;
len_d -= cop_y;
ned_y -= cop_y;
// moar bytes needed, yield
//
if ( ned_y ) {
mes_u->hed_u.has_y = (has_y + cop_y);
}
// length known, allocate message
//
else {
c3_d met_d = (((c3_d)len_y[0]) << 0)
| (((c3_d)len_y[1]) << 8)
| (((c3_d)len_y[2]) << 16)
| (((c3_d)len_y[3]) << 24)
| (((c3_d)len_y[4]) << 32)
| (((c3_d)len_y[5]) << 40)
| (((c3_d)len_y[6]) << 48)
| (((c3_d)len_y[7]) << 56);
// must be non-zero, only 32 bits supported
//
c3_assert( met_d );
c3_assert( 0xFFFFFFFFULL > met_d );
// await body
//
_newt_mess_tail(mes_u, met_d);
}
} break;
case u3_mess_tail: {
u3_meat* met_u = mes_u->tal_u.met_u;
c3_d has_d = mes_u->tal_u.has_d;
c3_d ned_d = met_u->len_d - has_d;
c3_d cop_d = c3_min(ned_d, len_d);
memcpy(met_u->hun_y + has_d, buf_y, cop_d);
buf_y += cop_d;
len_d -= cop_d;
ned_d -= cop_d;
// moar bytes needed, yield
//
if ( ned_d ) {
mes_u->tal_u.has_d = (has_d + cop_d);
}
// message completed, enqueue and await next header
//
else {
_newt_meat_plan(mot_u, met_u);
_newt_mess_head(mes_u);
}
} break;
}
}
}
/* _newt_read(): handle async read result.
*/
static c3_o
_newt_read(u3_moat* mot_u,
ssize_t len_i,
const uv_buf_t* buf_u)
{
if ( 0 > len_i ) {
c3_free(buf_u->base);
uv_read_stop((uv_stream_t*)&mot_u->pyp_u);
fprintf(stderr, "newt: read failed %s\r\n", uv_strerror(len_i));
mot_u->bal_f(mot_u->ptr_v, uv_strerror(len_i));
return c3n;
}
// EAGAIN/EWOULDBLOCK
//
if ( mot_u->rag_y ) {
// XX check SIZE_MAX?
//
c3_d nel_d = mot_u->len_d + len_w;
mot_u->rag_y = c3_realloc(mot_u->rag_y, nel_d);
memcpy(mot_u->rag_y + mot_u->len_d, buf_y, len_w);
mot_u->len_d = nel_d;
c3_free(buf_y);
else if ( 0 == len_i ) {
c3_free(buf_u->base);
return c3n;
}
else {
mot_u->rag_y = buf_y;
mot_u->len_d = (c3_d)len_w;
}
// process stray bytes, trying to create a new message
// or add a block to an existing one.
//
while ( mot_u->rag_y ) {
// no message
//
if ( !mot_u->mes_u ) {
// but enough stray bytes to start one
//
if ( 8ULL <= mot_u->len_d ) {
_newt_gain_mess(mot_u);
}
else {
break;
}
}
else {
// there is a live message, add a block to the queue.
//
_newt_gain_meat(mot_u);
// check for message completions
//
if ( mot_u->mes_u->has_d >= mot_u->mes_u->len_d ) {
_newt_poke_mess(mot_u);
}
}
u3_newt_decode(mot_u, (c3_y*)buf_u->base, (c3_d)len_i);
c3_free(buf_u->base);
return c3y;
}
}
/* _raft_alloc(): libuv-style allocator for raft.
/* _newt_read_sync_cb(): async read callback, sync msg delivery.
*/
static void
_newt_alloc(uv_handle_t* had_u,
size_t len_i,
uv_buf_t* buf_u)
_newt_read_sync_cb(uv_stream_t* str_u,
ssize_t len_i,
const uv_buf_t* buf_u)
{
void* ptr_v = c3_malloc(len_i);
u3_moat* mot_u = (void *)str_u;
*buf_u = uv_buf_init(ptr_v, len_i);
if ( c3y == _newt_read(mot_u, len_i, buf_u) ) {
_newt_meat_next_sync(mot_u);
}
}
/* _newt_read_cb(): stream input callback.
/* _newt_read_cb(): async read callback, async msg delivery.
*/
void
static void
_newt_read_cb(uv_stream_t* str_u,
ssize_t len_i,
const uv_buf_t* buf_u)
{
u3_moat* mot_u = (void *)str_u;
if ( 0 > len_i ) {
c3_free(buf_u->base);
uv_read_stop(str_u);
mot_u->bal_f(mot_u->vod_p, uv_strerror(len_i));
}
// EAGAIN/EWOULDBLOCK
//
else if ( 0 == len_i ) {
c3_free(buf_u->base);
}
else {
u3_newt_decode(mot_u, (c3_y*)buf_u->base, (c3_w)len_i);
if ( c3y == _newt_read(mot_u, len_i, buf_u) ) {
_newt_meat_next(mot_u);
}
}
/* u3_newt_read(): start stream reading.
/* _newt_alloc(): libuv-style allocator.
*/
static void
_newt_alloc(uv_handle_t* had_u,
size_t len_i,
uv_buf_t* buf_u)
{
// XX pick an appropriate size
//
void* ptr_v = c3_malloc(len_i);
*buf_u = uv_buf_init(ptr_v, len_i);
}
static void
_newt_read_init(u3_moat* mot_u, uv_read_cb read_cb_f)
{
// zero-initialize completed msg queue
//
mot_u->ent_u = mot_u->ext_u = 0;
// store pointer for queue timer callback
//
mot_u->tim_u.data = mot_u;
// await next msg header
//
_newt_mess_head(&mot_u->mes_u);
{
c3_i sas_i;
if ( 0 != (sas_i = uv_read_start((uv_stream_t*)&mot_u->pyp_u,
_newt_alloc,
read_cb_f)) )
{
fprintf(stderr, "newt: read failed %s\r\n", uv_strerror(sas_i));
mot_u->bal_f(mot_u->ptr_v, uv_strerror(sas_i));
}
}
}
/* u3_newt_read_sync(): start reading; multiple msgs synchronous.
*/
void
u3_newt_read_sync(u3_moat* mot_u)
{
_newt_read_init(mot_u, _newt_read_sync_cb);
}
/* u3_newt_read(): start reading; each msg asynchronous.
*/
void
u3_newt_read(u3_moat* mot_u)
{
c3_i err_i;
mot_u->mes_u = 0;
mot_u->len_d = 0;
mot_u->rag_y = 0;
err_i = uv_read_start((uv_stream_t*) &mot_u->pyp_u,
_newt_alloc,
_newt_read_cb);
if ( err_i != 0 ) {
mot_u->bal_f(mot_u, uv_strerror(err_i));
}
_newt_read_init(mot_u, _newt_read_cb);
}
/* u3_write_t: write request for newt
/* n_req: write request for newt
*/
typedef struct _u3_write_t {
typedef struct _n_req {
uv_write_t wri_u;
u3_mojo* moj_u;
void* vod_p;
c3_y* buf_y;
} u3_write_t;
c3_y buf_y[0];
} n_req;
/* _newt_write_cb(): generic write callback.
*/
static void
_newt_write_cb(uv_write_t* wri_u, c3_i sas_i)
{
u3_write_t* req_u = (struct _u3_write_t*)wri_u;
void* vod_p = req_u->vod_p;
u3_mojo* moj_u = req_u->moj_u;
n_req* req_u = (n_req*)wri_u;
u3_mojo* moj_u = req_u->moj_u;
c3_free(req_u->buf_y);
c3_free(req_u);
if ( 0 != sas_i ) {
u3l_log("newt: bad write %d\r\n", sas_i);
moj_u->bal_f(vod_p, uv_strerror(sas_i));
fprintf(stderr, "newt: write failed %s\r\n", uv_strerror(sas_i));
moj_u->bal_f(moj_u->ptr_v, uv_strerror(sas_i));
}
}
/* u3_newt_encode(): encode an atom to a length-prefixed byte buffer
*/
c3_y*
u3_newt_encode(u3_atom mat, c3_w* len_w)
{
c3_w met_w = u3r_met(3, mat);
c3_y* buf_y;
*len_w = 8 + met_w;
buf_y = c3_malloc(*len_w);
// write header; c3_d is futureproofing
//
buf_y[0] = ((met_w >> 0) & 0xff);
buf_y[1] = ((met_w >> 8) & 0xff);
buf_y[2] = ((met_w >> 16) & 0xff);
buf_y[3] = ((met_w >> 24) & 0xff);
buf_y[4] = buf_y[5] = buf_y[6] = buf_y[7] = 0;
u3r_bytes(0, met_w, buf_y + 8, mat);
u3z(mat);
return buf_y;
}
/* u3_newt_write(): write atom to stream; free atom.
*/
void
u3_newt_write(u3_mojo* moj_u,
u3_atom mat,
void* vod_p)
u3_newt_write(u3_mojo* moj_u, u3_atom mat)
{
u3_write_t* req_u = c3_malloc(sizeof(*req_u));
c3_w len_w;
c3_y* buf_y = u3_newt_encode(mat, &len_w);
uv_buf_t buf_u;
c3_i err_i;
c3_w len_w = u3r_met(3, mat);
n_req* req_u = c3_malloc(8 + len_w + sizeof(*req_u));
req_u->moj_u = moj_u;
req_u->buf_y = buf_y;
buf_u = uv_buf_init((c3_c*)buf_y, len_w);
if ( 0 != (err_i = uv_write((uv_write_t*)req_u,
(uv_stream_t*)&moj_u->pyp_u,
&buf_u, 1,
_newt_write_cb)) )
// write header; c3_d is futureproofing
//
req_u->buf_y[0] = ((len_w >> 0) & 0xff);
req_u->buf_y[1] = ((len_w >> 8) & 0xff);
req_u->buf_y[2] = ((len_w >> 16) & 0xff);
req_u->buf_y[3] = ((len_w >> 24) & 0xff);
req_u->buf_y[4] = req_u->buf_y[5] = req_u->buf_y[6] = req_u->buf_y[7] = 0;
// write payload
//
u3r_bytes(0, len_w, req_u->buf_y + 8, mat);
u3z(mat);
{
moj_u->bal_f(moj_u, uv_strerror(err_i));
uv_buf_t buf_u = uv_buf_init((c3_c*)req_u->buf_y, 8 + len_w);
c3_i sas_i;
if ( 0 != (sas_i = uv_write(&req_u->wri_u,
(uv_stream_t*)&moj_u->pyp_u,
&buf_u, 1,
_newt_write_cb)) )
{
c3_free(req_u);
fprintf(stderr, "newt: write failed %s\r\n", uv_strerror(sas_i));
moj_u->bal_f(moj_u->ptr_v, uv_strerror(sas_i));
}
}
}

View File

@ -574,9 +574,6 @@ _pier_play_read(u3_play* pay_u)
}
}
c3_o
u3_pier_pack(u3_pier* pir_u);
/* _pier_play(): send a batch of events to the worker for log replay.
*/
static void

View File

@ -1401,6 +1401,9 @@ _term_io_kick(u3_auto* car_u, u3_noun wir, u3_noun cad)
//
case c3__pack: {
ret_o = c3y;
// XX would be
//
// u3_assure(u3_pier_pack(car_u->pir_u));
} break;
}
}

View File

@ -40,7 +40,7 @@ _newt_fail(void* vod_p, const c3_c* wut_c)
static void
_newt_send(u3_noun pel)
{
u3_newt_write(&out_u, u3ke_jam(pel), 0);
u3_newt_write(&out_u, u3ke_jam(pel));
}
/* _newt_send_slog(): send hint output (hod is [priority tank]).
@ -144,6 +144,8 @@ main(c3_i argc, c3_c* argv[])
{
c3_i err_i;
err_i = uv_timer_init(lup_u, &inn_u.tim_u);
c3_assert(!err_i);
err_i = uv_pipe_init(lup_u, &inn_u.pyp_u, 0);
c3_assert(!err_i);
uv_pipe_open(&inn_u.pyp_u, inn_i);
@ -151,15 +153,18 @@ main(c3_i argc, c3_c* argv[])
err_i = uv_pipe_init(lup_u, &out_u.pyp_u, 0);
c3_assert(!err_i);
uv_pipe_open(&out_u.pyp_u, out_i);
uv_stream_set_blocking((uv_stream_t*)&out_u.pyp_u, 1);
}
// set up writing
//
out_u.ptr_v = &u3V;
out_u.bal_f = _newt_fail;
// set up reading
//
inn_u.vod_p = &u3V;
inn_u.ptr_v = &u3V;
inn_u.pok_f = _newt_writ;
inn_u.bal_f = _newt_fail;
@ -170,54 +175,7 @@ main(c3_i argc, c3_c* argv[])
u3V.sen_d = u3V.dun_d = u3m_boot(dir_c);
if ( eve_d ) {
c3_o roc_o;
c3_c nam_c[8193];
snprintf(nam_c, 8192, "%s/.urb/roc/%" PRIu64 ".jam", u3V.dir_c, eve_d);
struct stat buf_b;
c3_i fid_i = open(nam_c, O_RDONLY, 0644);
if ( (fid_i < 0) || (fstat(fid_i, &buf_b) < 0) ) {
fprintf(stderr, "serf: rock: %s not found\r\n", nam_c);
roc_o = c3n;
}
else {
fprintf(stderr, "serf: rock: %s found\r\n", nam_c);
roc_o = c3y;
}
close(fid_i);
if ( c3y == roc_o ) {
if ( c3n == u3e_hold() ) {
fprintf(stderr, "serf: unable to backup checkpoint\r\n");
}
else {
u3m_wipe();
if ( c3n == u3m_rock_load(u3V.dir_c, eve_d) ) {
fprintf(stderr, "serf: compaction failed, restoring checkpoint\r\n");
if ( c3n == u3e_fall() ) {
fprintf(stderr, "serf: unable to restore checkpoint\r\n");
c3_assert(0);
}
}
if ( c3n == u3e_drop() ) {
fprintf(stderr, "serf: warning: orphaned backup checkpoint file\r\n");
}
fprintf(stderr, "serf (%" PRIu64 "): compacted loom\r\n", eve_d);
u3V.sen_d = u3V.dun_d = eve_d;
// save now for flexibility
//
u3e_save();
}
}
u3_serf_unpack(&u3V, eve_d);
}
}
@ -238,7 +196,7 @@ main(c3_i argc, c3_c* argv[])
// start reading
//
u3_newt_read(&inn_u);
u3_newt_read_sync(&inn_u);
// enter loop
//

View File

@ -290,9 +290,7 @@ _serf_static_grab(void)
static void
_serf_pack(u3_serf* sef_u)
{
// skip for now
//
// _serf_static_grab();
_serf_static_grab();
u3l_log("serf (%" PRIu64 "): compacting loom\r\n", sef_u->dun_d);
@ -301,39 +299,11 @@ _serf_pack(u3_serf* sef_u)
return;
}
if ( c3n == u3e_hold() ) {
u3l_log("serf: unable to backup checkpoint\r\n");
return;
}
u3m_wipe();
if ( c3n == u3m_rock_load(sef_u->dir_c, sef_u->dun_d) ) {
u3l_log("serf: compaction failed, restoring checkpoint\r\n");
if ( c3n == u3e_fall() ) {
fprintf(stderr, "serf: unable to restore checkpoint\r\n");
c3_assert(0);
}
}
if ( c3n == u3e_drop() ) {
u3l_log("serf: warning: orphaned backup checkpoint file\r\n");
}
// leave these for now
//
// if ( c3n == u3m_rock_drop(sef_u->dir_c, sef_u->dun_d) ) {
// u3l_log("serf: warning: orphaned state file\r\n");
// }
u3_serf_unpack(sef_u, sef_u->dun_d);
u3l_log("serf (%" PRIu64 "): compacted loom\r\n", sef_u->dun_d);
_serf_static_grab();
// save now for flexibility
//
u3e_save();
}
/* u3_serf_post(): update serf state post-writ.
@ -346,7 +316,7 @@ u3_serf_post(u3_serf* sef_u)
sef_u->rec_o = c3n;
}
// XX this runs on replay too
// XX this runs on replay too, |mass s/b elsewhere
//
if ( c3y == sef_u->mut_o ) {
sef_u->mut_o = c3n;
@ -432,14 +402,11 @@ _serf_sure_feck(u3_serf* sef_u, c3_w pre_w, u3_noun vir)
if ( (pre_w > low_w) && !(pos_w > low_w) ) {
// XX set flag(s) in u3V so we don't repeat endlessly?
// XX pack here too?
//
pac_o = c3y;
rec_o = c3y;
pri = 1;
}
else if ( (pre_w > hig_w) && !(pos_w > hig_w) ) {
pac_o = c3y;
rec_o = c3y;
pri = 0;
}
@ -453,12 +420,6 @@ _serf_sure_feck(u3_serf* sef_u, c3_w pre_w, u3_noun vir)
rec_o = c3y;
}
// pack every 20K events
//
if ( 0 == (sef_u->dun_d % 20000ULL) ) {
pac_o = c3y;
}
// notify daemon of memory pressure via "fake" effect
//
if ( u3_none != pri ) {
@ -488,6 +449,24 @@ _serf_sure_core(u3_serf* sef_u, u3_noun cor)
sef_u->mut_o = c3y;
}
/* _serf_sure(): event succeeded, save state and process effects.
*/
static u3_noun
_serf_sure(u3_serf* sef_u, c3_w pre_w, u3_noun par)
{
// vir/(list ovum) list of effects
// cor/arvo arvo core
//
u3_noun vir, cor;
u3x_cell(par, &vir, &cor);
_serf_sure_core(sef_u, u3k(cor));
vir = _serf_sure_feck(sef_u, pre_w, u3k(vir));
u3z(par);
return vir;
}
/* _serf_make_crud():
*/
static u3_noun
@ -572,34 +551,23 @@ static u3_noun
_serf_work(u3_serf* sef_u, u3_noun job)
{
u3_noun gon;
c3_w pre_w = u3a_open(u3R);
// %work must be performed against an extant kernel
//
c3_assert( 0 != sef_u->mug_l);
c3_w pre_w = u3a_open(u3R);
// event numbers must be continuous
//
c3_assert( sef_u->sen_d == sef_u->dun_d);
sef_u->sen_d++;
gon = _serf_poke(sef_u, "work", job);
gon = _serf_poke(sef_u, "work", job); // retain
// event accepted
//
if ( u3_blip == u3h(gon) ) {
// vir/(list ovum) list of effects
// cor/arvo arvo core
//
u3_noun vir, cor;
u3x_trel(gon, 0, &vir, &cor);
_serf_sure_core(sef_u, u3k(cor));
vir = _serf_sure_feck(sef_u, pre_w, u3k(vir));
u3_noun vir = _serf_sure(sef_u, pre_w, u3k(u3t(gon)));
u3z(gon); u3z(job);
return u3nc(c3__done, u3nt(u3i_chubs(1, &sef_u->dun_d),
u3i_words(1, &sef_u->mug_l),
sef_u->mug_l,
vir));
}
// event rejected
@ -609,27 +577,20 @@ _serf_work(u3_serf* sef_u, u3_noun job)
//
u3_noun dud = u3k(gon);
// XX reclaim/pack on %meme first?
// XX reclaim on %meme first?
//
job = _serf_make_crud(job, dud);
gon = _serf_poke(sef_u, "crud", u3k(job));
gon = _serf_poke(sef_u, "crud", job); // retain
// error notification accepted
//
if ( u3_blip == u3h(gon) ) {
// vir/(list ovum) list of effects
// cor/arvo arvo core
//
u3_noun vir, cor;
u3x_trel(gon, 0, &vir, &cor);
_serf_sure_core(sef_u, u3k(cor));
vir = _serf_sure_feck(sef_u, pre_w, u3k(vir));
u3_noun vir = _serf_sure(sef_u, pre_w, u3k(u3t(gon)));
u3z(gon); u3z(dud);
return u3nc(c3__swap, u3nq(u3i_chubs(1, &sef_u->dun_d),
u3i_words(1, &sef_u->mug_l),
sef_u->mug_l,
job,
vir));
}
@ -638,7 +599,7 @@ _serf_work(u3_serf* sef_u, u3_noun job)
else {
sef_u->sen_d = sef_u->dun_d;
// XX reclaim/pack on %meme ?
// XX reclaim on %meme ?
//
u3z(job);
@ -673,6 +634,10 @@ u3_serf_work(u3_serf* sef_u, u3_noun job)
u3t_event_trace(lab_c, 'B');
}
// %work must be performed against an extant kernel
//
c3_assert( 0 != sef_u->mug_l);
pro = u3nc(c3__work, _serf_work(sef_u, job));
if ( tac_t ) {
@ -778,7 +743,7 @@ _serf_play_list(u3_serf* sef_u, u3_noun eve)
_serf_sure_core(sef_u, u3k(cor));
// process effects to set pack/reclaim flags
// process effects to set u3_serf_post flags
//
u3z(_serf_sure_feck(sef_u, pre_w, u3k(vir)));
@ -802,21 +767,20 @@ _serf_play_list(u3_serf* sef_u, u3_noun eve)
u3z(gon);
// XX reclaim/pack on meme
// XX retry?
// XX reclaim on meme ?
//
// send failure notification
//
u3z(vev);
return u3nc(c3__bail, u3nt(u3i_chubs(1, &sef_u->dun_d),
u3i_words(1, &sef_u->mug_l),
sef_u->mug_l,
dud));
}
}
u3z(vev);
return u3nc(c3__done, u3i_words(1, &sef_u->mug_l));
return u3nc(c3__done, sef_u->mug_l);
}
/* u3_serf_play(): apply event list, producing status.
@ -960,6 +924,8 @@ u3_serf_live(u3_serf* sef_u, u3_noun com, u3_noun* ret)
return c3y;
}
// NB: the %pack $writ only saves the rock, it doesn't load it
//
case c3__pack: {
c3_d eve_d;
@ -1092,8 +1058,68 @@ _serf_ripe(u3_serf* sef_u)
? 0
: u3r_mug(u3A->roc);
return u3nc(u3i_chubs(1, &sef_u->dun_d),
u3i_words(1, &sef_u->mug_l));
return u3nc(u3i_chubs(1, &sef_u->dun_d), sef_u->mug_l);
}
/* u3_serf_unpack(): initialize from rock at [eve_d].
*/
void
u3_serf_unpack(u3_serf* sef_u, c3_d eve_d)
{
c3_o roc_o;
c3_c nam_c[8193];
snprintf(nam_c, 8192, "%s/.urb/roc/%" PRIu64 ".jam", sef_u->dir_c, eve_d);
struct stat buf_b;
c3_i fid_i = open(nam_c, O_RDONLY, 0644);
if ( (fid_i < 0) || (fstat(fid_i, &buf_b) < 0) ) {
fprintf(stderr, "serf: rock: %s not found\r\n", nam_c);
roc_o = c3n;
}
else {
fprintf(stderr, "serf: rock: %s found\r\n", nam_c);
roc_o = c3y;
}
close(fid_i);
if ( c3y == roc_o ) {
if ( c3n == u3e_hold() ) {
fprintf(stderr, "serf: unable to backup checkpoint\r\n");
}
else {
u3m_wipe();
if ( c3n == u3m_rock_load(sef_u->dir_c, eve_d) ) {
fprintf(stderr, "serf: compaction failed, restoring checkpoint\r\n");
if ( c3n == u3e_fall() ) {
fprintf(stderr, "serf: unable to restore checkpoint\r\n");
c3_assert(0);
}
}
if ( c3n == u3e_drop() ) {
fprintf(stderr, "serf: warning: orphaned backup checkpoint file\r\n");
}
// leave rocks on disk
//
// if ( c3n == u3m_rock_drop(sef_u->dir_c, sef_u->dun_d) ) {
// u3l_log("serf: warning: orphaned state file\r\n");
// }
fprintf(stderr, "serf (%" PRIu64 "): compacted loom\r\n", eve_d);
sef_u->sen_d = sef_u->dun_d = eve_d;
// save now for flexibility
//
u3e_save();
}
}
}
/* u3_serf_init(): init or restore, producing status.