newt: adds synchronous read, used in serf with blocking writes

This commit is contained in:
Joe Bryan 2020-06-08 20:14:39 -07:00
parent 3f26140cf4
commit 8ef8987b54
3 changed files with 104 additions and 32 deletions

View File

@ -1084,7 +1084,12 @@
void
u3_newt_write(u3_mojo* moj_u, u3_atom mat);
/* u3_newt_read(): activate reading on input stream.
/* u3_newt_read_sync(): start reading; multiple msgs synchronous.
*/
void
u3_newt_read_sync(u3_moat* mot_u);
/* u3_newt_read(): start reading; each msg asynchronous.
*/
void
u3_newt_read(u3_moat* mot_u);

View File

@ -68,13 +68,39 @@ _newt_meat_plan(u3_moat* mot_u, u3_meat* met_u)
}
}
static void
_newt_meat_next_cb(uv_timer_t* tim_u);
/* _newt_meat_poke(): deliver completed msg.
*/
static void
_newt_meat_poke(u3_moat* mot_u)
_newt_meat_poke(u3_moat* mot_u, u3_meat* met_u)
{
u3_noun mat = u3i_bytes((c3_w)met_u->len_d, met_u->hun_y);
mot_u->pok_f(mot_u->ptr_v, mat);
c3_free(met_u);
}
/* _newt_meat_next_sync(): deliver completed msgs, synchronously.
*/
static void
_newt_meat_next_sync(u3_moat* mot_u)
{
u3_meat* met_u = mot_u->ext_u;
while ( met_u ) {
u3_meat* nex_u = met_u->nex_u;
_newt_meat_poke(mot_u, met_u);
met_u = nex_u;
}
mot_u->ent_u = mot_u->ext_u = 0;
}
static void
_newt_meat_next_cb(uv_timer_t* tim_u);
/* _newt_meat_next(): deliver completed msgs, asynchronously.
*/
static void
_newt_meat_next(u3_moat* mot_u)
{
u3_meat* met_u = mot_u->ext_u;
@ -88,9 +114,7 @@ _newt_meat_poke(u3_moat* mot_u)
mot_u->ent_u = 0;
}
u3_noun mat = u3i_bytes((c3_w)met_u->len_d, met_u->hun_y);
mot_u->pok_f(mot_u->ptr_v, mat);
c3_free(met_u);
_newt_meat_poke(mot_u, met_u);
}
}
@ -100,7 +124,7 @@ static void
_newt_meat_next_cb(uv_timer_t* tim_u)
{
u3_moat* mot_u = tim_u->data;
_newt_meat_poke(mot_u);
_newt_meat_next(mot_u);
}
/* u3_newt_decode(): decode a (partial) length-prefixed byte buffer
@ -181,7 +205,48 @@ u3_newt_decode(u3_moat* mot_u, c3_y* buf_y, c3_d len_d)
}
}
/* _newt_read_cb(): stream input callback.
/* _newt_read(): handle async read result.
*/
static c3_o
_newt_read(u3_moat* mot_u,
ssize_t len_i,
const uv_buf_t* buf_u)
{
if ( 0 > len_i ) {
c3_free(buf_u->base);
uv_read_stop((uv_stream_t*)&mot_u->pyp_u);
fprintf(stderr, "newt: read failed %s\r\n", uv_strerror(len_i));
mot_u->bal_f(mot_u->ptr_v, uv_strerror(len_i));
return c3n;
}
// EAGAIN/EWOULDBLOCK
//
else if ( 0 == len_i ) {
c3_free(buf_u->base);
return c3n;
}
else {
u3_newt_decode(mot_u, (c3_y*)buf_u->base, (c3_d)len_i);
c3_free(buf_u->base);
return c3y;
}
}
/* _newt_read_sync_cb(): async read callback, sync msg delivery.
*/
static void
_newt_read_sync_cb(uv_stream_t* str_u,
ssize_t len_i,
const uv_buf_t* buf_u)
{
u3_moat* mot_u = (void *)str_u;
if ( c3y == _newt_read(mot_u, len_i, buf_u) ) {
_newt_meat_next_sync(mot_u);
}
}
/* _newt_read_cb(): async read callback, async msg delivery.
*/
static void
_newt_read_cb(uv_stream_t* str_u,
@ -190,22 +255,8 @@ _newt_read_cb(uv_stream_t* str_u,
{
u3_moat* mot_u = (void *)str_u;
if ( 0 > len_i ) {
c3_free(buf_u->base);
uv_read_stop(str_u);
fprintf(stderr, "newt: read failed %s\r\n", uv_strerror(len_i));
mot_u->bal_f(mot_u->ptr_v, uv_strerror(len_i));
}
// EAGAIN/EWOULDBLOCK
//
else if ( 0 == len_i ) {
c3_free(buf_u->base);
}
else {
u3_newt_decode(mot_u, (c3_y*)buf_u->base, (c3_d)len_i);
c3_free(buf_u->base);
_newt_meat_poke(mot_u);
if ( c3y == _newt_read(mot_u, len_i, buf_u) ) {
_newt_meat_next(mot_u);
}
}
@ -223,10 +274,8 @@ _newt_alloc(uv_handle_t* had_u,
*buf_u = uv_buf_init(ptr_v, len_i);
}
/* u3_newt_read(): start stream reading.
*/
void
u3_newt_read(u3_moat* mot_u)
static void
_newt_read_init(u3_moat* mot_u, uv_read_cb read_cb_f)
{
// zero-initialize completed msg queue
//
@ -245,7 +294,7 @@ u3_newt_read(u3_moat* mot_u)
if ( 0 != (sas_i = uv_read_start((uv_stream_t*)&mot_u->pyp_u,
_newt_alloc,
_newt_read_cb)) )
read_cb_f)) )
{
fprintf(stderr, "newt: read failed %s\r\n", uv_strerror(sas_i));
mot_u->bal_f(mot_u->ptr_v, uv_strerror(sas_i));
@ -253,6 +302,22 @@ u3_newt_read(u3_moat* mot_u)
}
}
/* u3_newt_read_sync(): start reading; multiple msgs synchronous.
*/
void
u3_newt_read_sync(u3_moat* mot_u)
{
_newt_read_init(mot_u, _newt_read_sync_cb);
}
/* u3_newt_read(): start reading; each msg asynchronous.
*/
void
u3_newt_read(u3_moat* mot_u)
{
_newt_read_init(mot_u, _newt_read_cb);
}
/* n_req: write request for newt
*/
typedef struct _n_req {

View File

@ -153,6 +153,8 @@ main(c3_i argc, c3_c* argv[])
err_i = uv_pipe_init(lup_u, &out_u.pyp_u, 0);
c3_assert(!err_i);
uv_pipe_open(&out_u.pyp_u, out_i);
uv_stream_set_blocking((uv_stream_t*)&out_u.pyp_u, 1);
}
// set up writing
@ -241,7 +243,7 @@ main(c3_i argc, c3_c* argv[])
// start reading
//
u3_newt_read(&inn_u);
u3_newt_read_sync(&inn_u);
// enter loop
//