Merge branch 'newt-clean' (#2042)

* newt-clean:
  vere: refactors ipc message parsing and assembly

Signed-off-by: Jared Tobin <jared@tlon.io>
This commit is contained in:
Jared Tobin 2019-12-06 21:15:21 +08:00
commit 5cbfb844e8
No known key found for this signature in database
GPG Key ID: 0E4647D58F8A69E4

View File

@ -4,7 +4,7 @@
**
** a message is a 64-bit little-endian byte count, followed
** by the indicated number of bytes. the bytes are the
** the ++cue of of a noun.
** the +jam of of a noun.
**
** the implementation is relatively inefficient and could
** lose a few copies, mallocs, etc.
@ -33,174 +33,204 @@
#undef NEWT_VERBOSE
/* _newt_gain_meat(): add a block to an existing message
*/
static void
_newt_gain_meat(u3_moat* mot_u)
{
c3_assert( 0 != mot_u->mes_u );
// create block
//
u3_meat* met_u = c3_malloc(mot_u->len_d + (c3_d) sizeof(u3_meat));
met_u->nex_u = 0;
met_u->len_d = mot_u->len_d;
memcpy(met_u->hun_y, mot_u->rag_y, mot_u->len_d);
#ifdef NEWT_VERBOSE
u3l_log("newt: %d: create: msg %p, new block %p, len %"
PRIu64 ", has %" PRIu64 ", needs %" PRIu64 "\r\n",
getpid(),
mot_u->mes_u,
met_u,
met_u->len_d,
mot_u->mes_u->has_d,
mot_u->mes_u->len_d);
#endif
// enqueue block
//
if ( !mot_u->mes_u->meq_u ) {
mot_u->mes_u->meq_u = mot_u->mes_u->qem_u = met_u;
}
else {
mot_u->mes_u->qem_u->nex_u = met_u;
mot_u->mes_u->qem_u = met_u;
}
mot_u->mes_u->has_d += met_u->len_d;
// free consumed stray bytes
//
c3_free(mot_u->rag_y);
mot_u->len_d = 0;
mot_u->rag_y = 0;
}
/* _newt_gain_mess(): begin parsing a new message
*/
static void
_newt_gain_mess(u3_moat* mot_u)
{
c3_assert( 8ULL <= mot_u->len_d );
c3_assert( 0 == mot_u->mes_u );
c3_d nel_d = 0ULL;
nel_d |= ((c3_d) mot_u->rag_y[0]) << 0ULL;
nel_d |= ((c3_d) mot_u->rag_y[1]) << 8ULL;
nel_d |= ((c3_d) mot_u->rag_y[2]) << 16ULL;
nel_d |= ((c3_d) mot_u->rag_y[3]) << 24ULL;
nel_d |= ((c3_d) mot_u->rag_y[4]) << 32ULL;
nel_d |= ((c3_d) mot_u->rag_y[5]) << 40ULL;
nel_d |= ((c3_d) mot_u->rag_y[6]) << 48ULL;
nel_d |= ((c3_d) mot_u->rag_y[7]) << 56ULL;
c3_assert( 0ULL != nel_d );
// very likely to be a bad write, we can't jam anything this big
//
if ( 0xFFFFFFFFULL < nel_d ) {
u3l_log("newt: %d warn: large read %" PRIu64 "\r\n",
getpid(),
nel_d);
}
#ifdef NEWT_VERBOSE
u3l_log("newt: %d: parsed length %" PRIu64 "\r\n",
getpid(),
nel_d);
#endif
mot_u->len_d -= 8ULL;
mot_u->mes_u = c3_malloc(sizeof(u3_mess));
mot_u->mes_u->len_d = nel_d;
mot_u->mes_u->has_d = 0;
mot_u->mes_u->meq_u = mot_u->mes_u->qem_u = 0;
if ( 0ULL == mot_u->len_d ) {
c3_free(mot_u->rag_y);
mot_u->rag_y = 0;
}
else {
// remove consumed length from stray bytes
//
c3_y* buf_y = c3_malloc(mot_u->len_d);
memcpy(buf_y, mot_u->rag_y + 8, mot_u->len_d);
c3_free(mot_u->rag_y);
mot_u->rag_y = buf_y;
}
}
/* _newt_poke_mess(): pass message to [mot_u] callback
*/
static void
_newt_poke_mess(u3_moat* mot_u)
{
c3_assert( 0 != mot_u->mes_u );
c3_assert( mot_u->mes_u->has_d >= mot_u->mes_u->len_d );
c3_d len_d = mot_u->mes_u->len_d;
c3_y* buf_y = c3_malloc(len_d);
c3_d pat_d = 0;
u3_meat* met_u;
// we should have just cleared this
//
c3_assert(!mot_u->rag_y);
c3_assert(!mot_u->len_d);
// collect queue blocks, cleaning them up; return any spare meat
// to the rag.
//
{
met_u = mot_u->mes_u->meq_u;
while ( met_u && (pat_d < len_d) ) {
u3_meat* nex_u = met_u->nex_u;
c3_d end_d = (pat_d + met_u->len_d);
c3_d eat_d;
c3_d rem_d;
eat_d = c3_min(len_d, end_d) - pat_d;
memcpy(buf_y + pat_d, met_u->hun_y, eat_d);
pat_d += eat_d;
rem_d = (met_u->len_d - eat_d);
if ( rem_d ) {
mot_u->rag_y = c3_malloc(rem_d);
memcpy(mot_u->rag_y, met_u->hun_y + eat_d, rem_d);
mot_u->len_d = rem_d;
// one: unless we got a bad length, this has to be the last
// block in the message.
//
// two: bad data on a newt channel can cause us to assert.
// that's actually the right thing for a private channel.
///
c3_assert(0 == nex_u);
}
c3_free(met_u);
met_u = nex_u;
}
c3_assert(pat_d == len_d);
// clear the message
//
c3_free(mot_u->mes_u);
mot_u->mes_u = 0;
}
// build and send the object
//
{
u3_noun mat = u3i_bytes((c3_w)len_d, buf_y);
mot_u->pok_f(mot_u->vod_p, mat);
}
}
/* _newt_consume(): advance buffer processing.
*/
static void
_newt_consume(u3_moat* mot_u)
{
/* process stray bytes, trying to create a new message
** or add a block to an existing one.
*/
while ( 1 ) {
if ( mot_u->rag_y ) {
/* if there is a live message, add a block to the queue.
*/
if ( mot_u->mes_u ) {
u3_meat* met_u;
/* create block
*/
met_u = c3_malloc(mot_u->len_d + (c3_d) sizeof(u3_meat));
met_u->nex_u = 0;
met_u->len_d = mot_u->len_d;
memcpy(met_u->hun_y, mot_u->rag_y, mot_u->len_d);
#ifdef NEWT_VERBOSE
u3l_log("newt: %d: create: msg %p, new block %p, len %"
PRIu64 ", has %" PRIu64 ", needs %" PRIu64 "\r\n",
getpid(),
mot_u->mes_u,
met_u,
met_u->len_d,
mot_u->mes_u->has_d,
mot_u->mes_u->len_d);
#endif
/* enqueue block
*/
if ( !mot_u->mes_u->meq_u ) {
mot_u->mes_u->meq_u = mot_u->mes_u->qem_u = met_u;
}
else {
mot_u->mes_u->qem_u->nex_u = met_u;
mot_u->mes_u->qem_u = met_u;
}
mot_u->mes_u->has_d += met_u->len_d;
/* free consumed stray bytes
*/
c3_free(mot_u->rag_y);
mot_u->len_d = 0;
mot_u->rag_y = 0;
}
else {
/* no message, but enough stray bytes to fill in
** a length; collect them and create a message.
*/
if ( mot_u->len_d >= 8ULL ) {
c3_d nel_d = 0;
nel_d |= ((c3_d) mot_u->rag_y[0]) << 0ULL;
nel_d |= ((c3_d) mot_u->rag_y[1]) << 8ULL;
nel_d |= ((c3_d) mot_u->rag_y[2]) << 16ULL;
nel_d |= ((c3_d) mot_u->rag_y[3]) << 24ULL;
nel_d |= ((c3_d) mot_u->rag_y[4]) << 32ULL;
nel_d |= ((c3_d) mot_u->rag_y[5]) << 40ULL;
nel_d |= ((c3_d) mot_u->rag_y[6]) << 48ULL;
nel_d |= ((c3_d) mot_u->rag_y[7]) << 56ULL;
#ifdef NEWT_VERBOSE
u3l_log("newt: %d: parsed length %" PRIu64 "\r\n",
getpid(),
nel_d);
#endif
mot_u->len_d -= 8ULL;
mot_u->mes_u = c3_malloc(sizeof(u3_mess));
mot_u->mes_u->len_d = nel_d;
mot_u->mes_u->has_d = 0;
mot_u->mes_u->meq_u = mot_u->mes_u->qem_u = 0;
if ( !mot_u->len_d ) {
c3_free(mot_u->rag_y);
mot_u->rag_y = 0;
}
else {
/* remove consumed length from stray bytes
*/
c3_y* buf_y = c3_malloc(mot_u->len_d);
memcpy(buf_y, mot_u->rag_y + 8, mot_u->len_d);
c3_free(mot_u->rag_y);
mot_u->rag_y = buf_y;
/* remaining bytes will be installed as message meat
*/
continue;
}
}
// process stray bytes, trying to create a new message
// or add a block to an existing one.
//
while ( mot_u->rag_y ) {
// no message
//
if ( !mot_u->mes_u ) {
// but enough stray bytes to start one
//
if ( 8ULL <= mot_u->len_d ) {
_newt_gain_mess(mot_u);
}
}
else {
// there is a live message, add a block to the queue.
//
_newt_gain_meat(mot_u);
/* check for message completions
*/
if ( mot_u->mes_u && (mot_u->mes_u->has_d >= mot_u->mes_u->len_d) ) {
c3_d len_d = mot_u->mes_u->len_d;
c3_y* buf_y = c3_malloc(len_d);
c3_d pat_d = 0;
u3_meat* met_u;
/* we should have just cleared this
*/
c3_assert(!mot_u->rag_y);
c3_assert(!mot_u->len_d);
/* collect queue blocks, cleaning them up; return any spare meat
** to the rag.
*/
{
met_u = mot_u->mes_u->meq_u;
while ( met_u && (pat_d < len_d) ) {
u3_meat* nex_u = met_u->nex_u;
c3_d end_d = (pat_d + met_u->len_d);
c3_d eat_d;
c3_d rem_d;
eat_d = c3_min(len_d, end_d) - pat_d;
memcpy(buf_y + pat_d, met_u->hun_y, eat_d);
pat_d += eat_d;
rem_d = (met_u->len_d - eat_d);
if ( rem_d ) {
mot_u->rag_y = c3_malloc(rem_d);
memcpy(mot_u->rag_y, met_u->hun_y + eat_d, rem_d);
mot_u->len_d = rem_d;
/* one: unless we got a bad length, this has to be the last
** block in the message.
**
** two: bad data on a newt channel can cause us to assert.
** that's actually the right thing for a private channel.
*/
c3_assert(0 == nex_u);
}
c3_free(met_u);
met_u = nex_u;
}
c3_assert(pat_d == len_d);
/* clear the message
*/
c3_free(mot_u->mes_u);
mot_u->mes_u = 0;
// check for message completions
//
if ( mot_u->mes_u->has_d >= mot_u->mes_u->len_d ) {
_newt_poke_mess(mot_u);
}
/* build and send the object
*/
{
u3_noun mat = u3i_bytes((c3_w) len_d, buf_y);
mot_u->pok_f(mot_u->vod_p, mat);
}
/* continue; spare meat may need processing
*/
continue;
}
/* nothing happening, await next event
*/
break;
}
}
@ -223,39 +253,48 @@ _newt_read_cb(uv_stream_t* str_u,
ssize_t len_i,
const uv_buf_t* buf_u)
{
c3_d len_d = (c3_d) len_i;
u3_moat* mot_u = (void *)str_u;
if ( UV_EOF == len_i ) {
// u3l_log("newt: %d: stream closed\r\n", getpid());
if ( 0 > len_i ) {
uv_read_stop(str_u);
mot_u->bal_f(mot_u->vod_p, "stream closed");
mot_u->bal_f(mot_u->vod_p, uv_strerror(len_i));
}
// EAGAIN/EWOULDBLOCK
//
else if ( 0 == len_i ) {
c3_free(buf_u->base);
}
else {
#ifdef NEWT_VERBOSE
u3l_log("newt: %d: read %ld\r\n", getpid(), len_i);
#endif
c3_d len_d = (c3_d)len_i;
#ifdef NEWT_VERBOSE
u3l_log("newt: %d: <bytes>", getpid());
for ( int i = 0; i < len_i; i++) {
if (0 == (i % 16)) u3l_log("\r\n");
u3l_log(" %02x", (unsigned) buf_u->base[i]);
}
u3l_log("\r\nnewt: %d: </bytes>\r\n", getpid());
u3l_log("newt: %d: read %ld\r\n", getpid(), len_i);
u3l_log("newt: %d: <bytes>", getpid());
for ( int i = 0; i < len_i; i++) {
if (0 == (i % 16)) u3l_log("\r\n");
u3l_log(" %02x", (unsigned) buf_u->base[i]);
}
u3l_log("\r\nnewt: %d: </bytes>\r\n", getpid());
#endif
// grow read buffer by `len_d` bytes
//
if ( mot_u->rag_y ) {
mot_u->rag_y = c3_realloc(mot_u->rag_y, mot_u->len_d + len_d);
c3_d nel_d = mot_u->len_d + len_d;
mot_u->rag_y = c3_realloc(mot_u->rag_y, nel_d);
memcpy(mot_u->rag_y + mot_u->len_d, buf_u->base, len_d);
mot_u->len_d = nel_d;
c3_free(buf_u->base);
}
else {
mot_u->rag_y = (c3_y *)buf_u->base;
mot_u->len_d = len_d;
}
_newt_consume(mot_u);
}
}