vere: factors out sift/etch functions event serialization

This commit is contained in:
Joe Bryan 2022-12-13 21:32:03 -05:00
parent 5a775d1c79
commit 91f7818ab7
2 changed files with 87 additions and 37 deletions

View File

@ -943,6 +943,23 @@
u3_disk*
u3_disk_init(c3_c* pax_c, u3_disk_cb cb_u);
/* u3_disk_etch(): serialize an event for persistence. RETAIN [eve]
*/
size_t
u3_disk_etch(u3_disk* log_u,
u3_noun eve,
c3_l mug_l,
c3_y** out_y);
/* u3_disk_sift(): parse a persisted event buffer.
*/
c3_o
u3_disk_sift(u3_disk* log_u,
size_t len_i,
c3_y* dat_y,
c3_l* mug_l,
u3_noun* job);
/* u3_disk_info(): status info as $mass.
*/
u3_noun

View File

@ -149,34 +149,47 @@ _disk_commit_start(struct _cd_save* req_u)
_disk_commit_after_cb);
}
/* _disk_serialize_v1(): serialize events in format v1.
/* u3_disk_etch(): serialize an event for persistence. RETAIN [eve]
*/
static c3_w
_disk_serialize_v1(u3_fact* tac_u, c3_y** out_y)
size_t
u3_disk_etch(u3_disk* log_u,
u3_noun eve,
c3_l mug_l,
c3_y** out_y)
{
size_t len_i;
c3_y* dat_y;
#ifdef DISK_TRACE_JAM
u3t_event_trace("king disk jam", 'B');
u3t_event_trace("disk etch", 'B');
#endif
// XX check version number in log_u
// XX needs api redesign to limit allocations
//
{
u3_atom mat = u3qe_jam(tac_u->job);
u3_atom mat = u3qe_jam(eve);
c3_w len_w = u3r_met(3, mat);
c3_y* dat_y = c3_malloc(4 + len_w);
dat_y[0] = tac_u->mug_l & 0xff;
dat_y[1] = (tac_u->mug_l >> 8) & 0xff;
dat_y[2] = (tac_u->mug_l >> 16) & 0xff;
dat_y[3] = (tac_u->mug_l >> 24) & 0xff;
len_i = 4 + len_w;
dat_y = c3_malloc(len_i);
dat_y[0] = mug_l & 0xff;
dat_y[1] = (mug_l >> 8) & 0xff;
dat_y[2] = (mug_l >> 16) & 0xff;
dat_y[3] = (mug_l >> 24) & 0xff;
u3r_bytes(0, len_w, dat_y + 4, mat);
u3z(mat);
}
#ifdef DISK_TRACE_JAM
u3t_event_trace("king disk jam", 'E');
u3t_event_trace("disk etch", 'E');
#endif
u3z(mat);
*out_y = dat_y;
return len_w + 4;
}
*out_y = dat_y;
return len_i;
}
/* _disk_batch(): create a write batch
@ -200,7 +213,8 @@ _disk_batch(u3_disk* log_u, c3_d len_d)
for ( c3_d i_d = 0ULL; i_d < len_d; ++i_d) {
c3_assert( (req_u->eve_d + i_d) == tac_u->eve_d );
req_u->siz_i[i_d] = _disk_serialize_v1(tac_u, &req_u->byt_y[i_d]);
req_u->siz_i[i_d] = u3_disk_etch(log_u, tac_u->job,
tac_u->mug_l, &req_u->byt_y[i_d]);
tac_u = tac_u->nex_u;
}
@ -358,6 +372,41 @@ _disk_read_done_cb(uv_timer_t* tim_u)
_disk_read_close(red_u);
}
/* u3_disk_sift(): parse a persisted event buffer.
*/
c3_o
u3_disk_sift(u3_disk* log_u,
size_t len_i,
c3_y* dat_y,
c3_l* mug_l,
u3_noun* job)
{
if ( 4 >= len_i ) {
return c3n;
}
#ifdef DISK_TRACE_CUE
u3t_event_trace("disk sift", 'B');
#endif
// XX check version in log_u
//
*mug_l = dat_y[0]
^ (dat_y[1] << 8)
^ (dat_y[2] << 16)
^ (dat_y[3] << 24);
// XX u3m_soft?
//
*job = u3ke_cue(u3i_bytes(len_i - 4, dat_y + 4));
#ifdef DISK_TRACE_CUE
u3t_event_trace("disk sift", 'E');
#endif
return c3y;
}
/* _disk_read_one_cb(): lmdb read callback, invoked for each event in order
*/
static c3_o
@ -367,29 +416,13 @@ _disk_read_one_cb(void* ptr_v, c3_d eve_d, size_t val_i, void* val_p)
u3_disk* log_u = red_u->log_u;
u3_fact* tac_u;
if ( 4 >= val_i ) {
return c3n;
}
{
u3_noun job;
c3_y* dat_y = val_p;
c3_l mug_l = dat_y[0]
^ (dat_y[1] << 8)
^ (dat_y[2] << 16)
^ (dat_y[3] << 24);
c3_l mug_l;
#ifdef DISK_TRACE_CUE
u3t_event_trace("king disk cue", 'B');
#endif
// XX u3m_soft?
//
job = u3ke_cue(u3i_bytes(val_i - 4, dat_y + 4));
#ifdef DISK_TRACE_CUE
u3t_event_trace("king disk cue", 'E');
#endif
if ( c3n == u3_disk_sift(log_u, val_i, (c3_y*)val_p, &mug_l, &job) ) {
return c3n;
}
tac_u = u3_fact_init(eve_d, mug_l, job);
}