Theoretically multi-event writes.

I'm having trouble testing this, though. Every constructed write request
is still only 1 event.
This commit is contained in:
Elliot Glaysher 2019-04-29 10:06:03 -07:00
parent 51253dca5d
commit 8d691a63af
3 changed files with 81 additions and 69 deletions

View File

@ -1303,8 +1303,9 @@
/* u3_lmdb_write_event(): Persists an event to the database
*/
void u3_lmdb_write_event(MDB_env* environment,
u3_writ* event_u,
void (*on_complete)(c3_o success, u3_writ*));
u3_pier* pir_u,
struct u3_lmdb_write_request* request_u,
void (*on_complete)(c3_o success, u3_pier*, c3_d, c3_d));
/* u3_lmdb_read_events(): Reads events back from the database
**

View File

@ -174,7 +174,7 @@ c3_o _perform_get_on_database_noun(MDB_txn* transaction_u,
return c3y;
}
/* u3_lmdb_write_request: contains multiple data
/* u3_lmdb_write_request: contains multiple events to be written together
**
*/
struct u3_lmdb_write_request {
@ -245,27 +245,17 @@ struct _write_request_data {
// the transactions and handles opened from it are explicitly not.
MDB_env* environment;
// The original event. Not to be accessed from the worker thread; only used
// in the callback executed on the main loop thread.
u3_writ* event;
// The pier that we're writing for.
u3_pier* pir_u;
// The event number from event separated out so we can access it on the other
// thread.
c3_d event_number;
// The event serialized out of the loom into a malloced structure accessible
// from the worker thread.
void* malloced_event_data;
// The size of the malloced_event_data. We keep track of this for the
// database write.
size_t malloced_event_data_size;
// The encapsulated request. This may contain multiple event writes.
struct u3_lmdb_write_request* request;
// Whether the write completed successfully.
c3_o success;
// Called on main loop thread on completion.
void (*on_complete)(c3_o, u3_writ*);
void (*on_complete)(c3_o, u3_pier*, c3_d, c3_d);
};
/* _u3_lmdb_write_event_cb(): Implementation of u3_lmdb_write_event()
@ -299,23 +289,41 @@ static void _u3_lmdb_write_event_cb(uv_work_t* req) {
return;
}
// TODO: We need to detect the database being full, making the database
// maxsize larger, and then retrying this transaction.
//
c3_o success = _perform_put_on_database_raw(
transaction_u,
database_u,
MDB_NOOVERWRITE,
&(data->event_number),
sizeof(c3_d),
data->malloced_event_data,
data->malloced_event_data_size);
struct u3_lmdb_write_request* request = data->request;
for (c3_d i = 0; i < request->event_count; ++i) {
c3_d event_number = request->first_event + i;
c3_o success = _perform_put_on_database_raw(
transaction_u,
database_u,
MDB_NOOVERWRITE,
&event_number,
sizeof(c3_d),
request->malloced_event_data[i],
request->malloced_event_data_size[i]);
if (success == c3n) {
u3l_log("lmdb: failed to write event %" PRIu64 "\n", event_number);
mdb_txn_abort(transaction_u);
data->success = c3n;
return;
}
}
ret_w = mdb_txn_commit(transaction_u);
if (0 != ret_w) {
u3l_log("lmdb: failed to commit event %" PRIu64 ": %s\n",
data->event_number,
mdb_strerror(ret_w));
if ( request->event_count == 1 ) {
u3l_log("lmdb: failed to commit event %" PRIu64 ": %s\n",
request->first_event,
mdb_strerror(ret_w));
} else {
u3l_log("lmdb: failed to commit events %" PRIu64 " through %" PRIu64
": %s\n",
request->first_event,
request->first_event + request->event_count - 1ULL,
mdb_strerror(ret_w));
}
data->success = c3n;
return;
}
@ -330,9 +338,12 @@ static void _u3_lmdb_write_event_cb(uv_work_t* req) {
static void _u3_lmdb_write_event_after_cb(uv_work_t* req, int status) {
struct _write_request_data* data = req->data;
data->on_complete(data->success, data->event);
data->on_complete(data->success,
data->pir_u,
data->request->first_event,
data->request->event_count);
free(data->malloced_event_data);
u3_lmdb_free_write_request(data->request);
free(data);
free(req);
}
@ -342,27 +353,17 @@ static void _u3_lmdb_write_event_after_cb(uv_work_t* req, int status) {
** This writes all the passed in events along with log metadata updates to the
** database as a single transaction on a worker thread. Once the transaction
** is completed, it calls the passed in callback on the main loop thread.
**
** TODO: Make this take multiple events in one commit once we have this
** working one at a time.
*/
void u3_lmdb_write_event(MDB_env* environment,
u3_writ* event_u,
void (*on_complete)(c3_o, u3_writ*))
u3_pier* pir_u,
struct u3_lmdb_write_request* request_u,
void (*on_complete)(c3_o, u3_pier*, c3_d, c3_d))
{
// Serialize the jammed $work into a malloced buffer we can send to the other
// thread.
c3_w siz_w = u3r_met(3, event_u->mat);
c3_y* data_u = c3_calloc(siz_w);
u3r_bytes(0, siz_w, data_u, event_u->mat);
// Structure to pass to the worker thread.
struct _write_request_data* data = c3_malloc(sizeof(struct _write_request_data));
data->environment = environment;
data->event = event_u;
data->event_number = event_u->evt_d;
data->malloced_event_data = data_u;
data->malloced_event_data_size = siz_w;
data->pir_u = pir_u;
data->request = request_u;
data->on_complete = on_complete;
data->success = c3n;

View File

@ -89,9 +89,11 @@ _pier_db_shutdown(u3_pier* pir_u)
/* _pier_db_commit_complete(): commit complete.
*/
static void
_pier_db_commit_complete(c3_o success, u3_writ* wit_u)
_pier_db_commit_complete(c3_o success,
u3_pier* pir_u,
c3_d first_event_d,
c3_d event_count_d)
{
u3_pier* pir_u = wit_u->pir_u;
u3_disk* log_u = pir_u->log_u;
if (success == c3n) {
@ -100,14 +102,19 @@ _pier_db_commit_complete(c3_o success, u3_writ* wit_u)
}
#ifdef VERBOSE_EVENTS
u3l_log("pier: (%" PRIu64 "): db commit completed\r\n", wit_u->evt_d);
if (event_count_d != 1) {
u3l_log("pier: (%" PRIu64 "-%" PRIu64 "): db commit completed\r\n",
first_event_d, first_event_d + count_d - 1ULL);
} else {
u3l_log("pier: (%" PRIu64 "): db commit completed\r\n", first_event_d);
}
#endif
/* advance commit counter
*/
{
c3_assert(wit_u->evt_d == log_u->moc_d);
c3_assert(wit_u->evt_d == (1ULL + log_u->com_d));
c3_assert((first_event_d + event_count_d - 1ULL) == log_u->moc_d);
c3_assert(first_event_d == (1ULL + log_u->com_d));
log_u->com_d += 1ULL;
}
@ -117,34 +124,36 @@ _pier_db_commit_complete(c3_o success, u3_writ* wit_u)
/* _pier_db_commit_request(): start commit.
*/
static void
_pier_db_commit_request(u3_writ* wit_u)
_pier_db_commit_request(u3_pier* pir_u,
struct u3_lmdb_write_request* request_u,
c3_d first_event_d,
c3_d count_d)
{
u3_pier* pir_u = wit_u->pir_u;
u3_disk* log_u = pir_u->log_u;
#ifdef VERBOSE_EVENTS
u3l_log("pier: (%" PRIu64 "): commit: request\r\n", wit_u->evt_d);
if (count_d != 1) {
u3l_log("pier: (%" PRIu64 "-%" PRIu64 "): commit: request\r\n",
first_event_d, first_event_d + count_d - 1ULL);
} else {
u3l_log("pier: (%" PRIu64 "): commit: request\r\n", first_event_d);
}
#endif
// |wit_u| is the first of possibly several uncommitted events. We instead
// want to commit all of them at once.
//
//
/* put it in the database
*/
{
u3_lmdb_write_event(log_u->db_u,
wit_u,
pir_u,
request_u,
_pier_db_commit_complete);
}
/* advance commit-request counter
*/
{
c3_assert(wit_u->evt_d == (1ULL + log_u->moc_d));
log_u->moc_d += 1ULL;
c3_assert(first_event_d == (1ULL + log_u->moc_d));
log_u->moc_d += count_d;
}
}
@ -1661,10 +1670,11 @@ start:
struct u3_lmdb_write_request* request =
u3_lmdb_build_write_request(wit_u, count);
c3_assert(request != 0);
u3_lmdb_free_write_request(request);
// TODO(erg): This is the place where we build up things into a queue.
_pier_db_commit_request(wit_u);
_pier_db_commit_request(pir_u,
request,
wit_u->evt_d,
count);
act_o = c3y;
}