Merge branch 'fucking-raft'

As of this commit, connection maintenance and leader election in
multi-instance raft mode are working. We don't sync events yet. We
also pick an inopportune time to boot -- since our boot process is
synchronous and generally takes longer than 300ms, the first thing
we do after becoming leader is go unavailable, causing a new leader
to be picked.

In other words, raft multi-instance mode isn't there yet. But
single-instance mode should be unaffected by any of this.
This commit is contained in:
Steven Dee 2014-03-06 10:54:33 -08:00
commit ec7a08f6df
12 changed files with 1289 additions and 59 deletions

3
.gitmodules vendored
View File

@ -1,3 +0,0 @@
[submodule "outside/c-capnproto"]
path = outside/c-capnproto
url = https://github.com/urbit/c-capnproto

View File

@ -14,6 +14,7 @@
# define c3__ankh c3_s4('a','n','k','h') # define c3__ankh c3_s4('a','n','k','h')
# define c3__any c3_s3('a','n','y') # define c3__any c3_s3('a','n','y')
# define c3__ap c3_s2('a','p') # define c3__ap c3_s2('a','p')
# define c3__apen c3_s4('a','p','e','n')
# define c3__aro c3_s3('a','r','o') # define c3__aro c3_s3('a','r','o')
# define c3__arvo c3_s4('a','r','v','o') # define c3__arvo c3_s4('a','r','v','o')
# define c3__ash c3_s3('a','s','h') # define c3__ash c3_s3('a','s','h')

View File

@ -17,18 +17,21 @@ struct Raft_Comd;
struct Raft_Rent; struct Raft_Rent;
struct Raft_Rest; struct Raft_Rest;
struct Raft_Rasp; struct Raft_Rasp;
struct Raft_Rmsg;
typedef struct {capn_ptr p;} Raft_ptr; typedef struct {capn_ptr p;} Raft_ptr;
typedef struct {capn_ptr p;} Raft_Comd_ptr; typedef struct {capn_ptr p;} Raft_Comd_ptr;
typedef struct {capn_ptr p;} Raft_Rent_ptr; typedef struct {capn_ptr p;} Raft_Rent_ptr;
typedef struct {capn_ptr p;} Raft_Rest_ptr; typedef struct {capn_ptr p;} Raft_Rest_ptr;
typedef struct {capn_ptr p;} Raft_Rasp_ptr; typedef struct {capn_ptr p;} Raft_Rasp_ptr;
typedef struct {capn_ptr p;} Raft_Rmsg_ptr;
typedef struct {capn_ptr p;} Raft_list; typedef struct {capn_ptr p;} Raft_list;
typedef struct {capn_ptr p;} Raft_Comd_list; typedef struct {capn_ptr p;} Raft_Comd_list;
typedef struct {capn_ptr p;} Raft_Rent_list; typedef struct {capn_ptr p;} Raft_Rent_list;
typedef struct {capn_ptr p;} Raft_Rest_list; typedef struct {capn_ptr p;} Raft_Rest_list;
typedef struct {capn_ptr p;} Raft_Rasp_list; typedef struct {capn_ptr p;} Raft_Rasp_list;
typedef struct {capn_ptr p;} Raft_Rmsg_list;
enum Raft_Comd_Type { enum Raft_Comd_Type {
Raft_Comd_Type_nop = 0, Raft_Comd_Type_nop = 0,
@ -45,7 +48,7 @@ struct Raft_Comd {
struct Raft_Rent { struct Raft_Rent {
uint32_t tem; uint32_t tem;
Raft_Comd_ptr cmd; Raft_Comd_ptr com;
}; };
enum Raft_Rest_which { enum Raft_Rest_which {
Raft_Rest_revo = 0, Raft_Rest_revo = 0,
@ -70,42 +73,60 @@ struct Raft_Rasp {
uint64_t tem; uint64_t tem;
unsigned suc : 1; unsigned suc : 1;
}; };
enum Raft_Rmsg_which {
Raft_Rmsg_rest = 0,
Raft_Rmsg_rasp = 1
};
struct Raft_Rmsg {
enum Raft_Rmsg_which which;
union {
Raft_Rest_ptr rest;
Raft_Rasp_ptr rasp;
};
};
Raft_ptr new_Raft(struct capn_segment*); Raft_ptr new_Raft(struct capn_segment*);
Raft_Comd_ptr new_Raft_Comd(struct capn_segment*); Raft_Comd_ptr new_Raft_Comd(struct capn_segment*);
Raft_Rent_ptr new_Raft_Rent(struct capn_segment*); Raft_Rent_ptr new_Raft_Rent(struct capn_segment*);
Raft_Rest_ptr new_Raft_Rest(struct capn_segment*); Raft_Rest_ptr new_Raft_Rest(struct capn_segment*);
Raft_Rasp_ptr new_Raft_Rasp(struct capn_segment*); Raft_Rasp_ptr new_Raft_Rasp(struct capn_segment*);
Raft_Rmsg_ptr new_Raft_Rmsg(struct capn_segment*);
Raft_list new_Raft_list(struct capn_segment*, int len); Raft_list new_Raft_list(struct capn_segment*, int len);
Raft_Comd_list new_Raft_Comd_list(struct capn_segment*, int len); Raft_Comd_list new_Raft_Comd_list(struct capn_segment*, int len);
Raft_Rent_list new_Raft_Rent_list(struct capn_segment*, int len); Raft_Rent_list new_Raft_Rent_list(struct capn_segment*, int len);
Raft_Rest_list new_Raft_Rest_list(struct capn_segment*, int len); Raft_Rest_list new_Raft_Rest_list(struct capn_segment*, int len);
Raft_Rasp_list new_Raft_Rasp_list(struct capn_segment*, int len); Raft_Rasp_list new_Raft_Rasp_list(struct capn_segment*, int len);
Raft_Rmsg_list new_Raft_Rmsg_list(struct capn_segment*, int len);
void read_Raft(struct Raft*, Raft_ptr); void read_Raft(struct Raft*, Raft_ptr);
void read_Raft_Comd(struct Raft_Comd*, Raft_Comd_ptr); void read_Raft_Comd(struct Raft_Comd*, Raft_Comd_ptr);
void read_Raft_Rent(struct Raft_Rent*, Raft_Rent_ptr); void read_Raft_Rent(struct Raft_Rent*, Raft_Rent_ptr);
void read_Raft_Rest(struct Raft_Rest*, Raft_Rest_ptr); void read_Raft_Rest(struct Raft_Rest*, Raft_Rest_ptr);
void read_Raft_Rasp(struct Raft_Rasp*, Raft_Rasp_ptr); void read_Raft_Rasp(struct Raft_Rasp*, Raft_Rasp_ptr);
void read_Raft_Rmsg(struct Raft_Rmsg*, Raft_Rmsg_ptr);
void write_Raft(const struct Raft*, Raft_ptr); void write_Raft(const struct Raft*, Raft_ptr);
void write_Raft_Comd(const struct Raft_Comd*, Raft_Comd_ptr); void write_Raft_Comd(const struct Raft_Comd*, Raft_Comd_ptr);
void write_Raft_Rent(const struct Raft_Rent*, Raft_Rent_ptr); void write_Raft_Rent(const struct Raft_Rent*, Raft_Rent_ptr);
void write_Raft_Rest(const struct Raft_Rest*, Raft_Rest_ptr); void write_Raft_Rest(const struct Raft_Rest*, Raft_Rest_ptr);
void write_Raft_Rasp(const struct Raft_Rasp*, Raft_Rasp_ptr); void write_Raft_Rasp(const struct Raft_Rasp*, Raft_Rasp_ptr);
void write_Raft_Rmsg(const struct Raft_Rmsg*, Raft_Rmsg_ptr);
void get_Raft(struct Raft*, Raft_list, int i); void get_Raft(struct Raft*, Raft_list, int i);
void get_Raft_Comd(struct Raft_Comd*, Raft_Comd_list, int i); void get_Raft_Comd(struct Raft_Comd*, Raft_Comd_list, int i);
void get_Raft_Rent(struct Raft_Rent*, Raft_Rent_list, int i); void get_Raft_Rent(struct Raft_Rent*, Raft_Rent_list, int i);
void get_Raft_Rest(struct Raft_Rest*, Raft_Rest_list, int i); void get_Raft_Rest(struct Raft_Rest*, Raft_Rest_list, int i);
void get_Raft_Rasp(struct Raft_Rasp*, Raft_Rasp_list, int i); void get_Raft_Rasp(struct Raft_Rasp*, Raft_Rasp_list, int i);
void get_Raft_Rmsg(struct Raft_Rmsg*, Raft_Rmsg_list, int i);
void set_Raft(const struct Raft*, Raft_list, int i); void set_Raft(const struct Raft*, Raft_list, int i);
void set_Raft_Comd(const struct Raft_Comd*, Raft_Comd_list, int i); void set_Raft_Comd(const struct Raft_Comd*, Raft_Comd_list, int i);
void set_Raft_Rent(const struct Raft_Rent*, Raft_Rent_list, int i); void set_Raft_Rent(const struct Raft_Rent*, Raft_Rent_list, int i);
void set_Raft_Rest(const struct Raft_Rest*, Raft_Rest_list, int i); void set_Raft_Rest(const struct Raft_Rest*, Raft_Rest_list, int i);
void set_Raft_Rasp(const struct Raft_Rasp*, Raft_Rasp_list, int i); void set_Raft_Rasp(const struct Raft_Rasp*, Raft_Rasp_list, int i);
void set_Raft_Rmsg(const struct Raft_Rmsg*, Raft_Rmsg_list, int i);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -421,15 +421,55 @@
uv_timer_t tim_u; uv_timer_t tim_u;
u2_ulog lug_u; // event log u2_ulog lug_u; // event log
c3_w ent_w; c3_w ent_w;
c3_w lat_w;
u2_raty typ_e; u2_raty typ_e;
struct _u2_rnam* nam_u;
struct _u2_rcon* run_u;
c3_w pop_w;
c3_w vot_w;
c3_c* str_c; // our name
// persistent state, restored on start
c3_w tem_w;
c3_c* vog_c;
// end persistent state
} u2_raft; } u2_raft;
/* u2_rreq: raft request.
*/
typedef struct _u2_rreq {
struct _u2_rmsg* msg_u;
struct _u2_rreq* nex_u;
struct _u2_rcon* ron_u;
} u2_rreq;
/* u2_rbuf: raft input buffer.
*/
typedef struct _u2_rbuf {
c3_w len_w;
c3_w cap_w;
c3_y buf_y[0];
} u2_rbuf;
/* u2_rcon: raft connection.
*/
typedef struct _u2_rcon {
uv_tcp_t wax_u;
struct _u2_rnam* nam_u;
u2_rbuf* red_u;
u2_bean red;
u2_raft* raf_u;
u2_rreq* out_u;
u2_rreq* tou_u;
struct _u2_rcon* nex_u;
} u2_rcon;
/* u2_rnam: raft peer name. /* u2_rnam: raft peer name.
*/ */
typedef struct _u2_rnam { typedef struct _u2_rnam {
c3_c* str_c; c3_c* str_c;
c3_c* nam_c; c3_c* nam_c;
c3_s por_s; c3_c* por_c;
u2_rcon* ron_u;
struct _u2_rnam* nex_u; struct _u2_rnam* nex_u;
} u2_rnam; } u2_rnam;
@ -769,15 +809,15 @@
/** Main loop, new style. /** Main loop, new style.
**/ **/
/* u2_lo_boot(): restore or create pier.
*/
void
u2_lo_boot(void);
/* u2_lo_loop(): enter main event loop. /* u2_lo_loop(): enter main event loop.
*/ */
void void
u2_lo_loop(u2_reck* rec_u); u2_lo_loop(void);
/* u2_lo_lead(): actions on promotion to leader.
*/
void
u2_lo_lead(u2_reck* rec_u);
/* u2_lo_exit(): shut down io across pier. /* u2_lo_exit(): shut down io across pier.
*/ */
@ -1064,10 +1104,10 @@
u2_bean u2_bean
u2_raft_readopt(u2_ropt* rop_u, const c3_c* arg_c); u2_raft_readopt(u2_ropt* rop_u, const c3_c* arg_c);
/* u2_raft_io_init(): initialize raft I/O. /* u2_raft_init(): start Raft process.
*/ */
void void
u2_raft_io_init(void); u2_raft_init(void);
/* u2_raft_work(): poke, kick, and push pending events. /* u2_raft_work(): poke, kick, and push pending events.
*/ */

@ -1 +0,0 @@
Subproject commit 89f1ac3953ae934d7fc34013251dee19fa88add6

View File

@ -14,7 +14,7 @@ struct Raft {
struct Rent { # log entry struct Rent { # log entry
tem @0 :UInt32; # term tem @0 :UInt32; # term
cmd @1 :Comd; # command com @1 :Comd; # command
} }
struct Rest { # Raft RPC request struct Rest { # Raft RPC request
@ -35,4 +35,11 @@ struct Raft {
tem @0 :UInt64; # leader's term tem @0 :UInt64; # leader's term
suc @1 :Bool; # success suc @1 :Bool; # success
} }
struct Rmsg { # Raft RPC message
union {
rest @0 :Rest;
rasp @1 :Rasp;
}
}
} }

View File

@ -73,12 +73,12 @@ Raft_Rent_list new_Raft_Rent_list(struct capn_segment *s, int len) {
void read_Raft_Rent(struct Raft_Rent *s, Raft_Rent_ptr p) { void read_Raft_Rent(struct Raft_Rent *s, Raft_Rent_ptr p) {
capn_resolve(&p.p); capn_resolve(&p.p);
s->tem = capn_read32(p.p, 0); s->tem = capn_read32(p.p, 0);
s->cmd.p = capn_getp(p.p, 0, 0); s->com.p = capn_getp(p.p, 0, 0);
} }
void write_Raft_Rent(const struct Raft_Rent *s, Raft_Rent_ptr p) { void write_Raft_Rent(const struct Raft_Rent *s, Raft_Rent_ptr p) {
capn_resolve(&p.p); capn_resolve(&p.p);
capn_write32(p.p, 0, s->tem); capn_write32(p.p, 0, s->tem);
capn_setp(p.p, 0, s->cmd.p); capn_setp(p.p, 0, s->com.p);
} }
void get_Raft_Rent(struct Raft_Rent *s, Raft_Rent_list l, int i) { void get_Raft_Rent(struct Raft_Rent *s, Raft_Rent_list l, int i) {
Raft_Rent_ptr p; Raft_Rent_ptr p;
@ -174,3 +174,48 @@ void set_Raft_Rasp(const struct Raft_Rasp *s, Raft_Rasp_list l, int i) {
p.p = capn_getp(l.p, i, 0); p.p = capn_getp(l.p, i, 0);
write_Raft_Rasp(s, p); write_Raft_Rasp(s, p);
} }
Raft_Rmsg_ptr new_Raft_Rmsg(struct capn_segment *s) {
Raft_Rmsg_ptr p;
p.p = capn_new_struct(s, 8, 1);
return p;
}
Raft_Rmsg_list new_Raft_Rmsg_list(struct capn_segment *s, int len) {
Raft_Rmsg_list p;
p.p = capn_new_list(s, len, 8, 1);
return p;
}
void read_Raft_Rmsg(struct Raft_Rmsg *s, Raft_Rmsg_ptr p) {
capn_resolve(&p.p);
s->which = (enum Raft_Rmsg_which) capn_read16(p.p, 0);
switch (s->which) {
case Raft_Rmsg_rest:
case Raft_Rmsg_rasp:
s->rasp.p = capn_getp(p.p, 0, 0);
break;
default:
break;
}
}
void write_Raft_Rmsg(const struct Raft_Rmsg *s, Raft_Rmsg_ptr p) {
capn_resolve(&p.p);
capn_write16(p.p, 0, s->which);
switch (s->which) {
case Raft_Rmsg_rest:
case Raft_Rmsg_rasp:
capn_setp(p.p, 0, s->rasp.p);
break;
default:
break;
}
}
void get_Raft_Rmsg(struct Raft_Rmsg *s, Raft_Rmsg_list l, int i) {
Raft_Rmsg_ptr p;
p.p = capn_getp(l.p, i, 0);
read_Raft_Rmsg(s, p);
}
void set_Raft_Rmsg(const struct Raft_Rmsg *s, Raft_Rmsg_list l, int i) {
Raft_Rmsg_ptr p;
p.p = capn_getp(l.p, i, 0);
write_Raft_Rmsg(s, p);
}

View File

@ -164,7 +164,6 @@ _lo_init()
u2_term_io_init(); u2_term_io_init();
u2_http_io_init(); u2_http_io_init();
u2_save_io_init(); u2_save_io_init();
u2_raft_io_init();
u2_batz_io_init(); u2_batz_io_init();
} }
@ -630,10 +629,10 @@ _lo_slow()
#endif #endif
} }
/* u2_lo_boot(): restore or create pier. /* u2_lo_boot(): begin main event loop.
*/ */
void void
u2_lo_boot() u2_lo_loop()
{ {
uv_loop_t* lup_u = uv_default_loop(); uv_loop_t* lup_u = uv_default_loop();
@ -643,13 +642,17 @@ u2_lo_boot()
// signal(SIGIO, SIG_IGN); // linux is wont to produce for some reason // signal(SIGIO, SIG_IGN); // linux is wont to produce for some reason
_lo_init(); _lo_init();
u2_sist_boot(); u2_raft_init();
if ( u2_no == u2_Host.ops_u.bat ) {
uv_run(u2L, UV_RUN_DEFAULT);
}
} }
/* u2_lo_loop(): begin main event loop. /* u2_lo_lead(): actions on promotion to leader.
*/ */
void void
u2_lo_loop(u2_reck* rec_u) u2_lo_lead(u2_reck* rec_u)
{ {
_lo_talk(); _lo_talk();
{ {

View File

@ -410,10 +410,7 @@ main(c3_i argc,
u2_lo_grab("main", u2_none); u2_lo_grab("main", u2_none);
u2_lo_boot(); u2_lo_loop();
if ( u2_no == u2_Host.ops_u.bat ) {
u2_lo_loop(u2_Host.arv_u);
}
return 0; return 0;
} }

1155
v/raft.c

File diff suppressed because it is too large Load Diff

View File

@ -984,8 +984,16 @@ u2_reck_plan(u2_reck* rec_u,
u2_noun pax, u2_noun pax,
u2_noun fav) u2_noun fav)
{ {
if ( u2_raty_lead == u2R->typ_e ) {
u2_noun egg = u2nc(pax, fav); u2_noun egg = u2nc(pax, fav);
rec_u->roe = u2nc(u2nc(u2_nul, egg), rec_u->roe); rec_u->roe = u2nc(u2nc(u2_nul, egg), rec_u->roe);
}
else {
c3_c* hed_c = u2_cr_string(u2h(u2t(pax)));
uL(fprintf(uH, "reck: dropping roe from %s\n", hed_c));
free(hed_c);
u2z(pax); u2z(fav);
}
} }
/* u2_reck_plow(): queue multiple ova (external). /* u2_reck_plow(): queue multiple ova (external).

View File

@ -814,7 +814,7 @@ _sist_zen(u2_reck* rec_u)
void void
u2_sist_boot(void) u2_sist_boot(void)
{ {
uL(fprintf(uH, "raft: booting\n")); uL(fprintf(uH, "sist: booting\n"));
if ( u2_yes == u2_Host.ops_u.nuu ) { if ( u2_yes == u2_Host.ops_u.nuu ) {
u2_noun pig = u2_none; u2_noun pig = u2_none;
@ -863,5 +863,4 @@ u2_sist_boot(void)
{ {
u2_http_ef_bake(); u2_http_ef_bake();
} }
} }