reads events in batches of 1K during replay

This commit is contained in:
Joe Bryan 2019-04-11 00:07:35 -07:00
parent 2eff7240ac
commit f5da7da563

View File

@ -66,6 +66,7 @@
static void _pier_apply(u3_pier* pir_u);
static void _pier_boot_complete(u3_pier* pir_u);
static void _pier_disk_load_commit(u3_pier* pir_u, c3_d lav_d, c3_d len_d);
static void _pier_exit_done(u3_pier* pir_u);
static void _pier_loop_exit(u3_pier* pir_u);
static void _pier_work_save(u3_pier* pir_u);
@ -301,6 +302,20 @@ _pier_work_release(u3_writ* wit_u)
if ( u3_psat_pace == pir_u->sat_e ) {
fputc('.', stderr);
// enqueue another batch of events for replay
//
{
u3_disk* log_u = pir_u->log_u;
// XX requires that writs be unlinked before effects are released
//
if ( (0 == pir_u->ent_u) &&
(wit_u->evt_d < log_u->com_d) )
{
_pier_disk_load_commit(pir_u, (1ULL + god_u->dun_d), 1000ULL);
}
}
}
else {
#ifdef VERBOSE_EVENTS
@ -639,7 +654,6 @@ _pier_set_ship(u3_pier* pir_u, u3_noun who, u3_noun fak)
}
/* _pier_disk_read_header_complete():
** XX async
*/
static void
_pier_disk_read_header_complete(u3_disk* log_u, u3_noun dat)
@ -718,111 +732,120 @@ _pier_disk_read_header(u3_disk* log_u)
#endif
}
/* _pier_disk_load_commit(): load all commits >= evt_d; set ent_u, ext_u.
** XX async
/* _pier_disk_load_commit(): load len_d commits >= lav_d; enqueue for replay
*/
static c3_o
static void
_pier_disk_load_commit(u3_pier* pir_u,
c3_d lav_d)
c3_d lav_d,
c3_d len_d)
{
u3_disk* log_u = pir_u->log_u;
if ( !log_u->fol_u ) {
return c3n;
}
else {
c3_d pos_d = log_u->fol_u->end_d;
c3_d old_d = 0;
c3_d max_d = lav_d + len_d;
c3_d pos_d = log_u->fol_u->end_d;
c3_d old_d = 0;
c3_assert ( 0 != log_u->fol_u );
#ifdef VERBOSE_EVENTS
fprintf(stderr, "pier: load: commit: at %" PRIx64 "\r\n", pos_d);
#endif
while ( pos_d ) {
c3_d len_d, evt_d;
c3_d* buf_d;
u3_noun mat, ovo, job, evt;
while ( pos_d ) {
c3_d len_d, evt_d;
c3_d* buf_d;
u3_noun mat, ovo, job, evt;
buf_d = u3_foil_reveal(log_u->fol_u, &pos_d, &len_d);
if ( !buf_d ) {
_pier_disk_bail(0, "load: commit: corrupt");
return c3n;
}
buf_d = u3_foil_reveal(log_u->fol_u, &pos_d, &len_d);
mat = u3i_chubs(len_d, buf_d);
c3_free(buf_d);
if ( !buf_d ) {
_pier_disk_bail(0, "pier: load: commit: corrupt");
return;
}
ovo = u3ke_cue(u3k(mat));
mat = u3i_chubs(len_d, buf_d);
c3_free(buf_d);
// reached header
//
if ( 0ULL == pos_d ) {
c3_assert( 1ULL == lav_d );
c3_assert( c3__boot == u3h(ovo) );
ovo = u3ke_cue(u3k(mat));
_pier_disk_read_header_complete(log_u, u3k(u3t(ovo)));
// reached header
//
if ( 0ULL == pos_d ) {
c3_assert( 1ULL == lav_d );
c3_assert( c3__boot == u3h(ovo) );
u3z(ovo); u3z(mat);
break;
}
_pier_disk_read_header_complete(log_u, u3k(u3t(ovo)));
c3_assert(c3__work == u3h(ovo));
evt = u3h(u3t(ovo));
job = u3k(u3t(u3t(u3t(ovo))));
evt_d = u3r_chub(0, evt);
u3z(ovo);
u3z(ovo); u3z(mat);
break;
}
// confirm event order
//
if ( (0 != old_d) &&
((old_d - 1ULL) != evt_d) ) {
fprintf(stderr, "pier: load: event order\r\n");
return c3n;
}
else {
old_d = evt_d;
}
c3_assert(c3__work == u3h(ovo));
evt = u3h(u3t(ovo));
job = u3k(u3t(u3t(u3t(ovo))));
evt_d = u3r_chub(0, evt);
u3z(ovo);
if ( evt_d < lav_d ) {
u3z(mat);
u3z(job);
// confirm event order
//
if ( (0 != old_d) &&
((old_d - 1ULL) != evt_d) ) {
_pier_disk_bail(0, "pier: load: commit: event order");
return;
}
else {
old_d = evt_d;
}
return c3y;
}
else {
u3_writ* wit_u = c3_calloc(sizeof(u3_writ));
// done: read past the first event requested
//
if ( evt_d < lav_d ) {
u3z(mat);
u3z(job);
return;
}
// skip: haven't reached the last event requested
//
else if ( evt_d > max_d ) {
u3z(mat);
u3z(job);
continue;
}
// enqueue requested event
//
else {
u3_writ* wit_u = c3_calloc(sizeof(u3_writ));
#ifdef VERBOSE_EVENTS
fprintf(stderr, "pier: load: commit: %" PRIu64 "\r\n", evt_d);
fprintf(stderr, "pier: load: commit: %" PRIu64 "\r\n", evt_d);
#endif
wit_u->pir_u = pir_u;
wit_u->evt_d = evt_d;
wit_u->job = job;
wit_u->mat = mat;
wit_u->pir_u = pir_u;
wit_u->evt_d = evt_d;
wit_u->job = job;
wit_u->mat = mat;
/* insert at queue exit -- the oldest events run first
*/
if ( !pir_u->ent_u && !pir_u->ext_u ) {
pir_u->ent_u = pir_u->ext_u = wit_u;
}
else {
if ( (1ULL + wit_u->evt_d) != pir_u->ext_u->evt_d ) {
fprintf(stderr, "pier: load: commit: event gap: %" PRIx64 ", %"
PRIx64 "\r\n",
wit_u->evt_d,
pir_u->ext_u->evt_d);
u3z(mat);
u3z(job);
return c3n;
}
wit_u->nex_u = pir_u->ext_u;
pir_u->ext_u = wit_u;
/* insert at queue exit -- the oldest events run first
*/
if ( !pir_u->ent_u && !pir_u->ext_u ) {
pir_u->ent_u = pir_u->ext_u = wit_u;
}
else {
if ( (1ULL + wit_u->evt_d) != pir_u->ext_u->evt_d ) {
fprintf(stderr, "pier: load: commit: event gap: %" PRIx64 ", %"
PRIx64 "\r\n",
wit_u->evt_d,
pir_u->ext_u->evt_d);
u3z(mat);
u3z(job);
_pier_disk_bail(0, "pier: load: comit: event gap");
return;
}
wit_u->nex_u = pir_u->ext_u;
pir_u->ext_u = wit_u;
}
}
return c3y;
}
}
@ -1112,9 +1135,7 @@ _pier_boot_ready(u3_pier* pir_u)
// begin queuing batches of committed events
//
// XX batch, async
//
_pier_disk_load_commit(pir_u, (1ULL + god_u->dun_d));
_pier_disk_load_commit(pir_u, (1ULL + god_u->dun_d), 1000ULL);
if ( 0 == god_u->dun_d ) {
fprintf(stderr, "pier: replaying events 1 through %" PRIu64 "\r\n",
@ -1150,7 +1171,6 @@ _pier_boot_ready(u3_pier* pir_u)
}
/* _pier_disk_init_complete():
** XX async
*/
static void
_pier_disk_init_complete(u3_disk* log_u, c3_d evt_d)
@ -1168,7 +1188,6 @@ _pier_disk_init_complete(u3_disk* log_u, c3_d evt_d)
}
/* _pier_disk_init():
** XX async
*/
static c3_o
_pier_disk_init(u3_disk* log_u)