WIP hand-rolled parsing, read side

This commit is contained in:
Steve Dee 2014-03-04 17:43:40 -08:00 committed by Steven Dee
parent 85c3cf4f5e
commit 008558a1f7
3 changed files with 323 additions and 52 deletions

View File

@ -14,6 +14,7 @@
# define c3__ankh c3_s4('a','n','k','h')
# define c3__any c3_s3('a','n','y')
# define c3__ap c3_s2('a','p')
# define c3__apen c3_s4('a','p','e','n')
# define c3__aro c3_s3('a','r','o')
# define c3__arvo c3_s4('a','r','v','o')
# define c3__ash c3_s3('a','s','h')

View File

@ -431,15 +431,36 @@
// end persistent state
} u2_raft;
/* u2_rreq: raft request.
*/
typedef struct _u2_rreq {
struct _u2_rmsg* msg_u;
struct _u2_rreq* nex_u;
struct _u2_rcon* ron_u;
c3_t red_t : 1;
} u2_rreq;
/* u2_rbuf: raft input buffer.
*/
typedef struct _u2_rbuf {
c3_w len_w;
c3_w cap_w;
c3_y buf_y[0];
} u2_rbuf;
/* u2_rcon: raft connection.
*/
typedef struct _u2_rcon {
uv_tcp_t wax_u;
struct _u2_rnam* nam_u;
u2_rbuf* red_u;
u2_bean red;
u2_raft* raf_u;
struct capn* cap_u;
u2_rreq* out_u;
u2_rreq* tou_u;
u2_rreq* inn_u;
u2_rreq* nni_u;
struct _u2_rcon* nex_u;
c3_t red_t;
} u2_rcon;
/* u2_rnam: raft peer name.

349
v/raft.c
View File

@ -2,7 +2,6 @@
**
** This file is in the public domain.
*/
#include <capn.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@ -10,10 +9,45 @@
#include <uv.h>
#include "all.h"
#include "p/raft.capnp.h"
#include "v/vere.h"
typedef struct {
c3_w tem_w; // Log entry term
c3_w typ_w; // Entry type, %ra|%ov
c3_w len_w; // Length of blob
c3_w* bob_w; // Blob
} u2_rent;
typedef struct _u2_rmsg {
c3_d len_d; // Words in message
c3_w tem_w; // Current term
c3_w typ_w; // %apen|%revo|%rasp
union {
struct {
c3_w suc_w; // Request successful
} rasp;
struct {
c3_d lai_d; // Last log index
c3_w lat_w; // Last log term
c3_w nam_w; // Name world length
c3_c* nam_c; // Requestor name
union {
struct {
c3_d cit_d; // Leader commitIndex
c3_d ent_d; // Number of entries
u2_rent* ent_u; // Entries
} apen;
};
} rest;
};
} u2_rmsg;
static ssize_t _raft_read_rmsg(const u2_rbuf* buf_u, u2_rmsg* msg_u);
static void _raft_send_rmsg(uv_stream_t* sem_u, const u2_rmsg* msg_u);
static void _raft_rmsg_free(u2_rmsg* msg_u);
static void _raft_conn_dead(u2_rcon* ron_u);
static u2_bean _raft_remove_run(u2_rcon* ron_u);
@ -98,19 +132,22 @@ _raft_promote(u2_raft* raf_u)
}
}
/* _raft_do_rest(): act on an incoming raft RPC request.
/* _raft_do_rest(): generic incoming RPC request effects.
*/
static void
_raft_do_rest(u2_rcon* ron_u, struct Raft_Rest res_u)
_raft_do_rest(u2_rreq* req_u)
{
uL(fprintf(uH, "raft: rest{.tem=%d,.cid=%s,.lai=%lld,.lat=%d,.which=%d}\n",
res_u.tem, res_u.cid.str, res_u.lai, res_u.lat, res_u.which));
u2_rcon* ron_u = req_u->ron_u;
u2_rmsg* msg_u = req_u->msg_u;
c3_assert(c3__apen == msg_u->typ_w || c3__revo == msg_u->typ_w);
if ( 0 == ron_u->nam_u ) {
u2_raft* raf_u = ron_u->raf_u;
u2_rnam* nam_u = raf_u->nam_u;
while ( nam_u ) {
if ( 0 == strcmp(nam_u->str_c, res_u.cid.str) ) {
if ( 0 == strcmp(nam_u->str_c, msg_u->rest.nam_c) ) {
if ( nam_u->ron_u ) {
c3_assert(nam_u->ron_u != ron_u);
_raft_conn_dead(nam_u->ron_u);
@ -125,18 +162,35 @@ _raft_do_rest(u2_rcon* ron_u, struct Raft_Rest res_u)
}
if ( 0 == ron_u->nam_u ) {
uL(fprintf(uH, "connection from unkown peer %s\n", res_u.cid.str));
uL(fprintf(uH, "connection from unkown peer %s\n", msg_u->rest.nam_c));
_raft_conn_dead(ron_u);
}
/* TODO */
}
/* _raft_do_apen(): Handle incoming AppendEntries.
*/
static void
_raft_do_apen(u2_rcon* ron_u, const u2_rmsg* msg_u)
{
c3_assert(c3__apen == msg_u->typ_w);
}
/* _raft_do_revo(): Handle incoming RequestVote.
*/
static void
_raft_do_revo(u2_rcon* ron_u, const u2_rmsg* msg_u)
{
c3_assert(c3__revo == msg_u->typ_w);
}
/* _raft_do_rasp(): act on an incoming raft RPC response.
*/
static void
_raft_do_rasp(u2_rcon* ron_u, struct Raft_Rasp ras_u)
_raft_do_rasp(u2_rcon* ron_u, u2_rmsg* msg_u)
{
c3_assert(c3__rasp == msg_u->typ_w);
if ( 0 == ron_u->nam_u ) {
uL(fprintf(uH, "invalid connection from unknown host\n"));
_raft_conn_dead(ron_u);
@ -146,46 +200,231 @@ _raft_do_rasp(u2_rcon* ron_u, struct Raft_Rasp ras_u)
}
}
/* _raft_read_rmsg(): read a u2_rmsg from a buffer.
**
** Returns <0 on parse failure.
** Returns bytes read on partial data.
** Completely successful iff msg_u->len_d == return value.
**
** If successful, caller must eventually call _raft_free_rmsg() on msg_u.
*/
static ssize_t
_raft_read_rmsg(const u2_rbuf* buf_u, u2_rmsg* msg_u)
{
ssize_t red_i = 0;
if ( buf_u->len_w < sizeof(c3_d) ) {
return 0;
}
memcpy(&msg_u->len_d, buf_u->buf_y, sizeof(c3_d));
red_i += sizeof(c3_d);
if ( msg_u->len_d < 3 ) {
return -1;
}
if ( buf_u->len_w < 4 * msg_u->len_d ) {
red_i = buf_u->len_w;
goto out;
}
if ( msg_u->len_d < red_i + 2 * sizeof(c3_w) ) {
goto out;
}
memcpy(&msg_u->tem_w, buf_u->buf_y + red_i, sizeof(c3_w));
red_i += sizeof(c3_w);
memcpy(&msg_u->typ_w, buf_u->buf_y + red_i, sizeof(c3_w));
red_i += sizeof(c3_w);
switch ( msg_u->typ_w ) {
default: {
c3_c* typ_c = u2_cr_string(msg_u->typ_w);
uL(fprintf(uH, "raft: unknown msg type %s\n", typ_c));
free(typ_c);
return -1;
}
case c3__rasp: {
if ( msg_u->len_d < red_i + sizeof(c3_w) ) {
goto out;
}
memcpy(&msg_u->rasp.suc_w, buf_u->buf_y + red_i, sizeof(c3_w));
red_i += sizeof(c3_w);
break;
}
case c3__apen: case c3__revo: {
if ( msg_u->len_d < red_i + sizeof(c3_d) + 2 * sizeof(c3_w) ) {
goto out;
}
memcpy(&msg_u->rest.lai_d, buf_u->buf_y + red_i, sizeof(c3_d));
red_i += sizeof(c3_d);
memcpy(&msg_u->rest.lat_w, buf_u->buf_y + red_i, sizeof(c3_w));
red_i += sizeof(c3_w);
memcpy(&msg_u->rest.nam_w, buf_u->buf_y + red_i, sizeof(c3_w));
red_i += sizeof(c3_w);
if ( msg_u->len_d < red_i + msg_u->rest.nam_w ) {
goto out;
}
msg_u->rest.nam_c = malloc(msg_u->rest.nam_w);
memcpy(msg_u->rest.nam_c, buf_u->buf_y + red_i, msg_u->rest.nam_w);
red_i += msg_u->rest.nam_w;
break;
}
}
if ( c3__apen == msg_u->typ_w ) {
if ( msg_u->len_d < red_i + 2 * sizeof(c3_d) ) {
goto fail;
}
memcpy(&msg_u->rest.apen.cit_d, buf_u->buf_y + red_i, sizeof(c3_d));
red_i += sizeof(c3_d);
memcpy(&msg_u->rest.apen.ent_d, buf_u->buf_y + red_i, sizeof(c3_d));
red_i += sizeof(c3_d);
msg_u->rest.apen.ent_u = calloc(
1, msg_u->rest.apen.ent_d * sizeof(u2_rent));
{
c3_d i_d;
u2_rent* ent_u = msg_u->rest.apen.ent_u;
for ( i_d = 0; i_d < msg_u->rest.apen.ent_d; i_d++ ) {
if ( msg_u->len_d < red_i + 3 * sizeof(c3_w) ) {
goto fail;
}
memcpy(&ent_u[i_d].tem_w, buf_u->buf_y + red_i, sizeof(c3_w));
red_i += sizeof(c3_w);
memcpy(&ent_u[i_d].typ_w, buf_u->buf_y + red_i, sizeof(c3_w));
red_i += sizeof(c3_w);
memcpy(&ent_u[i_d].len_w, buf_u->buf_y + red_i, sizeof(c3_w));
red_i += sizeof(c3_w);
if ( msg_u->len_d < red_i + ent_u[i_d].len_w ) {
_raft_rmsg_free(msg_u);
return red_i;
}
ent_u[i_d].bob_w = malloc(ent_u[i_d].len_w);
memcpy(ent_u[i_d].bob_w, buf_u->buf_y + red_i, ent_u[i_d].len_w);
red_i += ent_u[i_d].len_w;
}
}
}
if ( red_i != 4 * msg_u->len_d ) {
red_i = -1;
goto fail;
}
out:
return red_i;
fail:
_raft_rmsg_free(msg_u);
goto out;
}
static void
_raft_rmsg_free(u2_rmsg* msg_u) {
if ( c3__apen == msg_u->typ_w && msg_u->rest.apen.ent_u ) {
c3_d i_d;
for ( i_d = 0; i_d < msg_u->rest.apen.ent_d; i_d++ ) {
free(msg_u->rest.apen.ent_u[i_d].bob_w);
}
free(msg_u->rest.apen.ent_u);
msg_u->rest.apen.ent_u = 0;
}
if ( c3__apen == msg_u->typ_w || c3__revo == msg_u->typ_w ) {
free(msg_u->rest.nam_c);
msg_u->rest.nam_c = 0;
}
}
/* _raft_conn_work(): read and write requests and responses.
*/
static void
_raft_conn_work(u2_rcon* ron_u)
{
if ( 0 && ron_u->red_t ) { /* TODO */
capn_ptr rot_p = capn_root(ron_u->cap_u);
if ( u2_yes == ron_u->red ) {
uL(fprintf(uH, "raft: working\n"));
ron_u->red = u2_no;
while (1) {
u2_rmsg msg_u;
ssize_t ret_i = _raft_read_rmsg(ron_u->red_u, &msg_u);
ron_u->red_t = 0;
if ( CAPN_NULL == rot_p.type ) {
uL(fprintf(uH, "raft: null root\n"));
return;
if ( ret_i < 0 ) {
uL(fprintf(uH, "raft: error reading from %s\n", ron_u->nam_u->nam_c));
free(ron_u->red_u);
ron_u->red_u = 0;
break;
}
else {
Raft_Rmsg_ptr mes_p = {capn_getp(rot_p, 0, 1)};
struct Raft_Rmsg mes_u;
if ( ret_i < 4 * msg_u.len_d ) {
uL(fprintf(uH, "raft: need more\n"));
break;
}
else if ( 4 * msg_u.len_d < ret_i ) {
uL(fprintf(uH, "raft: read more than specified?!\n"));
c3_assert(0);
}
else { // lengths equal
c3_assert(ron_u->red_u->len_w >= ret_i);
memmove(ron_u->red_u->buf_y,
ron_u->red_u->buf_y + ret_i,
ron_u->red_u->len_w - ret_i);
ron_u->red_u->len_w -= ret_i;
if ( CAPN_STRUCT != mes_p.p.type ) {
uL(fprintf(uH, "raft: expected struct, got %d\n", mes_p.p.type));
_raft_conn_dead(ron_u);
}
else {
read_Raft_Rmsg(&mes_u, mes_p);
switch ( msg_u.typ_w ) {
default: {
c3_c* typ_c = u2_cr_string(msg_u.typ_w);
if ( Raft_Rmsg_rest == mes_u.which ) {
struct Raft_Rest res_u;
uL(fprintf(uH, "raft: work: unknown message type %s\n", typ_c));
free(typ_c);
break;
}
case c3__apen: {
_raft_do_apen(ron_u, &msg_u);
break;
}
case c3__revo: {
_raft_do_revo(ron_u, &msg_u);
break;
}
case c3__rasp: {
_raft_do_rasp(ron_u, &msg_u);
break;
}
}
_raft_rmsg_free(&msg_u);
}
}
}
}
}
read_Raft_Rest(&res_u, mes_u.rest);
_raft_do_rest(ron_u, res_u);
}
else {
struct Raft_Rasp ras_u;
/* _raft_conn_grow(): append buffer to raft read state.
*/
static void
_raft_conn_grow(u2_rcon* ron_u, uv_buf_t buf_u)
{
u2_rbuf* red_u = ron_u->red_u;
c3_assert(Raft_Rmsg_rasp == mes_u.which);
read_Raft_Rasp(&ras_u, mes_u.rasp);
_raft_do_rasp(ron_u, ras_u);
}
}
if ( !red_u ) {
red_u = malloc(sizeof(*red_u) + buf_u.len);
red_u->len_w = 0;
red_u->cap_w = buf_u.len;
}
if ( red_u->cap_w - red_u->len_w < buf_u.len ) {
c3_w cap_w = c3_max(2 * red_u->cap_w,
red_u->len_w + buf_u.len);
red_u = realloc(red_u, cap_w);
red_u->cap_w = cap_w;
}
memcpy(red_u->buf_y + red_u->len_w, buf_u.base, buf_u.len);
red_u->len_w += buf_u.len;
ron_u->red_u = red_u;
}
/* _raft_conn_read_cb(): generic connection read callback.
@ -208,14 +447,8 @@ _raft_conn_read_cb(uv_stream_t* tcp_u,
_raft_conn_dead(ron_u);
}
else {
struct capn_segment* seg_u = calloc(1, sizeof(struct capn_segment));
seg_u->data = buf_u.base;
seg_u->len = buf_u.len;
seg_u->cap = buf_u.len;
seg_u->user = ron_u;
capn_append_segment(ron_u->cap_u, seg_u);
ron_u->red_t = 1;
_raft_conn_grow(ron_u, buf_u);
ron_u->red = u2_yes;
_raft_conn_work(ron_u);
}
}
@ -230,9 +463,11 @@ _raft_conn_new(u2_raft* raf_u)
u2_rcon* ron_u = malloc(sizeof(*ron_u));
uv_tcp_init(u2L, &ron_u->wax_u);
ron_u->cap_u = 0;
ron_u->red_t = 0;
ron_u->red_u = 0;
ron_u->out_u = ron_u->tou_u = 0;
ron_u->inn_u = ron_u->nni_u = 0;
ron_u->red_u = 0;
ron_u->red = u2_no;
ron_u->nam_u = 0;
ron_u->raf_u = raf_u;
ron_u->nex_u = 0;
@ -268,6 +503,22 @@ _raft_remove_run(u2_rcon* ron_u)
return suc;
}
static void
_raft_rreq_free(u2_rreq* req_u, u2_rreq* qer_u)
{
if ( 0 == req_u ) {
c3_assert(0 == qer_u);
}
else {
if ( req_u->nex_u ) {
_raft_rreq_free(req_u->nex_u, qer_u);
_raft_rmsg_free(req_u->msg_u);
free(req_u);
}
else c3_assert(qer_u == req_u);
}
}
/* _raft_conn_free(): unlink a connection and free its resources.
*/
static void
@ -287,10 +538,8 @@ _raft_conn_free(uv_handle_t* had_u)
c3_assert(u2_yes == suc);
}
if ( ron_u->cap_u ) {
capn_free(ron_u->cap_u);
free(ron_u->cap_u);
}
_raft_rreq_free(ron_u->out_u, ron_u->tou_u);
_raft_rreq_free(ron_u->inn_u, ron_u->nni_u);
free(ron_u);
}