From acddc88d660c6b7c5aad151e58ba37ba708f0c3b Mon Sep 17 00:00:00 2001 From: Joe Bryan Date: Thu, 5 Dec 2019 14:10:01 -0800 Subject: [PATCH] vere: refactors ipc message parsing and assembly We've periodically seen malloc failures (ie, returning a null pointer) in newt.c, our IPC implementation ("memory lost" assertion failures). I've suspected that they were caused by attempts to allocate zero bytes. We were not properly following libuv's read callback conventions for EAGAIN/EWOULDBLOCK, which would've had that result. The message parsing and assembly was in one large function that I found it hard to follow, so I've refactored into smaller pieces, so it should be more maintainable going forward. --- pkg/urbit/vere/newt.c | 385 +++++++++++++++++++++++------------------- 1 file changed, 212 insertions(+), 173 deletions(-) diff --git a/pkg/urbit/vere/newt.c b/pkg/urbit/vere/newt.c index 67a220daa..05afb418b 100644 --- a/pkg/urbit/vere/newt.c +++ b/pkg/urbit/vere/newt.c @@ -4,7 +4,7 @@ ** ** a message is a 64-bit little-endian byte count, followed ** by the indicated number of bytes. the bytes are the -** the ++cue of of a noun. +** the +jam of of a noun. ** ** the implementation is relatively inefficient and could ** lose a few copies, mallocs, etc. @@ -33,174 +33,204 @@ #undef NEWT_VERBOSE +/* _newt_gain_meat(): add a block to an existing message +*/ +static void +_newt_gain_meat(u3_moat* mot_u) +{ + c3_assert( 0 != mot_u->mes_u ); + + // create block + // + u3_meat* met_u = c3_malloc(mot_u->len_d + (c3_d) sizeof(u3_meat)); + met_u->nex_u = 0; + met_u->len_d = mot_u->len_d; + memcpy(met_u->hun_y, mot_u->rag_y, mot_u->len_d); + +#ifdef NEWT_VERBOSE + u3l_log("newt: %d: create: msg %p, new block %p, len %" + PRIu64 ", has %" PRIu64 ", needs %" PRIu64 "\r\n", + getpid(), + mot_u->mes_u, + met_u, + met_u->len_d, + mot_u->mes_u->has_d, + mot_u->mes_u->len_d); +#endif + + // enqueue block + // + if ( !mot_u->mes_u->meq_u ) { + mot_u->mes_u->meq_u = mot_u->mes_u->qem_u = met_u; + } + else { + mot_u->mes_u->qem_u->nex_u = met_u; + mot_u->mes_u->qem_u = met_u; + } + mot_u->mes_u->has_d += met_u->len_d; + + // free consumed stray bytes + // + c3_free(mot_u->rag_y); + mot_u->len_d = 0; + mot_u->rag_y = 0; +} + +/* _newt_gain_mess(): begin parsing a new message +*/ +static void +_newt_gain_mess(u3_moat* mot_u) +{ + c3_assert( 8ULL <= mot_u->len_d ); + c3_assert( 0 == mot_u->mes_u ); + + c3_d nel_d = 0ULL; + + nel_d |= ((c3_d) mot_u->rag_y[0]) << 0ULL; + nel_d |= ((c3_d) mot_u->rag_y[1]) << 8ULL; + nel_d |= ((c3_d) mot_u->rag_y[2]) << 16ULL; + nel_d |= ((c3_d) mot_u->rag_y[3]) << 24ULL; + nel_d |= ((c3_d) mot_u->rag_y[4]) << 32ULL; + nel_d |= ((c3_d) mot_u->rag_y[5]) << 40ULL; + nel_d |= ((c3_d) mot_u->rag_y[6]) << 48ULL; + nel_d |= ((c3_d) mot_u->rag_y[7]) << 56ULL; + + c3_assert( 0ULL != nel_d ); + + // very likely to be a bad write, we can't jam anything this big + // + if ( 0xFFFFFFFFULL < nel_d ) { + u3l_log("newt: %d warn: large read %" PRIu64 "\r\n", + getpid(), + nel_d); + } + +#ifdef NEWT_VERBOSE + u3l_log("newt: %d: parsed length %" PRIu64 "\r\n", + getpid(), + nel_d); +#endif + + mot_u->len_d -= 8ULL; + + mot_u->mes_u = c3_malloc(sizeof(u3_mess)); + mot_u->mes_u->len_d = nel_d; + mot_u->mes_u->has_d = 0; + mot_u->mes_u->meq_u = mot_u->mes_u->qem_u = 0; + + if ( 0ULL == mot_u->len_d ) { + c3_free(mot_u->rag_y); + mot_u->rag_y = 0; + } + else { + // remove consumed length from stray bytes + // + c3_y* buf_y = c3_malloc(mot_u->len_d); + memcpy(buf_y, mot_u->rag_y + 8, mot_u->len_d); + + c3_free(mot_u->rag_y); + mot_u->rag_y = buf_y; + } +} + +/* _newt_poke_mess(): pass message to [mot_u] callback +*/ +static void +_newt_poke_mess(u3_moat* mot_u) +{ + c3_assert( 0 != mot_u->mes_u ); + c3_assert( mot_u->mes_u->has_d >= mot_u->mes_u->len_d ); + + c3_d len_d = mot_u->mes_u->len_d; + c3_y* buf_y = c3_malloc(len_d); + c3_d pat_d = 0; + u3_meat* met_u; + + // we should have just cleared this + // + c3_assert(!mot_u->rag_y); + c3_assert(!mot_u->len_d); + + // collect queue blocks, cleaning them up; return any spare meat + // to the rag. + // + { + met_u = mot_u->mes_u->meq_u; + + while ( met_u && (pat_d < len_d) ) { + u3_meat* nex_u = met_u->nex_u; + c3_d end_d = (pat_d + met_u->len_d); + c3_d eat_d; + c3_d rem_d; + + eat_d = c3_min(len_d, end_d) - pat_d; + memcpy(buf_y + pat_d, met_u->hun_y, eat_d); + pat_d += eat_d; + + rem_d = (met_u->len_d - eat_d); + if ( rem_d ) { + mot_u->rag_y = c3_malloc(rem_d); + memcpy(mot_u->rag_y, met_u->hun_y + eat_d, rem_d); + mot_u->len_d = rem_d; + + // one: unless we got a bad length, this has to be the last + // block in the message. + // + // two: bad data on a newt channel can cause us to assert. + // that's actually the right thing for a private channel. + /// + c3_assert(0 == nex_u); + } + + c3_free(met_u); + met_u = nex_u; + } + + c3_assert(pat_d == len_d); + + // clear the message + // + c3_free(mot_u->mes_u); + mot_u->mes_u = 0; + } + + // build and send the object + // + { + u3_noun mat = u3i_bytes((c3_w)len_d, buf_y); + mot_u->pok_f(mot_u->vod_p, mat); + } +} + /* _newt_consume(): advance buffer processing. */ static void _newt_consume(u3_moat* mot_u) { - /* process stray bytes, trying to create a new message - ** or add a block to an existing one. - */ - while ( 1 ) { - if ( mot_u->rag_y ) { - /* if there is a live message, add a block to the queue. - */ - if ( mot_u->mes_u ) { - u3_meat* met_u; - - /* create block - */ - met_u = c3_malloc(mot_u->len_d + (c3_d) sizeof(u3_meat)); - met_u->nex_u = 0; - met_u->len_d = mot_u->len_d; - memcpy(met_u->hun_y, mot_u->rag_y, mot_u->len_d); - -#ifdef NEWT_VERBOSE - u3l_log("newt: %d: create: msg %p, new block %p, len %" - PRIu64 ", has %" PRIu64 ", needs %" PRIu64 "\r\n", - getpid(), - mot_u->mes_u, - met_u, - met_u->len_d, - mot_u->mes_u->has_d, - mot_u->mes_u->len_d); -#endif - /* enqueue block - */ - if ( !mot_u->mes_u->meq_u ) { - mot_u->mes_u->meq_u = mot_u->mes_u->qem_u = met_u; - } - else { - mot_u->mes_u->qem_u->nex_u = met_u; - mot_u->mes_u->qem_u = met_u; - } - mot_u->mes_u->has_d += met_u->len_d; - - /* free consumed stray bytes - */ - c3_free(mot_u->rag_y); - mot_u->len_d = 0; - mot_u->rag_y = 0; - } - else { - /* no message, but enough stray bytes to fill in - ** a length; collect them and create a message. - */ - if ( mot_u->len_d >= 8ULL ) { - c3_d nel_d = 0; - - nel_d |= ((c3_d) mot_u->rag_y[0]) << 0ULL; - nel_d |= ((c3_d) mot_u->rag_y[1]) << 8ULL; - nel_d |= ((c3_d) mot_u->rag_y[2]) << 16ULL; - nel_d |= ((c3_d) mot_u->rag_y[3]) << 24ULL; - nel_d |= ((c3_d) mot_u->rag_y[4]) << 32ULL; - nel_d |= ((c3_d) mot_u->rag_y[5]) << 40ULL; - nel_d |= ((c3_d) mot_u->rag_y[6]) << 48ULL; - nel_d |= ((c3_d) mot_u->rag_y[7]) << 56ULL; - -#ifdef NEWT_VERBOSE - u3l_log("newt: %d: parsed length %" PRIu64 "\r\n", - getpid(), - nel_d); -#endif - mot_u->len_d -= 8ULL; - - mot_u->mes_u = c3_malloc(sizeof(u3_mess)); - mot_u->mes_u->len_d = nel_d; - mot_u->mes_u->has_d = 0; - mot_u->mes_u->meq_u = mot_u->mes_u->qem_u = 0; - - if ( !mot_u->len_d ) { - c3_free(mot_u->rag_y); - mot_u->rag_y = 0; - } - else { - /* remove consumed length from stray bytes - */ - c3_y* buf_y = c3_malloc(mot_u->len_d); - - memcpy(buf_y, mot_u->rag_y + 8, mot_u->len_d); - - c3_free(mot_u->rag_y); - mot_u->rag_y = buf_y; - - /* remaining bytes will be installed as message meat - */ - continue; - } - } + // process stray bytes, trying to create a new message + // or add a block to an existing one. + // + while ( mot_u->rag_y ) { + // no message + // + if ( !mot_u->mes_u ) { + // but enough stray bytes to start one + // + if ( 8ULL <= mot_u->len_d ) { + _newt_gain_mess(mot_u); } } + else { + // there is a live message, add a block to the queue. + // + _newt_gain_meat(mot_u); - /* check for message completions - */ - if ( mot_u->mes_u && (mot_u->mes_u->has_d >= mot_u->mes_u->len_d) ) { - c3_d len_d = mot_u->mes_u->len_d; - c3_y* buf_y = c3_malloc(len_d); - c3_d pat_d = 0; - u3_meat* met_u; - - /* we should have just cleared this - */ - c3_assert(!mot_u->rag_y); - c3_assert(!mot_u->len_d); - - /* collect queue blocks, cleaning them up; return any spare meat - ** to the rag. - */ - { - met_u = mot_u->mes_u->meq_u; - while ( met_u && (pat_d < len_d) ) { - u3_meat* nex_u = met_u->nex_u; - c3_d end_d = (pat_d + met_u->len_d); - c3_d eat_d; - c3_d rem_d; - - eat_d = c3_min(len_d, end_d) - pat_d; - memcpy(buf_y + pat_d, met_u->hun_y, eat_d); - pat_d += eat_d; - - rem_d = (met_u->len_d - eat_d); - if ( rem_d ) { - mot_u->rag_y = c3_malloc(rem_d); - memcpy(mot_u->rag_y, met_u->hun_y + eat_d, rem_d); - mot_u->len_d = rem_d; - - /* one: unless we got a bad length, this has to be the last - ** block in the message. - ** - ** two: bad data on a newt channel can cause us to assert. - ** that's actually the right thing for a private channel. - */ - c3_assert(0 == nex_u); - } - c3_free(met_u); - met_u = nex_u; - } - c3_assert(pat_d == len_d); - - /* clear the message - */ - c3_free(mot_u->mes_u); - mot_u->mes_u = 0; + // check for message completions + // + if ( mot_u->mes_u->has_d >= mot_u->mes_u->len_d ) { + _newt_poke_mess(mot_u); } - - /* build and send the object - */ - { - u3_noun mat = u3i_bytes((c3_w) len_d, buf_y); - - mot_u->pok_f(mot_u->vod_p, mat); - } - - /* continue; spare meat may need processing - */ - continue; } - - /* nothing happening, await next event - */ - break; } } @@ -223,39 +253,48 @@ _newt_read_cb(uv_stream_t* str_u, ssize_t len_i, const uv_buf_t* buf_u) { - c3_d len_d = (c3_d) len_i; u3_moat* mot_u = (void *)str_u; - if ( UV_EOF == len_i ) { - // u3l_log("newt: %d: stream closed\r\n", getpid()); + if ( 0 > len_i ) { uv_read_stop(str_u); - mot_u->bal_f(mot_u->vod_p, "stream closed"); + mot_u->bal_f(mot_u->vod_p, uv_strerror(len_i)); + } + // EAGAIN/EWOULDBLOCK + // + else if ( 0 == len_i ) { + c3_free(buf_u->base); } else { -#ifdef NEWT_VERBOSE - u3l_log("newt: %d: read %ld\r\n", getpid(), len_i); -#endif + c3_d len_d = (c3_d)len_i; #ifdef NEWT_VERBOSE - u3l_log("newt: %d: ", getpid()); - for ( int i = 0; i < len_i; i++) { - if (0 == (i % 16)) u3l_log("\r\n"); - u3l_log(" %02x", (unsigned) buf_u->base[i]); - } - u3l_log("\r\nnewt: %d: \r\n", getpid()); + u3l_log("newt: %d: read %ld\r\n", getpid(), len_i); + + u3l_log("newt: %d: ", getpid()); + for ( int i = 0; i < len_i; i++) { + if (0 == (i % 16)) u3l_log("\r\n"); + u3l_log(" %02x", (unsigned) buf_u->base[i]); + } + + u3l_log("\r\nnewt: %d: \r\n", getpid()); #endif // grow read buffer by `len_d` bytes // if ( mot_u->rag_y ) { - mot_u->rag_y = c3_realloc(mot_u->rag_y, mot_u->len_d + len_d); + c3_d nel_d = mot_u->len_d + len_d; + + mot_u->rag_y = c3_realloc(mot_u->rag_y, nel_d); memcpy(mot_u->rag_y + mot_u->len_d, buf_u->base, len_d); + + mot_u->len_d = nel_d; c3_free(buf_u->base); } else { mot_u->rag_y = (c3_y *)buf_u->base; mot_u->len_d = len_d; } + _newt_consume(mot_u); } }