diff --git a/pkg/urbit/include/vere/vere.h b/pkg/urbit/include/vere/vere.h index 94fcf1bade..710f27710b 100644 --- a/pkg/urbit/include/vere/vere.h +++ b/pkg/urbit/include/vere/vere.h @@ -1294,11 +1294,29 @@ c3_o u3_lmdb_get_latest_event_number(MDB_env* environment, c3_d* event_number); + /* u3_lmdb_write_request: opaque write request structures + */ + struct u3_lmdb_write_request; + + /* u3_lmdb_build_write_reuqest(): allocates and builds a write request + ** + ** Reads count sequential writs starting with event_u and creates a + ** single write request for all those writs. + */ + struct u3_lmdb_write_request* + u3_lmdb_build_write_request(u3_writ* event_u, c3_d count); + + /* u3_lmdb_free_write_request(): frees a write requst + */ + void u3_lmdb_free_write_request(struct u3_lmdb_write_request* request); + /* u3_lmdb_write_event(): Persists an event to the database */ void u3_lmdb_write_event(MDB_env* environment, - u3_writ* event_u, - void (*on_complete)(c3_o success, u3_writ*)); + u3_pier* pir_u, + struct u3_lmdb_write_request* request_u, + void (*on_complete)(c3_o success, u3_pier*, + c3_d, c3_d)); /* u3_lmdb_read_events(): Reads events back from the database ** diff --git a/pkg/urbit/vere/lmdb.c b/pkg/urbit/vere/lmdb.c index d66e268990..15f0807bcf 100644 --- a/pkg/urbit/vere/lmdb.c +++ b/pkg/urbit/vere/lmdb.c @@ -174,6 +174,67 @@ c3_o _perform_get_on_database_noun(MDB_txn* transaction_u, return c3y; } +/* u3_lmdb_write_request: Events to be written together +*/ +struct u3_lmdb_write_request { + // The event number of the first event. + c3_d first_event; + + // The number of events in this write request. Nonzero. + c3_d event_count; + + // An array of serialized event datas. The array size is |event_count|. We + // perform the event serialization on the main thread so we can read the loom + // and write into a malloced structure for the worker thread. + void** malloced_event_data; + + // An array of sizes of serialized event datas. We keep track of this for the + // database write. + size_t* malloced_event_data_size; +}; + +/* u3_lmdb_build_write_request(): Allocates and builds a write request +*/ +struct u3_lmdb_write_request* +u3_lmdb_build_write_request(u3_writ* event_u, c3_d count) +{ + struct u3_lmdb_write_request* request = + c3_malloc(sizeof(struct u3_lmdb_write_request)); + request->first_event = event_u->evt_d; + request->event_count = count; + request->malloced_event_data = c3_malloc(sizeof(void*) * count); + request->malloced_event_data_size = c3_malloc(sizeof(size_t) * count); + + for (c3_d i = 0; i < count; ++i) { + // Sanity check that the events in u3_writ are in order. + c3_assert(event_u->evt_d == (request->first_event + i)); + + // Serialize the jammed event log entry into a malloced buffer we can send + // to the other thread. + c3_w siz_w = u3r_met(3, event_u->mat); + c3_y* data_u = c3_calloc(siz_w); + u3r_bytes(0, siz_w, data_u, event_u->mat); + + request->malloced_event_data[i] = data_u; + request->malloced_event_data_size[i] = siz_w; + + event_u = event_u->nex_u; + } + + return request; +} + +/* u3_lmdb_free_write_request(): Frees a write request +*/ +void u3_lmdb_free_write_request(struct u3_lmdb_write_request* request) { + for (c3_d i = 0; i < request->event_count; ++i) + free(request->malloced_event_data[i]); + + free(request->malloced_event_data); + free(request->malloced_event_data_size); + free(request); +} + /* _write_request_data: callback struct for u3_lmdb_write_event() */ struct _write_request_data { @@ -181,27 +242,17 @@ struct _write_request_data { // the transactions and handles opened from it are explicitly not. MDB_env* environment; - // The original event. Not to be accessed from the worker thread; only used - // in the callback executed on the main loop thread. - u3_writ* event; + // The pier that we're writing for. + u3_pier* pir_u; - // The event number from event separated out so we can access it on the other - // thread. - c3_d event_number; - - // The event serialized out of the loom into a malloced structure accessible - // from the worker thread. - void* malloced_event_data; - - // The size of the malloced_event_data. We keep track of this for the - // database write. - size_t malloced_event_data_size; + // The encapsulated request. This may contain multiple event writes. + struct u3_lmdb_write_request* request; // Whether the write completed successfully. c3_o success; // Called on main loop thread on completion. - void (*on_complete)(c3_o, u3_writ*); + void (*on_complete)(c3_o, u3_pier*, c3_d, c3_d); }; /* _u3_lmdb_write_event_cb(): Implementation of u3_lmdb_write_event() @@ -235,23 +286,42 @@ static void _u3_lmdb_write_event_cb(uv_work_t* req) { return; } - // TODO: We need to detect the database being full, making the database - // maxsize larger, and then retrying this transaction. - // - c3_o success = _perform_put_on_database_raw( - transaction_u, - database_u, - MDB_NOOVERWRITE, - &(data->event_number), - sizeof(c3_d), - data->malloced_event_data, - data->malloced_event_data_size); + struct u3_lmdb_write_request* request = data->request; + for (c3_d i = 0; i < request->event_count; ++i) { + c3_d event_number = request->first_event + i; + + c3_o success = _perform_put_on_database_raw( + transaction_u, + database_u, + MDB_NOOVERWRITE, + &event_number, + sizeof(c3_d), + request->malloced_event_data[i], + request->malloced_event_data_size[i]); + + if (success == c3n) { + u3l_log("lmdb: failed to write event %" PRIu64 "\n", event_number); + mdb_txn_abort(transaction_u); + data->success = c3n; + return; + } + } ret_w = mdb_txn_commit(transaction_u); if (0 != ret_w) { - u3l_log("lmdb: failed to commit event %" PRIu64 ": %s\n", - data->event_number, - mdb_strerror(ret_w)); + if ( request->event_count == 1 ) { + u3l_log("lmdb: failed to commit event %" PRIu64 ": %s\n", + request->first_event, + mdb_strerror(ret_w)); + } else { + c3_d through = request->first_event + request->event_count - 1ULL; + u3l_log("lmdb: failed to commit events %" PRIu64 " through %" PRIu64 + ": %s\n", + request->first_event, + through, + mdb_strerror(ret_w)); + } + data->success = c3n; return; } @@ -266,9 +336,12 @@ static void _u3_lmdb_write_event_cb(uv_work_t* req) { static void _u3_lmdb_write_event_after_cb(uv_work_t* req, int status) { struct _write_request_data* data = req->data; - data->on_complete(data->success, data->event); + data->on_complete(data->success, + data->pir_u, + data->request->first_event, + data->request->event_count); - free(data->malloced_event_data); + u3_lmdb_free_write_request(data->request); free(data); free(req); } @@ -278,27 +351,17 @@ static void _u3_lmdb_write_event_after_cb(uv_work_t* req, int status) { ** This writes all the passed in events along with log metadata updates to the ** database as a single transaction on a worker thread. Once the transaction ** is completed, it calls the passed in callback on the main loop thread. -** -** TODO: Make this take multiple events in one commit once we have this -** working one at a time. */ void u3_lmdb_write_event(MDB_env* environment, - u3_writ* event_u, - void (*on_complete)(c3_o, u3_writ*)) + u3_pier* pir_u, + struct u3_lmdb_write_request* request_u, + void (*on_complete)(c3_o, u3_pier*, c3_d, c3_d)) { - // Serialize the jammed $work into a malloced buffer we can send to the other - // thread. - c3_w siz_w = u3r_met(3, event_u->mat); - c3_y* data_u = c3_calloc(siz_w); - u3r_bytes(0, siz_w, data_u, event_u->mat); - // Structure to pass to the worker thread. struct _write_request_data* data = c3_malloc(sizeof(struct _write_request_data)); data->environment = environment; - data->event = event_u; - data->event_number = event_u->evt_d; - data->malloced_event_data = data_u; - data->malloced_event_data_size = siz_w; + data->pir_u = pir_u; + data->request = request_u; data->on_complete = on_complete; data->success = c3n; diff --git a/pkg/urbit/vere/pier.c b/pkg/urbit/vere/pier.c index bf8de5c84c..730e47f095 100644 --- a/pkg/urbit/vere/pier.c +++ b/pkg/urbit/vere/pier.c @@ -90,9 +90,11 @@ _pier_db_shutdown(u3_pier* pir_u) /* _pier_db_commit_complete(): commit complete. */ static void -_pier_db_commit_complete(c3_o success, u3_writ* wit_u) +_pier_db_commit_complete(c3_o success, + u3_pier* pir_u, + c3_d first_event_d, + c3_d event_count_d) { - u3_pier* pir_u = wit_u->pir_u; u3_disk* log_u = pir_u->log_u; if (success == c3n) { @@ -101,15 +103,20 @@ _pier_db_commit_complete(c3_o success, u3_writ* wit_u) } #ifdef VERBOSE_EVENTS - u3l_log("pier: (%" PRIu64 "): db commit completed\r\n", wit_u->evt_d); + if (event_count_d != 1) { + u3l_log("pier: (%" PRIu64 "-%" PRIu64 "): db commit: complete\r\n", + first_event_d, first_event_d + event_count_d - 1ULL); + } else { + u3l_log("pier: (%" PRIu64 "): db commit: complete\r\n", first_event_d); + } #endif /* advance commit counter */ { - c3_assert(wit_u->evt_d == log_u->moc_d); - c3_assert(wit_u->evt_d == (1ULL + log_u->com_d)); - log_u->com_d += 1ULL; + c3_assert((first_event_d + event_count_d - 1ULL) == log_u->moc_d); + c3_assert(first_event_d == (1ULL + log_u->com_d)); + log_u->com_d += event_count_d; } _pier_loop_resume(pir_u); @@ -118,28 +125,36 @@ _pier_db_commit_complete(c3_o success, u3_writ* wit_u) /* _pier_db_commit_request(): start commit. */ static void -_pier_db_commit_request(u3_writ* wit_u) +_pier_db_commit_request(u3_pier* pir_u, + struct u3_lmdb_write_request* request_u, + c3_d first_event_d, + c3_d count_d) { - u3_pier* pir_u = wit_u->pir_u; u3_disk* log_u = pir_u->log_u; #ifdef VERBOSE_EVENTS - u3l_log("pier: (%" PRIu64 "): commit: request\r\n", wit_u->evt_d); + if (count_d != 1) { + u3l_log("pier: (%" PRIu64 "-%" PRIu64 "): db commit: request\r\n", + first_event_d, first_event_d + count_d - 1ULL); + } else { + u3l_log("pier: (%" PRIu64 "): db commit: request\r\n", first_event_d); + } #endif /* put it in the database */ { u3_lmdb_write_event(log_u->db_u, - wit_u, + pir_u, + request_u, _pier_db_commit_complete); } /* advance commit-request counter */ { - c3_assert(wit_u->evt_d == (1ULL + log_u->moc_d)); - log_u->moc_d += 1ULL; + c3_assert(first_event_d == (1ULL + log_u->moc_d)); + log_u->moc_d += count_d; } } @@ -1658,8 +1673,15 @@ start: (wit_u->evt_d == (1 + log_u->moc_d)) && (wit_u->evt_d == (1 + log_u->com_d)) ) { - // TODO(erg): This is the place where we build up things into a queue. - _pier_db_commit_request(wit_u); + c3_d count = 1 + (god_u->dun_d - wit_u->evt_d); + struct u3_lmdb_write_request* request = + u3_lmdb_build_write_request(wit_u, count); + c3_assert(request != 0); + + _pier_db_commit_request(pir_u, + request, + wit_u->evt_d, + count); act_o = c3y; }