vere: properly dispose disk resources on exit

This commit is contained in:
Joe Bryan 2020-06-17 22:22:38 -07:00
parent df7cd2a673
commit 5d71e0a804
4 changed files with 223 additions and 36 deletions

View File

@ -459,6 +459,22 @@
struct _u3_writ* ext_u; // queue exit
} u3_lord;
/* u3_read: event log read request
*/
typedef struct _u3_read {
union { // read timer/handle
uv_timer_t tim_u; //
uv_handle_t had_u; //
}; //
c3_d eve_d; // first event
c3_d len_d; // read stride
struct _u3_fact* ent_u; // response entry
struct _u3_fact* ext_u; // response exit
struct _u3_read* nex_u; // next read
struct _u3_read* pre_u; // previous read
struct _u3_disk* log_u; // disk backpointer
} u3_read;
/* u3_disk_cb: u3_disk callbacks
*/
typedef struct _u3_disk_cb {
@ -480,8 +496,11 @@
c3_d sen_d; // commit requested
c3_d dun_d; // committed
u3_disk_cb cb_u; // callbacks
uv_timer_t tim_u; // read timer
uv_work_t ted_u; // write thread
u3_read* red_u; // read requests
union { // write thread/request
uv_work_t ted_u; //
uv_req_t req_u; //
}; //
c3_o ted_o; // c3y == active
u3_info put_u; // write queue
} u3_disk;
@ -685,6 +704,26 @@
/** ward: common structure lifecycle
**/
/* u3_dent_init(): initialize file record.
*/
u3_dent*
u3_dent_init(const c3_c* nam_c);
/* u3_dent_free(): dispose file record.
*/
void
u3_dent_free(u3_dent *det_u);
/* u3_dire_init(): initialize directory record.
*/
u3_dire*
u3_dire_init(const c3_c* pax_c);
/* u3_dire_free(): dispose directory record.
*/
void
u3_dire_free(u3_dire *dir_u);
/* u3_fact_init(): initialize completed event.
*/
u3_fact*

View File

@ -121,14 +121,17 @@ _disk_commit_done(struct _cd_save* req_u)
static void
_disk_commit_after_cb(uv_work_t* ted_u, c3_i sas_i)
{
// XX UV_ECANCELED == sas_i?
//
struct _cd_save* req_u = ted_u->data;
if ( UV_ECANCELED == sas_i ) {
_disk_free_save(req_u);
}
else {
ted_u->data = 0;
req_u->log_u->ted_o = c3n;
_disk_commit_done(req_u);
}
}
/* _disk_commit_cb(): off the main thread, write event-batch.
*/
@ -286,20 +289,73 @@ u3_disk_boot_save(u3_disk* log_u)
_disk_commit(log_u);
}
static void
_disk_read_free(u3_read* red_u)
{
// free facts (if the read failed)
//
{
u3_fact* tac_u = red_u->ext_u;
u3_fact* nex_u;
while ( tac_u ) {
nex_u = tac_u->nex_u;
u3_fact_free(tac_u);
tac_u = nex_u;
}
}
c3_free(red_u);
}
/* _disk_read_close_cb():
*/
static void
_disk_read_close_cb(uv_handle_t* had_u)
{
u3_read* red_u = had_u->data;
_disk_read_free(red_u);
}
static void
_disk_read_close(u3_read* red_u)
{
u3_disk* log_u = red_u->log_u;
// unlink request
//
{
if ( red_u->pre_u ) {
red_u->pre_u->nex_u = red_u->nex_u;
}
else {
log_u->red_u = red_u->nex_u;
}
if ( red_u->nex_u ) {
red_u->nex_u->pre_u = red_u->pre_u;
}
}
uv_close(&red_u->had_u, _disk_read_close_cb);
}
/* _disk_read_done_cb(): finalize read, invoke callback with response.
*/
static void
_disk_read_done_cb(uv_timer_t* tim_u)
{
struct _cd_read* red_u = tim_u->data;
u3_read* red_u = tim_u->data;
u3_disk* log_u = red_u->log_u;
u3_info pay_u = { .ent_u = red_u->ent_u, .ext_u = red_u->ext_u };
c3_assert( red_u->ent_u );
c3_assert( red_u->ext_u );
red_u->ent_u = 0;
red_u->ext_u = 0;
log_u->cb_u.read_done_f(log_u->cb_u.vod_p, pay_u);
uv_close((uv_handle_t*)tim_u, (uv_close_cb)free);
_disk_read_close(red_u);
}
/* _disk_read_one_cb(): lmdb read callback, invoked for each event in order
@ -307,7 +363,7 @@ _disk_read_done_cb(uv_timer_t* tim_u)
static c3_o
_disk_read_one_cb(void* vod_p, c3_d eve_d, size_t val_i, void* val_p)
{
struct _cd_read* red_u = vod_p;
u3_read* red_u = vod_p;
u3_disk* log_u = red_u->log_u;
u3_fact* tac_u;
@ -358,7 +414,7 @@ _disk_read_one_cb(void* vod_p, c3_d eve_d, size_t val_i, void* val_p)
static void
_disk_read_start_cb(uv_timer_t* tim_u)
{
struct _cd_read* red_u = tim_u->data;
u3_read* red_u = tim_u->data;
u3_disk* log_u = red_u->log_u;
// read events synchronously
@ -370,35 +426,35 @@ _disk_read_start_cb(uv_timer_t* tim_u)
_disk_read_one_cb) )
{
log_u->cb_u.read_bail_f(log_u->cb_u.vod_p, red_u->eve_d);
// XX dispose all facts in red_u
//
c3_free(red_u);
tim_u->data = 0;
return;
_disk_read_close(red_u);
}
// finish the read asynchronously
//
else {
uv_timer_start(&red_u->tim_u, _disk_read_done_cb, 0, 0);
}
}
/* u3_disk_read(): read [len_d] events starting at [eve_d].
*/
void
u3_disk_read(u3_disk* log_u, c3_d eve_d, c3_d len_d)
{
// XX enqueue [red_u] in [log_u] for cancellation
//
struct _cd_read* red_u = c3_malloc(sizeof(*red_u));
u3_read* red_u = c3_malloc(sizeof(*red_u));
red_u->log_u = log_u;
red_u->eve_d = eve_d;
red_u->len_d = len_d;
red_u->ent_u = red_u->ext_u = 0;
red_u->pre_u = 0;
red_u->nex_u = log_u->red_u;
if ( log_u->red_u ) {
log_u->red_u->pre_u = red_u;
}
log_u->red_u = red_u;
// perform the read asynchronously
//
// XX queue reads for cancelation
//
uv_timer_init(u3L, &red_u->tim_u);
red_u->tim_u.data = red_u;
@ -542,9 +598,52 @@ u3_disk_read_meta(u3_disk* log_u,
void
u3_disk_exit(u3_disk* log_u)
{
u3_lmdb_exit(log_u->mdb_u);
// XX dispose
// cancel all outstanding reads
//
{
u3_read* red_u = log_u->red_u;
while ( red_u ) {
_disk_read_close(red_u);
red_u = red_u->nex_u;
}
}
// cancel write thread
//
if ( c3y == log_u->ted_o ) {
c3_i sas_i;
do {
sas_i = uv_cancel(&log_u->req_u);
fprintf(stderr, "disk canceling\r\n");
}
while ( UV_EBUSY == sas_i );
}
// close database
//
u3_lmdb_exit(log_u->mdb_u);
// dispose planned writes
//
{
u3_fact* tac_u = log_u->put_u.ext_u;
u3_fact* nex_u;
while ( tac_u ) {
nex_u = tac_u->nex_u;
u3_fact_free(tac_u);
tac_u = nex_u;
}
}
u3_dire_free(log_u->dir_u);
u3_dire_free(log_u->urb_u);
u3_dire_free(log_u->com_u);
c3_free(log_u);
}
/* u3_disk_init(): load or create pier directories and event log.
@ -556,8 +655,8 @@ u3_disk_init(c3_c* pax_c, u3_disk_cb cb_u)
log_u->liv_o = c3n;
log_u->ted_o = c3n;
log_u->cb_u = cb_u;
// uv_timer_init(u3L, &log_u->tim_u);
log_u->red_u = 0;
log_u->put_u.ent_u = log_u->put_u.ext_u = 0;
// create/load pier directory
//

View File

@ -121,21 +121,15 @@ u3_foil_folder(const c3_c* pax_c)
}
}
}
dir_u = c3_malloc(sizeof *dir_u);
dir_u->all_u = 0;
dir_u->pax_c = c3_malloc(1 + strlen(pax_c));
strcpy(dir_u->pax_c, pax_c);
dir_u = u3_dire_init(pax_c);
}
/* create entries for all files
*/
while ( UV_EOF != uv_fs_scandir_next(&ruq_u, &den_u) ) {
if ( UV_DIRENT_FILE == den_u.type ) {
u3_dent* det_u = c3_malloc(sizeof(*det_u));
det_u->nam_c = c3_malloc(1 + strlen(den_u.name));
strcpy(det_u->nam_c, den_u.name);
u3_dent* det_u = u3_dent_init(den_u.name);
det_u->nex_u = dir_u->all_u;
dir_u->all_u = det_u;
}

View File

@ -25,6 +25,61 @@
// or allocated in one and freed in another
//
/* u3_dent_init(): initialize file record.
*/
u3_dent*
u3_dent_init(const c3_c* nam_c)
{
u3_dent *det_u = c3_malloc(sizeof(*det_u));
det_u->nex_u = 0;
det_u->nam_c = c3_malloc(1 + strlen(nam_c));
strcpy(det_u->nam_c, nam_c);
return det_u;
}
/* u3_dent_free(): dispose file record.
*/
void
u3_dent_free(u3_dent *det_u)
{
c3_free(det_u->nam_c);
c3_free(det_u);
}
/* u3_dire_init(): initialize directory record.
*/
u3_dire*
u3_dire_init(const c3_c* pax_c)
{
u3_dire *dir_u = c3_malloc(sizeof *dir_u);
dir_u->all_u = 0;
dir_u->pax_c = c3_malloc(1 + strlen(pax_c));
strcpy(dir_u->pax_c, pax_c);
return dir_u;
}
/* u3_dire_free(): dispose directory record.
*/
void
u3_dire_free(u3_dire *dir_u)
{
{
u3_dent *det_u = dir_u->all_u;
u3_dent *nex_u;
while ( det_u ) {
nex_u = det_u->nex_u;
u3_dent_free(det_u);
det_u = nex_u;
}
}
c3_free(dir_u->pax_c);
c3_free(dir_u);
}
/* u3_fact_init(): initialize completed event.
*/
u3_fact*