initalizes persistence and starts the serf in parallel

This commit is contained in:
Joe Bryan 2019-04-04 17:33:37 -07:00
parent 6c932e5304
commit e22312ea14

View File

@ -1016,7 +1016,9 @@ _pier_disk_init_complete(u3_disk* log_u, c3_d evt_d)
u3_pier* pir_u = log_u->pir_u;
u3_lord* god_u = pir_u->god_u;
if ( (c3y == god_u->liv_o) && (c3n == pir_u->liv_o) ) {
if ( (0 != god_u) &&
(c3y == god_u->liv_o) &&
(c3n == pir_u->liv_o) ) {
_pier_boot_ready(pir_u);
}
}
@ -1174,14 +1176,12 @@ _pier_work_play(u3_pier* pir_u,
//
god_u->rel_d = god_u->dun_d = god_u->sen_d = (lav_d - 1ULL);
// load all committed events
//
_pier_disk_create(pir_u);
{
u3_disk* log_u = pir_u->log_u;
if ( (c3y == log_u->liv_o) && (c3n == pir_u->liv_o) ) {
if ( (0 != log_u) &&
(c3y == log_u->liv_o) &&
(c3n == pir_u->liv_o) ) {
_pier_boot_ready(pir_u);
}
}
@ -1235,116 +1235,110 @@ _pier_work_poke(void* vod_p,
if ( c3y != u3du(jar) ) {
goto error;
}
else {
/* the worker process starts with a %play task,
** which tells us where to start playback
** (and who we are, if it knows)
*/
if ( 0 == pir_u->log_u ) {
switch ( u3h(jar) ) {
default: goto error;
case c3__play: {
c3_d lav_d;
c3_l mug_l;
switch ( u3h(jar) ) {
default: goto error;
if ( (c3n == u3r_qual(u3t(jar), 0, &p_jar, &q_jar, &r_jar)) ||
(c3n == u3ud(p_jar)) ||
(u3r_met(6, p_jar) != 1) ||
(c3n == u3ud(q_jar)) ||
(u3r_met(5, p_jar) != 1) ||
(c3n == u3du(r_jar)) ||
(c3n == u3ud(u3h(r_jar))) ||
((c3y != u3t(r_jar)) && (c3n != u3t(r_jar))) )
{
if ( u3_nul == u3t(jar) ) {
lav_d = 1ULL;
mug_l = 0;
}
else {
goto error;
}
}
// the worker process starts with a %play task,
// which tells us where to start playback
// (and who we are, if it knows) XX remove in favor of event-log header
//
case c3__play: {
c3_d lav_d;
c3_l mug_l;
if ( u3_nul != u3t(jar) ) {
lav_d = u3r_chub(0, p_jar);
mug_l = u3r_word(0, q_jar);
// single-home
//
_pier_set_ship(pir_u, u3k(u3h(r_jar)), u3k(u3t(r_jar)));
}
_pier_work_play(pir_u, lav_d, mug_l);
u3z(jar); u3z(mat);
break;
if ( (c3n == u3r_qual(u3t(jar), 0, &p_jar, &q_jar, &r_jar)) ||
(c3n == u3ud(p_jar)) ||
(u3r_met(6, p_jar) != 1) ||
(c3n == u3ud(q_jar)) ||
(u3r_met(5, p_jar) != 1) ||
(c3n == u3du(r_jar)) ||
(c3n == u3ud(u3h(r_jar))) ||
((c3y != u3t(r_jar)) && (c3n != u3t(r_jar))) )
{
if ( u3_nul == u3t(jar) ) {
lav_d = 1ULL;
mug_l = 0;
}
else {
goto error;
}
}
if ( u3_nul != u3t(jar) ) {
lav_d = u3r_chub(0, p_jar);
mug_l = u3r_word(0, q_jar);
// single-home
//
_pier_set_ship(pir_u, u3k(u3h(r_jar)), u3k(u3t(r_jar)));
}
_pier_work_play(pir_u, lav_d, mug_l);
u3z(jar); u3z(mat);
break;
}
else {
switch ( u3h(jar) ) {
default: goto error;
case c3__work: {
if ( (c3n == u3r_qual(jar, 0, &p_jar, &q_jar, &r_jar)) ||
(c3n == u3ud(p_jar)) ||
(u3r_met(6, p_jar) != 1) ||
(c3n == u3ud(q_jar)) ||
(u3r_met(5, q_jar) > 1) )
{
goto error;
}
else {
c3_d evt_d = u3r_chub(0, p_jar);
c3_l mug_l = u3r_word(0, q_jar);
u3_writ* wit_u = _pier_work_writ(pir_u, evt_d);
if ( !wit_u || (mug_l && (mug_l != wit_u->mug_l)) ) {
goto error;
}
{
// XX not the right place to print an error!
//
u3m_p("wire", u3h(u3t(r_jar)));
u3m_p("oust", u3h(u3t(u3t(wit_u->job))));
u3m_p("with", u3h(u3t(u3t(r_jar))));
if ( c3__crud == u3h(u3t(u3t(r_jar))) ) {
u3_pier_punt(0, u3k(u3t(u3t(u3t(u3t(r_jar))))));
}
}
fprintf(stderr, "pier: replace: %" PRIu64 "\r\n", evt_d);
_pier_work_replace(wit_u, u3k(r_jar), mat);
}
break;
}
case c3__done: {
if ( (c3n == u3r_qual(jar, 0, &p_jar, &q_jar, &r_jar)) ||
(c3n == u3ud(p_jar)) ||
(u3r_met(6, p_jar) != 1) ||
(c3n == u3ud(q_jar)) ||
(u3r_met(5, q_jar) > 1) )
{
goto error;
}
else {
c3_d evt_d = u3r_chub(0, p_jar);
c3_l mug_l = u3r_word(0, q_jar);
u3_writ* wit_u = _pier_work_writ(pir_u, evt_d);
if ( !wit_u ) {
fprintf(stderr, "poke: no writ: %" PRIu64 "\r\n", evt_d);
goto error;
}
_pier_work_complete(wit_u, mug_l, u3k(r_jar));
}
break;
}
case c3__work: {
if ( (c3n == u3r_qual(jar, 0, &p_jar, &q_jar, &r_jar)) ||
(c3n == u3ud(p_jar)) ||
(u3r_met(6, p_jar) != 1) ||
(c3n == u3ud(q_jar)) ||
(u3r_met(5, q_jar) > 1) )
{
goto error;
}
else {
c3_d evt_d = u3r_chub(0, p_jar);
c3_l mug_l = u3r_word(0, q_jar);
u3_writ* wit_u = _pier_work_writ(pir_u, evt_d);
if ( !wit_u || (mug_l && (mug_l != wit_u->mug_l)) ) {
goto error;
}
{
// XX not the right place to print an error!
//
u3m_p("wire", u3h(u3t(r_jar)));
u3m_p("oust", u3h(u3t(u3t(wit_u->job))));
u3m_p("with", u3h(u3t(u3t(r_jar))));
if ( c3__crud == u3h(u3t(u3t(r_jar))) ) {
u3_pier_punt(0, u3k(u3t(u3t(u3t(u3t(r_jar))))));
}
}
fprintf(stderr, "pier: replace: %" PRIu64 "\r\n", evt_d);
_pier_work_replace(wit_u, u3k(r_jar), mat);
}
break;
}
case c3__done: {
if ( (c3n == u3r_qual(jar, 0, &p_jar, &q_jar, &r_jar)) ||
(c3n == u3ud(p_jar)) ||
(u3r_met(6, p_jar) != 1) ||
(c3n == u3ud(q_jar)) ||
(u3r_met(5, q_jar) > 1) )
{
goto error;
}
else {
c3_d evt_d = u3r_chub(0, p_jar);
c3_l mug_l = u3r_word(0, q_jar);
u3_writ* wit_u = _pier_work_writ(pir_u, evt_d);
if ( !wit_u ) {
fprintf(stderr, "poke: no writ: %" PRIu64 "\r\n", evt_d);
goto error;
}
_pier_work_complete(wit_u, mug_l, u3k(r_jar));
}
break;
}
}
_pier_apply(pir_u);
return;
@ -1452,6 +1446,12 @@ u3_pier_create(c3_w wag_w, c3_c* pax_c)
pir_u->unx_u = c3_calloc(sizeof(u3_unix));
pir_u->sav_u = c3_calloc(sizeof(u3_save));
// initialize persistence
//
if ( c3n == _pier_disk_create(pir_u) ) {
return 0;
}
// start the serf process
//
if ( !(pir_u->god_u = _pier_work_create(pir_u)) ) {