Leader election appears to work

This commit is contained in:
Steve Dee 2014-03-05 19:40:35 -08:00 committed by Steven Dee
parent 2e0f7df85c
commit 9e2f12fb80
2 changed files with 189 additions and 24 deletions

View File

@ -421,9 +421,11 @@
uv_timer_t tim_u;
u2_ulog lug_u; // event log
c3_w ent_w;
c3_w lat_w;
u2_raty typ_e;
struct _u2_rnam* nam_u;
struct _u2_rcon* run_u;
c3_w pop_w;
c3_w vot_w;
c3_c* str_c; // our name
// persistent state, restored on start

211
v/raft.c
View File

@ -48,10 +48,10 @@ typedef struct _u2_rmsg {
static ssize_t _raft_rmsg_read(const u2_rbuf* buf_u, u2_rmsg* msg_u);
static void _raft_rmsg_send(uv_stream_t* sem_u, const u2_rmsg* msg_u);
static void _raft_rmsg_free(u2_rmsg* msg_u);
static void _raft_conn_dead(u2_rcon* ron_u);
static u2_bean _raft_remove_run(u2_rcon* ron_u);
static void _raft_send_rasp(u2_rcon* ron_u, c3_t suc_t);
static void _raft_rreq_free(u2_rreq* req_u);
static void _raft_time_cb(uv_timer_t* tim_u, c3_i sas_i);
/* _raft_readname(): parse a raft host:port peer name.
@ -127,11 +127,68 @@ _raft_election_rand()
static void
_raft_promote(u2_raft* raf_u)
{
raf_u->typ_e = u2_raty_lead;
if ( u2_raty_lead == raf_u->typ_e ) {
uL(fprintf(uH, "raft: double promote; ignoring\n"));
}
else {
c3_i sas_i;
u2_sist_boot();
if ( u2_no == u2_Host.ops_u.bat ) {
u2_lo_lead(u2A);
uL(fprintf(uH, "raft: promoting to leader\n"));
if ( 1 == raf_u->pop_w ) {
raf_u->typ_e = u2_raty_lead;
}
else {
c3_assert(u2_raty_cand == raf_u->typ_e);
sas_i = uv_timer_stop(&raf_u->tim_u);
c3_assert(0 == sas_i);
raf_u->typ_e = u2_raty_lead;
sas_i = uv_timer_start(&raf_u->tim_u, _raft_time_cb, 50, 0);
c3_assert(0 == sas_i);
}
}
/* TODO */
if ( 1 == raf_u->pop_w ) {
u2_sist_boot();
if ( u2_no == u2_Host.ops_u.bat ) {
u2_lo_lead(u2A);
}
}
}
/* _raft_demote(): demote to follower.
*/
static void
_raft_demote(u2_raft* raf_u)
{
if ( u2_raty_lead == raf_u->typ_e ) {
uL(fprintf(uH, "raft: demoting leader\n"));
/* TODO just start dropping events */
exit(1);
}
else {
c3_assert(u2_raty_cand == raf_u->typ_e);
uL(fprintf(uH, "raft: demoting to follower\n"));
raf_u->vog_c = 0;
raf_u->vot_w = 0;
raf_u->typ_e = u2_raty_foll;
}
}
/* _raft_update_term(): note an updated term.
*/
static void
_raft_update_term(u2_raft* raf_u, c3_w tem_w)
{
if ( raf_u->tem_w < tem_w ) {
uL(fprintf(uH, "raft: got term from network: %d\n", tem_w));
raf_u->tem_w = tem_w;
c3_assert(raf_u->typ_e != u2_raty_none);
if ( raf_u->typ_e == u2_raty_foll ) {
c3_assert(0 == raf_u->vot_w);
} else _raft_demote(raf_u);
}
}
@ -209,14 +266,53 @@ _raft_do_apen(u2_rcon* ron_u, const u2_rmsg* msg_u)
/* TODO respond */
}
/* _raft_apen_done(): process AppendEntries response.
*/
static void
_raft_apen_done(u2_rreq* req_u, c3_w suc_w)
{
/* TODO */
}
/* _raft_do_revo(): Handle incoming RequestVote.
*/
static void
_raft_do_revo(u2_rcon* ron_u, const u2_rmsg* msg_u)
{
u2_raft* raf_u = ron_u->raf_u;
c3_assert(c3__revo == msg_u->typ_w);
_raft_do_rest(ron_u, msg_u);
/* TODO respond */
c3_assert(0 != ron_u->nam_u);
if ( msg_u->tem_w >= raf_u->tem_w &&
(0 == raf_u->vog_c ||
0 == strcmp(raf_u->vog_c, ron_u->nam_u->str_c)) &&
(raf_u->lat_w < msg_u->rest.lat_w ||
(raf_u->lat_w == msg_u->rest.lat_w &&
raf_u->ent_w <= msg_u->rest.lai_d)) )
{
_raft_send_rasp(ron_u, 1);
}
else _raft_send_rasp(ron_u, 0);
}
/* _raft_revo_done(): process RequestVote response.
*/
static void
_raft_revo_done(u2_rreq* req_u, c3_w suc_w)
{
u2_rcon* ron_u = req_u->ron_u;
u2_raft* raf_u = ron_u->raf_u;
if ( suc_w ) {
raf_u->vot_w++;
}
if ( raf_u->vot_w > raf_u->pop_w / 2 ) {
uL(fprintf(uH, "raft: got majority of %d for term %d\n",
raf_u->vot_w, raf_u->tem_w));
_raft_promote(raf_u);
}
}
/* _raft_do_rasp(): act on an incoming raft RPC response.
@ -224,13 +320,47 @@ _raft_do_revo(u2_rcon* ron_u, const u2_rmsg* msg_u)
static void
_raft_do_rasp(u2_rcon* ron_u, u2_rmsg* msg_u)
{
u2_raft* raf_u = ron_u->raf_u;
c3_assert(c3__rasp == msg_u->typ_w);
if ( 0 == ron_u->nam_u ) {
uL(fprintf(uH, "invalid connection from unknown host\n"));
uL(fprintf(uH, "raft: invalid connection from unknown host\n"));
_raft_conn_dead(ron_u);
}
else {
/* TODO find request, perform effects, pop request */
u2_rreq* req_u = ron_u->out_u;
uL(fprintf(uH, "raft: got response from %s\n", ron_u->nam_u->str_c));
if ( !req_u ) {
uL(fprintf(uH, "raft: no request found\n"));
_raft_conn_dead(ron_u);
}
else {
switch ( req_u->msg_u->typ_w ) {
default: {
uL(fprintf(uH, "raft: bogus request type %x?!\n",
req_u->msg_u->typ_w));
c3_assert(0);
}
case c3__apen: {
_raft_apen_done(req_u, msg_u->rasp.suc_w);
break;
}
case c3__revo: {
_raft_revo_done(req_u, msg_u->rasp.suc_w);
break;
}
}
_raft_update_term(raf_u, msg_u->tem_w);
ron_u->out_u = req_u->nex_u;
if ( 0 == req_u->nex_u ) {
c3_assert(req_u == ron_u->tou_u);
ron_u->tou_u = 0;
}
_raft_rreq_free(req_u);
}
}
}
@ -496,7 +626,6 @@ _raft_conn_work(u2_rcon* ron_u)
break;
}
else if ( ret_i == 0 ) {
//uL(fprintf(uH, "raft: conn_work: partial data\n"));
break;
}
else {
@ -666,20 +795,11 @@ _raft_rreq_new(u2_rcon* ron_u)
}
static void
_raft_rreq_free(u2_rreq* req_u, u2_rreq* qer_u)
_raft_rreq_free(u2_rreq* req_u)
{
if ( 0 == req_u ) {
c3_assert(0 == qer_u);
}
else {
if ( req_u->nex_u ) {
_raft_rreq_free(req_u->nex_u, qer_u);
_raft_rmsg_free(req_u->msg_u);
free(req_u->msg_u); // XX
free(req_u);
}
else c3_assert(qer_u == req_u);
}
_raft_rmsg_free(req_u->msg_u);
free(req_u->msg_u); // XX
free(req_u);
}
/* _raft_conn_free(): unlink a connection and free its resources.
@ -690,7 +810,23 @@ _raft_conn_free(uv_handle_t* had_u)
u2_rcon* ron_u = (void*)had_u;
//uL(fprintf(uH, "raft: conn_free %p\n", ron_u));
_raft_rreq_free(ron_u->out_u, ron_u->tou_u);
{
u2_rreq* req_u = ron_u->out_u;
if ( 0 == req_u ) {
c3_assert(0 == ron_u->tou_u);
}
else {
while ( req_u ) {
if ( 0 == req_u->nex_u ) {
c3_assert(req_u == ron_u->tou_u);
}
ron_u->out_u = req_u->nex_u;
_raft_rreq_free(req_u);
req_u = ron_u->out_u;
}
}
}
free(ron_u->red_u);
free(ron_u);
}
@ -866,6 +1002,22 @@ _raft_conn_all(u2_raft* raf_u, void (*con_f)(u2_rcon* ron_u))
}
}
/* _raft_send_rasp(): send a response to a peer.
*/
static void
_raft_send_rasp(u2_rcon* ron_u, c3_t suc_t)
{
u2_raft* raf_u = ron_u->raf_u;
u2_rmsg msg_u;
msg_u.ver_w = u2_cr_mug('a');
msg_u.len_d = 6;
msg_u.tem_w = raf_u->tem_w;
msg_u.typ_w = c3__rasp;
msg_u.rasp.suc_w = suc_t;
_raft_rmsg_send((uv_stream_t*)&ron_u->wax_u, &msg_u);
}
/* _raft_send_beat(): send a heartbeat (empty AppendEntries) to peer.
*/
static void
@ -1032,6 +1184,16 @@ u2_raft_init()
raf_u->nam_u = u2_Host.ops_u.rop_u.nam_u;
{
u2_rnam* nam_u = raf_u->nam_u;
raf_u->pop_w = 1;
while ( nam_u ) {
raf_u->pop_w++;
nam_u = nam_u->nex_u;
}
}
siz_i = strlen(u2_Host.ops_u.nam_c) + strlen(":65536") + 1;
raf_u->str_c = malloc(siz_i);
wri_i = snprintf(raf_u->str_c, siz_i, "%s:%d",
@ -1232,6 +1394,7 @@ _raft_push(u2_raft* raf_u, c3_w* bob_w, c3_w len_w)
if ( 0 == u2_Host.ops_u.rop_u.por_s ) {
raf_u->ent_w = u2_sist_pack(u2A, c3__ov, bob_w, len_w);
raf_u->lat_w = raf_u->tem_w; // XX
if ( !uv_is_active((uv_handle_t*)&raf_u->tim_u) ) {
uv_timer_start(&raf_u->tim_u, _raft_comm_cb, 0, 0);