vere: ports synchronous event log iterator

This commit is contained in:
Joe Bryan 2022-12-13 21:33:05 -05:00
parent 99a8ccda7b
commit 9daab2fd5a
2 changed files with 100 additions and 0 deletions

View File

@ -563,6 +563,10 @@
u3_info put_u; // write queue
} u3_disk;
/* u3_disk_walk: opaque event log iterator.
*/
typedef struct _u3_disk_walk u3_disk_walk;
/* u3_psat: pier state.
*/
typedef enum {
@ -1011,6 +1015,28 @@
void
u3_disk_plan(u3_disk* log_u, u3_fact* tac_u);
/* u3_disk_walk_init(): init iterator.
*/
u3_disk_walk*
u3_disk_walk_init(u3_disk* log_u,
c3_d eve_d,
c3_d len_d);
/* u3_disk_walk_live(): check if live.
*/
c3_o
u3_disk_walk_live(u3_disk_walk* wok_u);
/* u3_disk_walk_live(): get next fact.
*/
c3_o
u3_disk_walk_step(u3_disk_walk* wok_u, u3_fact* tac_u);
/* u3_disk_walk_done(): close iterator.
*/
void
u3_disk_walk_done(u3_disk_walk* wok_u);
/* u3_lord_init(): start serf.
*/
u3_lord*

View File

@ -22,6 +22,12 @@ struct _cd_save {
struct _u3_disk* log_u;
};
struct _u3_disk_walk {
u3_lmdb_walk itr_u;
u3_disk* log_u;
c3_o liv_o;
};
#undef VERBOSE_DISK
#undef DISK_TRACE_JAM
#undef DISK_TRACE_CUE
@ -494,6 +500,74 @@ u3_disk_read(u3_disk* log_u, c3_d eve_d, c3_d len_d)
uv_timer_start(&red_u->tim_u, _disk_read_start_cb, 0, 0);
}
/* u3_disk_walk_init(): init iterator.
*/
u3_disk_walk*
u3_disk_walk_init(u3_disk* log_u,
c3_d eve_d,
c3_d len_d)
{
u3_disk_walk* wok_u = c3_malloc(sizeof(*wok_u));
c3_d max_d = eve_d + len_d - 1;
wok_u->log_u = log_u;
wok_u->liv_o = u3_lmdb_walk_init(log_u->mdb_u,
&wok_u->itr_u,
eve_d,
c3_min(max_d, log_u->dun_d));
return wok_u;
}
/* u3_disk_walk_live(): check if live.
*/
c3_o
u3_disk_walk_live(u3_disk_walk* wok_u)
{
if ( wok_u->itr_u.nex_d > wok_u->itr_u.las_d ) {
wok_u->liv_o = c3n;
}
return wok_u->liv_o;
}
/* u3_disk_walk_step(): get next fact.
*/
c3_o
u3_disk_walk_step(u3_disk_walk* wok_u, u3_fact* tac_u)
{
u3_disk* log_u = wok_u->log_u;
size_t len_i;
void* buf_v;
tac_u->eve_d = wok_u->itr_u.nex_d;
if ( c3n == u3_lmdb_walk_next(&wok_u->itr_u, &len_i, &buf_v) ) {
fprintf(stderr, "disk: (%" PRIu64 "): read fail\r\n", tac_u->eve_d);
return wok_u->liv_o = c3n;
}
if ( c3n == u3_disk_sift(log_u, len_i,
(c3_y*)buf_v,
&tac_u->mug_l,
&tac_u->job) )
{
fprintf(stderr, "disk: (%" PRIu64 "): sift fail\r\n", tac_u->eve_d);
return wok_u->liv_o = c3n;
}
return c3y;
}
/* u3_disk_walk_done(): close iterator.
*/
void
u3_disk_walk_done(u3_disk_walk* wok_u)
{
u3_lmdb_walk_done(&wok_u->itr_u);
c3_free(wok_u);
}
/* _disk_save_meta(): serialize atom, save as metadata at [key_c].
*/
static c3_o