Merge pull request #1259 from urbit/cc-batched-writes

Batch event log commits
This commit is contained in:
Elliot Glaysher 2019-04-29 16:21:53 -07:00 committed by GitHub
commit 191c907995
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 165 additions and 62 deletions

View File

@ -1294,11 +1294,29 @@
c3_o u3_lmdb_get_latest_event_number(MDB_env* environment,
c3_d* event_number);
/* u3_lmdb_write_request: opaque write request structures
*/
struct u3_lmdb_write_request;
/* u3_lmdb_build_write_reuqest(): allocates and builds a write request
**
** Reads count sequential writs starting with event_u and creates a
** single write request for all those writs.
*/
struct u3_lmdb_write_request*
u3_lmdb_build_write_request(u3_writ* event_u, c3_d count);
/* u3_lmdb_free_write_request(): frees a write requst
*/
void u3_lmdb_free_write_request(struct u3_lmdb_write_request* request);
/* u3_lmdb_write_event(): Persists an event to the database
*/
void u3_lmdb_write_event(MDB_env* environment,
u3_writ* event_u,
void (*on_complete)(c3_o success, u3_writ*));
u3_pier* pir_u,
struct u3_lmdb_write_request* request_u,
void (*on_complete)(c3_o success, u3_pier*,
c3_d, c3_d));
/* u3_lmdb_read_events(): Reads events back from the database
**

View File

@ -174,6 +174,67 @@ c3_o _perform_get_on_database_noun(MDB_txn* transaction_u,
return c3y;
}
/* u3_lmdb_write_request: Events to be written together
*/
struct u3_lmdb_write_request {
// The event number of the first event.
c3_d first_event;
// The number of events in this write request. Nonzero.
c3_d event_count;
// An array of serialized event datas. The array size is |event_count|. We
// perform the event serialization on the main thread so we can read the loom
// and write into a malloced structure for the worker thread.
void** malloced_event_data;
// An array of sizes of serialized event datas. We keep track of this for the
// database write.
size_t* malloced_event_data_size;
};
/* u3_lmdb_build_write_request(): Allocates and builds a write request
*/
struct u3_lmdb_write_request*
u3_lmdb_build_write_request(u3_writ* event_u, c3_d count)
{
struct u3_lmdb_write_request* request =
c3_malloc(sizeof(struct u3_lmdb_write_request));
request->first_event = event_u->evt_d;
request->event_count = count;
request->malloced_event_data = c3_malloc(sizeof(void*) * count);
request->malloced_event_data_size = c3_malloc(sizeof(size_t) * count);
for (c3_d i = 0; i < count; ++i) {
// Sanity check that the events in u3_writ are in order.
c3_assert(event_u->evt_d == (request->first_event + i));
// Serialize the jammed event log entry into a malloced buffer we can send
// to the other thread.
c3_w siz_w = u3r_met(3, event_u->mat);
c3_y* data_u = c3_calloc(siz_w);
u3r_bytes(0, siz_w, data_u, event_u->mat);
request->malloced_event_data[i] = data_u;
request->malloced_event_data_size[i] = siz_w;
event_u = event_u->nex_u;
}
return request;
}
/* u3_lmdb_free_write_request(): Frees a write request
*/
void u3_lmdb_free_write_request(struct u3_lmdb_write_request* request) {
for (c3_d i = 0; i < request->event_count; ++i)
free(request->malloced_event_data[i]);
free(request->malloced_event_data);
free(request->malloced_event_data_size);
free(request);
}
/* _write_request_data: callback struct for u3_lmdb_write_event()
*/
struct _write_request_data {
@ -181,27 +242,17 @@ struct _write_request_data {
// the transactions and handles opened from it are explicitly not.
MDB_env* environment;
// The original event. Not to be accessed from the worker thread; only used
// in the callback executed on the main loop thread.
u3_writ* event;
// The pier that we're writing for.
u3_pier* pir_u;
// The event number from event separated out so we can access it on the other
// thread.
c3_d event_number;
// The event serialized out of the loom into a malloced structure accessible
// from the worker thread.
void* malloced_event_data;
// The size of the malloced_event_data. We keep track of this for the
// database write.
size_t malloced_event_data_size;
// The encapsulated request. This may contain multiple event writes.
struct u3_lmdb_write_request* request;
// Whether the write completed successfully.
c3_o success;
// Called on main loop thread on completion.
void (*on_complete)(c3_o, u3_writ*);
void (*on_complete)(c3_o, u3_pier*, c3_d, c3_d);
};
/* _u3_lmdb_write_event_cb(): Implementation of u3_lmdb_write_event()
@ -235,23 +286,42 @@ static void _u3_lmdb_write_event_cb(uv_work_t* req) {
return;
}
// TODO: We need to detect the database being full, making the database
// maxsize larger, and then retrying this transaction.
//
c3_o success = _perform_put_on_database_raw(
transaction_u,
database_u,
MDB_NOOVERWRITE,
&(data->event_number),
sizeof(c3_d),
data->malloced_event_data,
data->malloced_event_data_size);
struct u3_lmdb_write_request* request = data->request;
for (c3_d i = 0; i < request->event_count; ++i) {
c3_d event_number = request->first_event + i;
c3_o success = _perform_put_on_database_raw(
transaction_u,
database_u,
MDB_NOOVERWRITE,
&event_number,
sizeof(c3_d),
request->malloced_event_data[i],
request->malloced_event_data_size[i]);
if (success == c3n) {
u3l_log("lmdb: failed to write event %" PRIu64 "\n", event_number);
mdb_txn_abort(transaction_u);
data->success = c3n;
return;
}
}
ret_w = mdb_txn_commit(transaction_u);
if (0 != ret_w) {
u3l_log("lmdb: failed to commit event %" PRIu64 ": %s\n",
data->event_number,
mdb_strerror(ret_w));
if ( request->event_count == 1 ) {
u3l_log("lmdb: failed to commit event %" PRIu64 ": %s\n",
request->first_event,
mdb_strerror(ret_w));
} else {
c3_d through = request->first_event + request->event_count - 1ULL;
u3l_log("lmdb: failed to commit events %" PRIu64 " through %" PRIu64
": %s\n",
request->first_event,
through,
mdb_strerror(ret_w));
}
data->success = c3n;
return;
}
@ -266,9 +336,12 @@ static void _u3_lmdb_write_event_cb(uv_work_t* req) {
static void _u3_lmdb_write_event_after_cb(uv_work_t* req, int status) {
struct _write_request_data* data = req->data;
data->on_complete(data->success, data->event);
data->on_complete(data->success,
data->pir_u,
data->request->first_event,
data->request->event_count);
free(data->malloced_event_data);
u3_lmdb_free_write_request(data->request);
free(data);
free(req);
}
@ -278,27 +351,17 @@ static void _u3_lmdb_write_event_after_cb(uv_work_t* req, int status) {
** This writes all the passed in events along with log metadata updates to the
** database as a single transaction on a worker thread. Once the transaction
** is completed, it calls the passed in callback on the main loop thread.
**
** TODO: Make this take multiple events in one commit once we have this
** working one at a time.
*/
void u3_lmdb_write_event(MDB_env* environment,
u3_writ* event_u,
void (*on_complete)(c3_o, u3_writ*))
u3_pier* pir_u,
struct u3_lmdb_write_request* request_u,
void (*on_complete)(c3_o, u3_pier*, c3_d, c3_d))
{
// Serialize the jammed $work into a malloced buffer we can send to the other
// thread.
c3_w siz_w = u3r_met(3, event_u->mat);
c3_y* data_u = c3_calloc(siz_w);
u3r_bytes(0, siz_w, data_u, event_u->mat);
// Structure to pass to the worker thread.
struct _write_request_data* data = c3_malloc(sizeof(struct _write_request_data));
data->environment = environment;
data->event = event_u;
data->event_number = event_u->evt_d;
data->malloced_event_data = data_u;
data->malloced_event_data_size = siz_w;
data->pir_u = pir_u;
data->request = request_u;
data->on_complete = on_complete;
data->success = c3n;

View File

@ -90,9 +90,11 @@ _pier_db_shutdown(u3_pier* pir_u)
/* _pier_db_commit_complete(): commit complete.
*/
static void
_pier_db_commit_complete(c3_o success, u3_writ* wit_u)
_pier_db_commit_complete(c3_o success,
u3_pier* pir_u,
c3_d first_event_d,
c3_d event_count_d)
{
u3_pier* pir_u = wit_u->pir_u;
u3_disk* log_u = pir_u->log_u;
if (success == c3n) {
@ -101,15 +103,20 @@ _pier_db_commit_complete(c3_o success, u3_writ* wit_u)
}
#ifdef VERBOSE_EVENTS
u3l_log("pier: (%" PRIu64 "): db commit completed\r\n", wit_u->evt_d);
if (event_count_d != 1) {
u3l_log("pier: (%" PRIu64 "-%" PRIu64 "): db commit: complete\r\n",
first_event_d, first_event_d + event_count_d - 1ULL);
} else {
u3l_log("pier: (%" PRIu64 "): db commit: complete\r\n", first_event_d);
}
#endif
/* advance commit counter
*/
{
c3_assert(wit_u->evt_d == log_u->moc_d);
c3_assert(wit_u->evt_d == (1ULL + log_u->com_d));
log_u->com_d += 1ULL;
c3_assert((first_event_d + event_count_d - 1ULL) == log_u->moc_d);
c3_assert(first_event_d == (1ULL + log_u->com_d));
log_u->com_d += event_count_d;
}
_pier_loop_resume(pir_u);
@ -118,28 +125,36 @@ _pier_db_commit_complete(c3_o success, u3_writ* wit_u)
/* _pier_db_commit_request(): start commit.
*/
static void
_pier_db_commit_request(u3_writ* wit_u)
_pier_db_commit_request(u3_pier* pir_u,
struct u3_lmdb_write_request* request_u,
c3_d first_event_d,
c3_d count_d)
{
u3_pier* pir_u = wit_u->pir_u;
u3_disk* log_u = pir_u->log_u;
#ifdef VERBOSE_EVENTS
u3l_log("pier: (%" PRIu64 "): commit: request\r\n", wit_u->evt_d);
if (count_d != 1) {
u3l_log("pier: (%" PRIu64 "-%" PRIu64 "): db commit: request\r\n",
first_event_d, first_event_d + count_d - 1ULL);
} else {
u3l_log("pier: (%" PRIu64 "): db commit: request\r\n", first_event_d);
}
#endif
/* put it in the database
*/
{
u3_lmdb_write_event(log_u->db_u,
wit_u,
pir_u,
request_u,
_pier_db_commit_complete);
}
/* advance commit-request counter
*/
{
c3_assert(wit_u->evt_d == (1ULL + log_u->moc_d));
log_u->moc_d += 1ULL;
c3_assert(first_event_d == (1ULL + log_u->moc_d));
log_u->moc_d += count_d;
}
}
@ -1658,8 +1673,15 @@ start:
(wit_u->evt_d == (1 + log_u->moc_d)) &&
(wit_u->evt_d == (1 + log_u->com_d)) )
{
// TODO(erg): This is the place where we build up things into a queue.
_pier_db_commit_request(wit_u);
c3_d count = 1 + (god_u->dun_d - wit_u->evt_d);
struct u3_lmdb_write_request* request =
u3_lmdb_build_write_request(wit_u, count);
c3_assert(request != 0);
_pier_db_commit_request(pir_u,
request,
wit_u->evt_d,
count);
act_o = c3y;
}