diff --git a/pkg/urbit/include/vere/db/lmdb.h b/pkg/urbit/include/vere/db/lmdb.h index 573a68fa44..a3ca6a414b 100644 --- a/pkg/urbit/include/vere/db/lmdb.h +++ b/pkg/urbit/include/vere/db/lmdb.h @@ -6,6 +6,17 @@ /* lmdb api wrapper */ + /* u3_lmdb_iter: event iterator + */ + typedef struct _u3_lmdb_walk { + MDB_txn* txn_u; // transaction handle + MDB_dbi mdb_u; // db handle + MDB_cursor* cur_u; // db cursor + c3_o red_o; // have we read from this yet? + c3_d nex_d; // next event number + c3_d las_d; // final event number, inclusive + } u3_lmdb_walk; + /* u3_lmdb_init(): open lmdb at [pax_c], mmap up to [siz_i]. */ MDB_env* @@ -61,4 +72,22 @@ size_t val_i, void* val_p); + /* u3_lmdb_walk_init(): initialize db iterator. + */ + c3_o + u3_lmdb_walk_init(MDB_env* env_u, + u3_lmdb_walk* itr_u, + c3_d nex_d, + c3_d las_d); + + /* u3_lmdb_walk_next(): synchronously read next event from iterator. + */ + c3_o + u3_lmdb_walk_next(u3_lmdb_walk* itr_u, size_t* len_i, void** buf_v); + + /* u3_lmdb_walk_done(): close iterator. + */ + void + u3_lmdb_walk_done(u3_lmdb_walk* itr_u); + #endif /* ifndef U3_VERE_DB_LMDB_H */ diff --git a/pkg/urbit/vere/db/lmdb.c b/pkg/urbit/vere/db/lmdb.c index 443f11525d..c95cf38c1a 100644 --- a/pkg/urbit/vere/db/lmdb.c +++ b/pkg/urbit/vere/db/lmdb.c @@ -225,6 +225,112 @@ u3_lmdb_gulf(MDB_env* env_u, c3_d* low_d, c3_d* hig_d) } } +/* u3_lmdb_walk_init(): initialize db iterator. +*/ +c3_o +u3_lmdb_walk_init(MDB_env* env_u, + u3_lmdb_walk* itr_u, + c3_d nex_d, + c3_d las_d) +{ + // XX assumes little-endian + // + MDB_val key_u = { .mv_size = sizeof(c3_d), .mv_data = &nex_d }; + MDB_val val_u; + c3_w ops_w, ret_w; + + itr_u->red_o = c3n; + itr_u->nex_d = nex_d; + itr_u->las_d = las_d; + + // create a read-only transaction. + // + ops_w = MDB_RDONLY; + if ( (ret_w = mdb_txn_begin(env_u, 0, ops_w, &itr_u->txn_u)) ) { + mdb_logerror(stderr, ret_w, "lmdb: read txn_begin fail"); + return c3n; + } + // open the database in the transaction + // + ops_w = MDB_CREATE | MDB_INTEGERKEY; + if ( (ret_w = mdb_dbi_open(itr_u->txn_u, "EVENTS", ops_w, &itr_u->mdb_u)) ) { + mdb_logerror(stderr, ret_w, "lmdb: read: dbi_open fail"); + // XX confirm + // + mdb_txn_abort(itr_u->txn_u); + return c3n; + } + + // creates a cursor to iterate over keys starting at [eve_d] + // + if ( (ret_w = mdb_cursor_open(itr_u->txn_u, itr_u->mdb_u, &itr_u->cur_u)) ) { + mdb_logerror(stderr, ret_w, "lmdb: read: cursor_open fail"); + // XX confirm + // + mdb_txn_abort(itr_u->txn_u); + return c3n; + } + + // set the cursor to the position of [eve_d] + // + ops_w = MDB_SET_KEY; + if ( (ret_w = mdb_cursor_get(itr_u->cur_u, &key_u, &val_u, ops_w)) ) { + mdb_logerror(stderr, ret_w, "lmdb: read: initial cursor_get failed"); + fprintf(stderr, " at %" PRIu64 "\r\n", nex_d); + mdb_cursor_close(itr_u->cur_u); + // XX confirm + // + mdb_txn_abort(itr_u->txn_u); + return c3n; + } + + return c3y; +} + +/* u3_lmdb_walk_next(): synchronously read next event from iterator. +*/ +c3_o +u3_lmdb_walk_next(u3_lmdb_walk* itr_u, size_t* len_i, void** buf_v) +{ + MDB_val key_u, val_u; + c3_w ret_w, ops_w; + + c3_assert( itr_u->nex_d <= itr_u->las_d ); + + ops_w = ( c3y == itr_u->red_o ) ? MDB_NEXT : MDB_GET_CURRENT; + if ( (ret_w = mdb_cursor_get(itr_u->cur_u, &key_u, &val_u, ops_w)) ) { + mdb_logerror(stderr, ret_w, "lmdb: walk error"); + return c3n; + } + + // sanity check: ensure contiguous event numbers + // + if ( *(c3_d*)key_u.mv_data != itr_u->nex_d ) { + fprintf(stderr, "lmdb: read gap: expected %" PRIu64 + ", received %" PRIu64 "\r\n", + itr_u->nex_d, + *(c3_d*)key_u.mv_data); + return c3n; + } + + *len_i = val_u.mv_size; + *buf_v = val_u.mv_data; + + itr_u->nex_d++; + itr_u->red_o = c3y; + + return c3y; +} + +/* u3_lmdb_walk_done(): close iterator. +*/ +void +u3_lmdb_walk_done(u3_lmdb_walk* itr_u) +{ + mdb_cursor_close(itr_u->cur_u); + mdb_txn_abort(itr_u->txn_u); +} + /* u3_lmdb_read(): read [len_d] events starting at [eve_d]. */ c3_o