From 5d71e0a804df75a19094287d06f13cf23c828432 Mon Sep 17 00:00:00 2001 From: Joe Bryan Date: Wed, 17 Jun 2020 22:22:38 -0700 Subject: [PATCH] vere: properly dispose disk resources on exit --- pkg/urbit/include/vere/vere.h | 43 +++++++++- pkg/urbit/vere/disk.c | 149 ++++++++++++++++++++++++++++------ pkg/urbit/vere/foil.c | 12 +-- pkg/urbit/vere/ward.c | 55 +++++++++++++ 4 files changed, 223 insertions(+), 36 deletions(-) diff --git a/pkg/urbit/include/vere/vere.h b/pkg/urbit/include/vere/vere.h index 9abfe5ad6e..16b7e3dc4f 100644 --- a/pkg/urbit/include/vere/vere.h +++ b/pkg/urbit/include/vere/vere.h @@ -459,6 +459,22 @@ struct _u3_writ* ext_u; // queue exit } u3_lord; + /* u3_read: event log read request + */ + typedef struct _u3_read { + union { // read timer/handle + uv_timer_t tim_u; // + uv_handle_t had_u; // + }; // + c3_d eve_d; // first event + c3_d len_d; // read stride + struct _u3_fact* ent_u; // response entry + struct _u3_fact* ext_u; // response exit + struct _u3_read* nex_u; // next read + struct _u3_read* pre_u; // previous read + struct _u3_disk* log_u; // disk backpointer + } u3_read; + /* u3_disk_cb: u3_disk callbacks */ typedef struct _u3_disk_cb { @@ -480,8 +496,11 @@ c3_d sen_d; // commit requested c3_d dun_d; // committed u3_disk_cb cb_u; // callbacks - uv_timer_t tim_u; // read timer - uv_work_t ted_u; // write thread + u3_read* red_u; // read requests + union { // write thread/request + uv_work_t ted_u; // + uv_req_t req_u; // + }; // c3_o ted_o; // c3y == active u3_info put_u; // write queue } u3_disk; @@ -685,6 +704,26 @@ /** ward: common structure lifecycle **/ + /* u3_dent_init(): initialize file record. + */ + u3_dent* + u3_dent_init(const c3_c* nam_c); + + /* u3_dent_free(): dispose file record. + */ + void + u3_dent_free(u3_dent *det_u); + + /* u3_dire_init(): initialize directory record. + */ + u3_dire* + u3_dire_init(const c3_c* pax_c); + + /* u3_dire_free(): dispose directory record. + */ + void + u3_dire_free(u3_dire *dir_u); + /* u3_fact_init(): initialize completed event. */ u3_fact* diff --git a/pkg/urbit/vere/disk.c b/pkg/urbit/vere/disk.c index 4628b62638..66b9c26399 100644 --- a/pkg/urbit/vere/disk.c +++ b/pkg/urbit/vere/disk.c @@ -121,13 +121,16 @@ _disk_commit_done(struct _cd_save* req_u) static void _disk_commit_after_cb(uv_work_t* ted_u, c3_i sas_i) { - // XX UV_ECANCELED == sas_i? - // struct _cd_save* req_u = ted_u->data; - ted_u->data = 0; - req_u->log_u->ted_o = c3n; - _disk_commit_done(req_u); + if ( UV_ECANCELED == sas_i ) { + _disk_free_save(req_u); + } + else { + ted_u->data = 0; + req_u->log_u->ted_o = c3n; + _disk_commit_done(req_u); + } } /* _disk_commit_cb(): off the main thread, write event-batch. @@ -286,20 +289,73 @@ u3_disk_boot_save(u3_disk* log_u) _disk_commit(log_u); } +static void +_disk_read_free(u3_read* red_u) +{ + // free facts (if the read failed) + // + { + u3_fact* tac_u = red_u->ext_u; + u3_fact* nex_u; + + while ( tac_u ) { + nex_u = tac_u->nex_u; + u3_fact_free(tac_u); + tac_u = nex_u; + } + } + + c3_free(red_u); +} + +/* _disk_read_close_cb(): +*/ +static void +_disk_read_close_cb(uv_handle_t* had_u) +{ + u3_read* red_u = had_u->data; + _disk_read_free(red_u); +} + +static void +_disk_read_close(u3_read* red_u) +{ + u3_disk* log_u = red_u->log_u; + + // unlink request + // + { + if ( red_u->pre_u ) { + red_u->pre_u->nex_u = red_u->nex_u; + } + else { + log_u->red_u = red_u->nex_u; + } + + if ( red_u->nex_u ) { + red_u->nex_u->pre_u = red_u->pre_u; + } + } + + uv_close(&red_u->had_u, _disk_read_close_cb); +} + /* _disk_read_done_cb(): finalize read, invoke callback with response. */ static void _disk_read_done_cb(uv_timer_t* tim_u) { - struct _cd_read* red_u = tim_u->data; + u3_read* red_u = tim_u->data; u3_disk* log_u = red_u->log_u; u3_info pay_u = { .ent_u = red_u->ent_u, .ext_u = red_u->ext_u }; c3_assert( red_u->ent_u ); c3_assert( red_u->ext_u ); + red_u->ent_u = 0; + red_u->ext_u = 0; log_u->cb_u.read_done_f(log_u->cb_u.vod_p, pay_u); - uv_close((uv_handle_t*)tim_u, (uv_close_cb)free); + _disk_read_close(red_u); } /* _disk_read_one_cb(): lmdb read callback, invoked for each event in order @@ -307,7 +363,7 @@ _disk_read_done_cb(uv_timer_t* tim_u) static c3_o _disk_read_one_cb(void* vod_p, c3_d eve_d, size_t val_i, void* val_p) { - struct _cd_read* red_u = vod_p; + u3_read* red_u = vod_p; u3_disk* log_u = red_u->log_u; u3_fact* tac_u; @@ -358,7 +414,7 @@ _disk_read_one_cb(void* vod_p, c3_d eve_d, size_t val_i, void* val_p) static void _disk_read_start_cb(uv_timer_t* tim_u) { - struct _cd_read* red_u = tim_u->data; + u3_read* red_u = tim_u->data; u3_disk* log_u = red_u->log_u; // read events synchronously @@ -370,16 +426,13 @@ _disk_read_start_cb(uv_timer_t* tim_u) _disk_read_one_cb) ) { log_u->cb_u.read_bail_f(log_u->cb_u.vod_p, red_u->eve_d); - // XX dispose all facts in red_u - // - c3_free(red_u); - tim_u->data = 0; - return; + _disk_read_close(red_u); } - // finish the read asynchronously // - uv_timer_start(&red_u->tim_u, _disk_read_done_cb, 0, 0); + else { + uv_timer_start(&red_u->tim_u, _disk_read_done_cb, 0, 0); + } } /* u3_disk_read(): read [len_d] events starting at [eve_d]. @@ -387,18 +440,21 @@ _disk_read_start_cb(uv_timer_t* tim_u) void u3_disk_read(u3_disk* log_u, c3_d eve_d, c3_d len_d) { - // XX enqueue [red_u] in [log_u] for cancellation - // - struct _cd_read* red_u = c3_malloc(sizeof(*red_u)); + u3_read* red_u = c3_malloc(sizeof(*red_u)); red_u->log_u = log_u; red_u->eve_d = eve_d; red_u->len_d = len_d; red_u->ent_u = red_u->ext_u = 0; + red_u->pre_u = 0; + red_u->nex_u = log_u->red_u; + + if ( log_u->red_u ) { + log_u->red_u->pre_u = red_u; + } + log_u->red_u = red_u; // perform the read asynchronously // - // XX queue reads for cancelation - // uv_timer_init(u3L, &red_u->tim_u); red_u->tim_u.data = red_u; @@ -542,9 +598,52 @@ u3_disk_read_meta(u3_disk* log_u, void u3_disk_exit(u3_disk* log_u) { - u3_lmdb_exit(log_u->mdb_u); - // XX dispose + // cancel all outstanding reads // + { + u3_read* red_u = log_u->red_u; + + while ( red_u ) { + _disk_read_close(red_u); + red_u = red_u->nex_u; + } + } + + // cancel write thread + // + if ( c3y == log_u->ted_o ) { + c3_i sas_i; + + do { + sas_i = uv_cancel(&log_u->req_u); + fprintf(stderr, "disk canceling\r\n"); + } + while ( UV_EBUSY == sas_i ); + } + + // close database + // + u3_lmdb_exit(log_u->mdb_u); + + // dispose planned writes + // + + { + u3_fact* tac_u = log_u->put_u.ext_u; + u3_fact* nex_u; + + while ( tac_u ) { + nex_u = tac_u->nex_u; + u3_fact_free(tac_u); + tac_u = nex_u; + } + } + + u3_dire_free(log_u->dir_u); + u3_dire_free(log_u->urb_u); + u3_dire_free(log_u->com_u); + + c3_free(log_u); } /* u3_disk_init(): load or create pier directories and event log. @@ -556,8 +655,8 @@ u3_disk_init(c3_c* pax_c, u3_disk_cb cb_u) log_u->liv_o = c3n; log_u->ted_o = c3n; log_u->cb_u = cb_u; - - // uv_timer_init(u3L, &log_u->tim_u); + log_u->red_u = 0; + log_u->put_u.ent_u = log_u->put_u.ext_u = 0; // create/load pier directory // diff --git a/pkg/urbit/vere/foil.c b/pkg/urbit/vere/foil.c index b6647eb409..9c78cd2969 100644 --- a/pkg/urbit/vere/foil.c +++ b/pkg/urbit/vere/foil.c @@ -121,21 +121,15 @@ u3_foil_folder(const c3_c* pax_c) } } } - dir_u = c3_malloc(sizeof *dir_u); - dir_u->all_u = 0; - dir_u->pax_c = c3_malloc(1 + strlen(pax_c)); - strcpy(dir_u->pax_c, pax_c); + + dir_u = u3_dire_init(pax_c); } /* create entries for all files */ while ( UV_EOF != uv_fs_scandir_next(&ruq_u, &den_u) ) { if ( UV_DIRENT_FILE == den_u.type ) { - u3_dent* det_u = c3_malloc(sizeof(*det_u)); - - det_u->nam_c = c3_malloc(1 + strlen(den_u.name)); - strcpy(det_u->nam_c, den_u.name); - + u3_dent* det_u = u3_dent_init(den_u.name); det_u->nex_u = dir_u->all_u; dir_u->all_u = det_u; } diff --git a/pkg/urbit/vere/ward.c b/pkg/urbit/vere/ward.c index 8b4850a7c8..59eef7dab4 100644 --- a/pkg/urbit/vere/ward.c +++ b/pkg/urbit/vere/ward.c @@ -25,6 +25,61 @@ // or allocated in one and freed in another // +/* u3_dent_init(): initialize file record. +*/ +u3_dent* +u3_dent_init(const c3_c* nam_c) +{ + u3_dent *det_u = c3_malloc(sizeof(*det_u)); + det_u->nex_u = 0; + det_u->nam_c = c3_malloc(1 + strlen(nam_c)); + strcpy(det_u->nam_c, nam_c); + + return det_u; +} + +/* u3_dent_free(): dispose file record. +*/ +void +u3_dent_free(u3_dent *det_u) +{ + c3_free(det_u->nam_c); + c3_free(det_u); +} + +/* u3_dire_init(): initialize directory record. +*/ +u3_dire* +u3_dire_init(const c3_c* pax_c) +{ + u3_dire *dir_u = c3_malloc(sizeof *dir_u); + dir_u->all_u = 0; + dir_u->pax_c = c3_malloc(1 + strlen(pax_c)); + strcpy(dir_u->pax_c, pax_c); + + return dir_u; +} + +/* u3_dire_free(): dispose directory record. +*/ +void +u3_dire_free(u3_dire *dir_u) +{ + { + u3_dent *det_u = dir_u->all_u; + u3_dent *nex_u; + + while ( det_u ) { + nex_u = det_u->nex_u; + u3_dent_free(det_u); + det_u = nex_u; + } + } + + c3_free(dir_u->pax_c); + c3_free(dir_u); +} + /* u3_fact_init(): initialize completed event. */ u3_fact*