newt: delivers inbound messages asynchronously

This commit is contained in:
Joe Bryan 2020-06-04 11:09:22 -07:00
parent a31f27a575
commit 3f26140cf4
6 changed files with 202 additions and 241 deletions

View File

@ -46,15 +46,6 @@
*/
typedef void (*u3_moor_bail)(void*, const c3_c* err_c);
/* u3_mess: blob message in process.
*/
typedef struct _u3_mess {
c3_d len_d; // blob length in bytes
c3_d has_d; // currently held
struct _u3_meat* meq_u; // exit of message queue
struct _u3_meat* qem_u; // entry of message queue
} u3_mess;
/* u3_meat: blob message block.
*/
typedef struct _u3_meat {
@ -63,6 +54,29 @@
c3_y hun_y[0];
} u3_meat;
/* u3_mess_type: in-process message type.
*/
typedef enum {
u3_mess_head = 0, // awaiting header
u3_mess_tail = 1 // awaiting body
} u3_mess_type;
/* u3_mess: blob message in process.
*/
typedef struct _u3_mess {
u3_mess_type sat_e; // msg type
union { //
struct { // awaiting header:
c3_y len_y[8]; // header bytes
c3_y has_y; // length
} hed_u; //
struct { // awaiting body
u3_meat* met_u; // partial message
c3_d has_d; // length
} tal_u; //
};
} u3_mess;
/* u3_moat: inbound message stream.
*/
typedef struct _u3_moat {
@ -70,9 +84,10 @@
u3_moor_bail bal_f; // error response function
void* ptr_v; // callback pointer
u3_moor_poke pok_f; // action function
struct _u3_mess* mes_u; // message in progress
c3_d len_d; // length of stray bytes
c3_y* rag_y; // stray bytes
u3_mess mes_u; // message in progress
uv_timer_t tim_u; // queue timer
u3_meat* ent_u; // entry of message queue
u3_meat* ext_u; // exit of message queue
} u3_moat;
/* u3_mojo: outbound message stream.
@ -85,14 +100,15 @@
/* u3_moor: two-way message stream, linked list */
typedef struct _u3_moor {
uv_pipe_t pyp_u;
u3_moor_bail bal_f;
void* ptr_v;
u3_moor_poke pok_f;
struct _u3_mess* mes_u;
c3_d len_d;
c3_y* rag_y;
struct _u3_moor* nex_u;
uv_pipe_t pyp_u; // duplex stream
u3_moor_bail bal_f; // error response function
void* ptr_v; // callback pointer
u3_moor_poke pok_f; // action function
u3_mess mes_u; // message in progress
uv_timer_t tim_u; // queue timer
u3_meat* ent_u; // entry of message queue
u3_meat* ext_u; // exit of message queue
struct _u3_moor* nex_u; // next in list
} u3_moor;
/* u3_dent: directory entry.
@ -1061,7 +1077,7 @@
/* u3_newt_decode(): decode a (partial) length-prefixed byte buffer
*/
void
u3_newt_decode(u3_moat* mot_u, c3_y* buf_y, c3_w len_w);
u3_newt_decode(u3_moat* mot_u, c3_y* buf_y, c3_d len_d);
/* u3_newt_write(): write atom to stream; free atom.
*/

View File

@ -10,9 +10,6 @@ _setup(void)
u3m_pave(c3y, c3n);
}
static c3_w pok_w;
static c3_w bal_w;
/* _newt_encode(): synchronous serialization into a single buffer, for test purposes
*/
static c3_y*
@ -38,17 +35,18 @@ _newt_encode(u3_atom mat, c3_w* len_w)
return buf_y;
}
static void
_moat_poke_cb(void* vod_p, u3_atom a)
static c3_w
_moat_length(u3_moat* mot_u)
{
pok_w++;
u3z(a);
}
u3_meat* met_u = mot_u->ext_u;
c3_w len_w = 0;
static void
_moat_bail_cb(void* vod_p, const c3_c* err_c)
{
bal_w++;
while ( met_u ) {
met_u = met_u->nex_u;
len_w++;
}
return len_w;
}
/* _test_newt_smol(): various scenarios with small messages
@ -64,19 +62,16 @@ _test_newt_smol(void)
c3_y* buf_y;
memset(&mot_u, 0, sizeof(u3_moat));
mot_u.pok_f = _moat_poke_cb;
mot_u.bal_f = _moat_bail_cb;
// one message one buffer
//
{
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = _newt_encode(u3k(a), &len_w);
u3_newt_decode(&mot_u, buf_y, len_w);
if ( 1 != pok_w ) {
if ( 1 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt smol fail (a)\n");
exit(1);
}
@ -85,8 +80,7 @@ _test_newt_smol(void)
// two messages one buffer
//
{
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = _newt_encode(u3k(a), &len_w);
@ -96,7 +90,7 @@ _test_newt_smol(void)
u3_newt_decode(&mot_u, buf_y, len_w);
if ( 2 != pok_w ) {
if ( 2 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt smol fail (b)\n");
exit(1);
}
@ -106,8 +100,8 @@ _test_newt_smol(void)
//
{
c3_y* end_y;
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = _newt_encode(u3k(a), &len_w);
@ -116,14 +110,14 @@ _test_newt_smol(void)
u3_newt_decode(&mot_u, buf_y, len_w - 1);
if ( 0 != pok_w ) {
if ( 0 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt smol fail (c)\n");
exit(1);
}
u3_newt_decode(&mot_u, end_y, 1);
if ( 1 != pok_w ) {
if ( 1 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt smol fail (d)\n");
exit(1);
}
@ -135,8 +129,7 @@ _test_newt_smol(void)
c3_y* haf_y;
c3_w haf_w, dub_w;
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = _newt_encode(u3k(a), &len_w);
@ -155,14 +148,14 @@ _test_newt_smol(void)
u3_newt_decode(&mot_u, buf_y, dub_w - haf_w);
if ( 1 != pok_w ) {
if ( 1 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt smol fail (e)\n");
exit(1);
}
u3_newt_decode(&mot_u, haf_y, haf_w);
if ( 2 != pok_w ) {
if ( 2 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt smol fail (f)\n");
exit(1);
}
@ -184,19 +177,16 @@ _test_newt_vast(void)
c3_y* buf_y;
memset(&mot_u, 0, sizeof(u3_moat));
mot_u.pok_f = _moat_poke_cb;
mot_u.bal_f = _moat_bail_cb;
// one message one buffer
//
{
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = _newt_encode(u3k(a), &len_w);
u3_newt_decode(&mot_u, buf_y, len_w);
if ( 1 != pok_w ) {
if ( 1 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt vast fail (a)\n");
exit(1);
}
@ -205,8 +195,7 @@ _test_newt_vast(void)
// two messages one buffer
//
{
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = _newt_encode(u3k(a), &len_w);
@ -216,7 +205,7 @@ _test_newt_vast(void)
u3_newt_decode(&mot_u, buf_y, len_w);
if ( 2 != pok_w ) {
if ( 2 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt vast fail (b)\n");
exit(1);
}
@ -225,8 +214,7 @@ _test_newt_vast(void)
// one message many buffers
//
{
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = _newt_encode(u3k(a), &len_w);
@ -241,7 +229,7 @@ _test_newt_vast(void)
c3_y* end_y = c3_malloc(1);
end_y[0] = cop_y[haf_w];
if ( 0 != pok_w ) {
if ( 0 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt vast fail (c) %u\n", haf_w);
exit(1);
}
@ -253,7 +241,7 @@ _test_newt_vast(void)
c3_free(cop_y);
}
if ( 1 != pok_w ) {
if ( 1 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt vast fail (d)\n");
exit(1);
}
@ -265,8 +253,7 @@ _test_newt_vast(void)
c3_y* haf_y;
c3_w haf_w, dub_w;
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = _newt_encode(u3k(a), &len_w);
@ -285,14 +272,14 @@ _test_newt_vast(void)
u3_newt_decode(&mot_u, buf_y, dub_w - haf_w);
if ( 1 != pok_w ) {
if ( 1 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt vast fail (e)\n");
exit(1);
}
u3_newt_decode(&mot_u, haf_y, haf_w);
if ( 2 != pok_w ) {
if ( 2 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt vast fail (f)\n");
exit(1);
}
@ -303,8 +290,7 @@ _test_newt_vast(void)
{
c3_w dub_w;
pok_w = 0;
bal_w = 0;
mot_u.ent_u = mot_u.ext_u = 0;
buf_y = _newt_encode(u3k(a), &len_w);
@ -326,7 +312,7 @@ _test_newt_vast(void)
c3_y* end_y = c3_malloc(1);
end_y[0] = cop_y[haf_w];
if ( 1 != pok_w ) {
if ( 1 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt vast fail (g) %u\n", haf_w);
exit(1);
}
@ -338,7 +324,7 @@ _test_newt_vast(void)
c3_free(cop_y);
}
if ( 2 != pok_w ) {
if ( 2 != _moat_length(&mot_u) ) {
fprintf(stderr, "newt vast fail (h)\n");
exit(1);
}

View File

@ -425,6 +425,7 @@ _daemon_socket_connect(uv_stream_t *sock, int status)
mor_u->nex_u = 0;
}
uv_timer_init(u3L, &mor_u->tim_u);
uv_pipe_init(u3L, &mor_u->pyp_u, 0);
mor_u->pok_f = _daemon_fate;
mor_u->bal_f = _daemon_bail;
@ -866,6 +867,7 @@ _daemon_loop_init()
u3_moor* mor_u = c3_malloc(sizeof(u3_moor));
uv_connect_t* con_u = c3_malloc(sizeof(uv_connect_t));
con_u->data = mor_u;
uv_timer_init(u3L, &mor_u->tim_u);
uv_pipe_init(u3L, &mor_u->pyp_u, 0);
uv_pipe_connect(con_u, &mor_u->pyp_u, u3K.soc_c, _boothack_cb);
}

View File

@ -914,6 +914,7 @@ u3_lord_init(c3_c* pax_c, c3_w wag_w, c3_d key_d[4], u3_lord_cb cb_u)
arg_c[5] = 0;
uv_pipe_init(u3L, &god_u->inn_u.pyp_u, 0);
uv_timer_init(u3L, &god_u->out_u.tim_u);
uv_pipe_init(u3L, &god_u->out_u.pyp_u, 0);
god_u->cod_u[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;

View File

@ -31,209 +31,152 @@
#include "all.h"
#include "vere/vere.h"
/* _newt_gain_meat(): add a block to an existing message
/* _newt_mess_head(): await next msg header.
*/
static void
_newt_gain_meat(u3_moat* mot_u)
_newt_mess_head(u3_mess* mes_u)
{
c3_assert( 0 != mot_u->mes_u );
// create block
//
u3_meat* met_u = c3_malloc(mot_u->len_d + (c3_d) sizeof(u3_meat));
met_u->nex_u = 0;
met_u->len_d = mot_u->len_d;
memcpy(met_u->hun_y, mot_u->rag_y, mot_u->len_d);
// enqueue block
//
if ( !mot_u->mes_u->meq_u ) {
mot_u->mes_u->meq_u = mot_u->mes_u->qem_u = met_u;
}
else {
mot_u->mes_u->qem_u->nex_u = met_u;
mot_u->mes_u->qem_u = met_u;
}
mot_u->mes_u->has_d += met_u->len_d;
// free consumed stray bytes
//
c3_free(mot_u->rag_y);
mot_u->len_d = 0;
mot_u->rag_y = 0;
mes_u->sat_e = u3_mess_head;
mes_u->hed_u.has_y = 0;
}
/* _newt_gain_mess(): begin parsing a new message
/* _newt_mess_tail(): await msg body.
*/
static void
_newt_gain_mess(u3_moat* mot_u)
_newt_mess_tail(u3_mess* mes_u, c3_d len_d)
{
c3_assert( 8ULL <= mot_u->len_d );
c3_assert( 0 == mot_u->mes_u );
u3_meat* met_u = c3_malloc(len_d + sizeof(*met_u));
met_u->nex_u = 0;
met_u->len_d = len_d;
c3_d nel_d = 0ULL;
mes_u->sat_e = u3_mess_tail;
mes_u->tal_u.has_d = 0;
mes_u->tal_u.met_u = met_u;
}
nel_d |= ((c3_d) mot_u->rag_y[0]) << 0ULL;
nel_d |= ((c3_d) mot_u->rag_y[1]) << 8ULL;
nel_d |= ((c3_d) mot_u->rag_y[2]) << 16ULL;
nel_d |= ((c3_d) mot_u->rag_y[3]) << 24ULL;
nel_d |= ((c3_d) mot_u->rag_y[4]) << 32ULL;
nel_d |= ((c3_d) mot_u->rag_y[5]) << 40ULL;
nel_d |= ((c3_d) mot_u->rag_y[6]) << 48ULL;
nel_d |= ((c3_d) mot_u->rag_y[7]) << 56ULL;
c3_assert( 0ULL != nel_d );
// very likely to be a bad write, we can't jam anything this big
//
if ( 0xFFFFFFFFULL < nel_d ) {
fprintf(stderr, "newt: %d warn: large read %" PRIu64 "\r\n",
getpid(),
nel_d);
}
mot_u->len_d -= 8ULL;
mot_u->mes_u = c3_malloc(sizeof(u3_mess));
mot_u->mes_u->len_d = nel_d;
mot_u->mes_u->has_d = 0;
mot_u->mes_u->meq_u = mot_u->mes_u->qem_u = 0;
if ( 0ULL == mot_u->len_d ) {
c3_free(mot_u->rag_y);
mot_u->rag_y = 0;
/* _newt_meat_plan(): enqueue complete msg.
*/
static void
_newt_meat_plan(u3_moat* mot_u, u3_meat* met_u)
{
if ( mot_u->ent_u ) {
mot_u->ent_u->nex_u = met_u;
mot_u->ent_u = met_u;
}
else {
// remove consumed length from stray bytes
//
c3_y* buf_y = c3_malloc(mot_u->len_d);
memcpy(buf_y, mot_u->rag_y + 8, mot_u->len_d);
c3_free(mot_u->rag_y);
mot_u->rag_y = buf_y;
mot_u->ent_u = mot_u->ext_u = met_u;
}
}
/* _newt_poke_mess(): pass message to [mot_u] callback
static void
_newt_meat_next_cb(uv_timer_t* tim_u);
/* _newt_meat_poke(): deliver completed msg.
*/
static void
_newt_poke_mess(u3_moat* mot_u)
_newt_meat_poke(u3_moat* mot_u)
{
c3_assert( 0 != mot_u->mes_u );
c3_assert( mot_u->mes_u->has_d >= mot_u->mes_u->len_d );
u3_meat* met_u = mot_u->ext_u;
c3_d len_d = mot_u->mes_u->len_d;
c3_y* buf_y = c3_malloc(len_d);
c3_d pat_d = 0;
u3_meat* met_u;
if ( met_u ) {
mot_u->ext_u = met_u->nex_u;
// we should have just cleared this
//
c3_assert(!mot_u->rag_y);
c3_assert(!mot_u->len_d);
// collect queue blocks, cleaning them up; return any spare meat
// to the rag.
//
{
met_u = mot_u->mes_u->meq_u;
while ( met_u && (pat_d < len_d) ) {
u3_meat* nex_u = met_u->nex_u;
c3_d end_d = (pat_d + met_u->len_d);
c3_d eat_d;
c3_d rem_d;
eat_d = c3_min(len_d, end_d) - pat_d;
memcpy(buf_y + pat_d, met_u->hun_y, eat_d);
pat_d += eat_d;
rem_d = (met_u->len_d - eat_d);
if ( rem_d ) {
mot_u->rag_y = c3_malloc(rem_d);
memcpy(mot_u->rag_y, met_u->hun_y + eat_d, rem_d);
mot_u->len_d = rem_d;
// one: unless we got a bad length, this has to be the last
// block in the message.
//
// two: bad data on a newt channel can cause us to assert.
// that's actually the right thing for a private channel.
///
c3_assert(0 == nex_u);
}
c3_free(met_u);
met_u = nex_u;
if ( mot_u->ext_u ) {
uv_timer_start(&mot_u->tim_u, _newt_meat_next_cb, 0, 0);
}
else {
mot_u->ent_u = 0;
}
c3_assert(pat_d == len_d);
// clear the message
//
c3_free(mot_u->mes_u);
mot_u->mes_u = 0;
}
// build and send the object
//
{
u3_noun mat = u3i_bytes((c3_w)len_d, buf_y);
u3_noun mat = u3i_bytes((c3_w)met_u->len_d, met_u->hun_y);
mot_u->pok_f(mot_u->ptr_v, mat);
c3_free(met_u);
}
}
c3_free(buf_y);
/* _newt_meat_next_cb(): handle next msg after timer.
*/
static void
_newt_meat_next_cb(uv_timer_t* tim_u)
{
u3_moat* mot_u = tim_u->data;
_newt_meat_poke(mot_u);
}
/* u3_newt_decode(): decode a (partial) length-prefixed byte buffer
*/
void
u3_newt_decode(u3_moat* mot_u, c3_y* buf_y, c3_w len_w)
u3_newt_decode(u3_moat* mot_u, c3_y* buf_y, c3_d len_d)
{
// grow read buffer by `len_d` bytes
//
if ( mot_u->rag_y ) {
// XX check SIZE_MAX?
//
c3_d nel_d = mot_u->len_d + len_w;
u3_mess* mes_u = &mot_u->mes_u;
mot_u->rag_y = c3_realloc(mot_u->rag_y, nel_d);
memcpy(mot_u->rag_y + mot_u->len_d, buf_y, len_w);
while ( len_d ) {
switch( mes_u->sat_e ) {
mot_u->len_d = nel_d;
c3_free(buf_y);
}
else {
mot_u->rag_y = buf_y;
mot_u->len_d = (c3_d)len_w;
}
// process stray bytes, trying to create a new message
// or add a block to an existing one.
//
while ( mot_u->rag_y ) {
// no message
//
if ( !mot_u->mes_u ) {
// but enough stray bytes to start one
// read up to 8 length bytes as needed
//
if ( 8ULL <= mot_u->len_d ) {
_newt_gain_mess(mot_u);
}
else {
break;
}
}
else {
// there is a live message, add a block to the queue.
//
_newt_gain_meat(mot_u);
case u3_mess_head: {
c3_y* len_y = mes_u->hed_u.len_y;
c3_y has_y = mes_u->hed_u.has_y;
c3_y ned_y = 8 - has_y;
c3_y cop_y = c3_min(ned_y, len_d);
// check for message completions
//
if ( mot_u->mes_u->has_d >= mot_u->mes_u->len_d ) {
_newt_poke_mess(mot_u);
}
memcpy(len_y + has_y, buf_y, cop_y);
buf_y += cop_y;
len_d -= cop_y;
ned_y -= cop_y;
// moar bytes needed, yield
//
if ( ned_y ) {
mes_u->hed_u.has_y = (has_y + cop_y);
}
// length known, allocate message
//
else {
c3_d met_d = (((c3_d)len_y[0]) << 0)
| (((c3_d)len_y[1]) << 8)
| (((c3_d)len_y[2]) << 16)
| (((c3_d)len_y[3]) << 24)
| (((c3_d)len_y[4]) << 32)
| (((c3_d)len_y[5]) << 40)
| (((c3_d)len_y[6]) << 48)
| (((c3_d)len_y[7]) << 56);
// must be non-zero, only 32 bits supported
//
c3_assert( met_d );
c3_assert( 0xFFFFFFFFULL > met_d );
// await body
//
_newt_mess_tail(mes_u, met_d);
}
} break;
case u3_mess_tail: {
u3_meat* met_u = mes_u->tal_u.met_u;
c3_d has_d = mes_u->tal_u.has_d;
c3_d ned_d = met_u->len_d - has_d;
c3_d cop_d = c3_min(ned_d, len_d);
memcpy(met_u->hun_y + has_d, buf_y, cop_d);
buf_y += cop_d;
len_d -= cop_d;
ned_d -= cop_d;
// moar bytes needed, yield
//
if ( ned_d ) {
mes_u->tal_u.has_d = (has_d + cop_d);
}
// message completed, enqueue and await next header
//
else {
_newt_meat_plan(mot_u, met_u);
_newt_mess_head(mes_u);
}
} break;
}
}
}
@ -259,7 +202,10 @@ _newt_read_cb(uv_stream_t* str_u,
c3_free(buf_u->base);
}
else {
u3_newt_decode(mot_u, (c3_y*)buf_u->base, (c3_w)len_i);
u3_newt_decode(mot_u, (c3_y*)buf_u->base, (c3_d)len_i);
c3_free(buf_u->base);
_newt_meat_poke(mot_u);
}
}
@ -282,9 +228,17 @@ _newt_alloc(uv_handle_t* had_u,
void
u3_newt_read(u3_moat* mot_u)
{
mot_u->mes_u = 0;
mot_u->len_d = 0;
mot_u->rag_y = 0;
// zero-initialize completed msg queue
//
mot_u->ent_u = mot_u->ext_u = 0;
// store pointer for queue timer callback
//
mot_u->tim_u.data = mot_u;
// await next msg header
//
_newt_mess_head(&mot_u->mes_u);
{
c3_i sas_i;

View File

@ -144,6 +144,8 @@ main(c3_i argc, c3_c* argv[])
{
c3_i err_i;
err_i = uv_timer_init(lup_u, &inn_u.tim_u);
c3_assert(!err_i);
err_i = uv_pipe_init(lup_u, &inn_u.pyp_u, 0);
c3_assert(!err_i);
uv_pipe_open(&inn_u.pyp_u, inn_i);