vere: ports synchronous lmdb iterator

This commit is contained in:
Joe Bryan 2022-12-13 11:50:56 -05:00
parent 91f7818ab7
commit 99a8ccda7b
2 changed files with 135 additions and 0 deletions

View File

@ -6,6 +6,17 @@
/* lmdb api wrapper
*/
/* u3_lmdb_iter: event iterator
*/
typedef struct _u3_lmdb_walk {
MDB_txn* txn_u; // transaction handle
MDB_dbi mdb_u; // db handle
MDB_cursor* cur_u; // db cursor
c3_o red_o; // have we read from this yet?
c3_d nex_d; // next event number
c3_d las_d; // final event number, inclusive
} u3_lmdb_walk;
/* u3_lmdb_init(): open lmdb at [pax_c], mmap up to [siz_i].
*/
MDB_env*
@ -61,4 +72,22 @@
size_t val_i,
void* val_p);
/* u3_lmdb_walk_init(): initialize db iterator.
*/
c3_o
u3_lmdb_walk_init(MDB_env* env_u,
u3_lmdb_walk* itr_u,
c3_d nex_d,
c3_d las_d);
/* u3_lmdb_walk_next(): synchronously read next event from iterator.
*/
c3_o
u3_lmdb_walk_next(u3_lmdb_walk* itr_u, size_t* len_i, void** buf_v);
/* u3_lmdb_walk_done(): close iterator.
*/
void
u3_lmdb_walk_done(u3_lmdb_walk* itr_u);
#endif /* ifndef U3_VERE_DB_LMDB_H */

View File

@ -225,6 +225,112 @@ u3_lmdb_gulf(MDB_env* env_u, c3_d* low_d, c3_d* hig_d)
}
}
/* u3_lmdb_walk_init(): initialize db iterator.
*/
c3_o
u3_lmdb_walk_init(MDB_env* env_u,
u3_lmdb_walk* itr_u,
c3_d nex_d,
c3_d las_d)
{
// XX assumes little-endian
//
MDB_val key_u = { .mv_size = sizeof(c3_d), .mv_data = &nex_d };
MDB_val val_u;
c3_w ops_w, ret_w;
itr_u->red_o = c3n;
itr_u->nex_d = nex_d;
itr_u->las_d = las_d;
// create a read-only transaction.
//
ops_w = MDB_RDONLY;
if ( (ret_w = mdb_txn_begin(env_u, 0, ops_w, &itr_u->txn_u)) ) {
mdb_logerror(stderr, ret_w, "lmdb: read txn_begin fail");
return c3n;
}
// open the database in the transaction
//
ops_w = MDB_CREATE | MDB_INTEGERKEY;
if ( (ret_w = mdb_dbi_open(itr_u->txn_u, "EVENTS", ops_w, &itr_u->mdb_u)) ) {
mdb_logerror(stderr, ret_w, "lmdb: read: dbi_open fail");
// XX confirm
//
mdb_txn_abort(itr_u->txn_u);
return c3n;
}
// creates a cursor to iterate over keys starting at [eve_d]
//
if ( (ret_w = mdb_cursor_open(itr_u->txn_u, itr_u->mdb_u, &itr_u->cur_u)) ) {
mdb_logerror(stderr, ret_w, "lmdb: read: cursor_open fail");
// XX confirm
//
mdb_txn_abort(itr_u->txn_u);
return c3n;
}
// set the cursor to the position of [eve_d]
//
ops_w = MDB_SET_KEY;
if ( (ret_w = mdb_cursor_get(itr_u->cur_u, &key_u, &val_u, ops_w)) ) {
mdb_logerror(stderr, ret_w, "lmdb: read: initial cursor_get failed");
fprintf(stderr, " at %" PRIu64 "\r\n", nex_d);
mdb_cursor_close(itr_u->cur_u);
// XX confirm
//
mdb_txn_abort(itr_u->txn_u);
return c3n;
}
return c3y;
}
/* u3_lmdb_walk_next(): synchronously read next event from iterator.
*/
c3_o
u3_lmdb_walk_next(u3_lmdb_walk* itr_u, size_t* len_i, void** buf_v)
{
MDB_val key_u, val_u;
c3_w ret_w, ops_w;
c3_assert( itr_u->nex_d <= itr_u->las_d );
ops_w = ( c3y == itr_u->red_o ) ? MDB_NEXT : MDB_GET_CURRENT;
if ( (ret_w = mdb_cursor_get(itr_u->cur_u, &key_u, &val_u, ops_w)) ) {
mdb_logerror(stderr, ret_w, "lmdb: walk error");
return c3n;
}
// sanity check: ensure contiguous event numbers
//
if ( *(c3_d*)key_u.mv_data != itr_u->nex_d ) {
fprintf(stderr, "lmdb: read gap: expected %" PRIu64
", received %" PRIu64 "\r\n",
itr_u->nex_d,
*(c3_d*)key_u.mv_data);
return c3n;
}
*len_i = val_u.mv_size;
*buf_v = val_u.mv_data;
itr_u->nex_d++;
itr_u->red_o = c3y;
return c3y;
}
/* u3_lmdb_walk_done(): close iterator.
*/
void
u3_lmdb_walk_done(u3_lmdb_walk* itr_u)
{
mdb_cursor_close(itr_u->cur_u);
mdb_txn_abort(itr_u->txn_u);
}
/* u3_lmdb_read(): read [len_d] events starting at [eve_d].
*/
c3_o