vere: move scry api into pier.c, interleave events

Scries were being prioritized over events, in the IPC queue. If scry
requests came in faster than we could process them (as is easily the
case for ames forward requests), this would cause scries to completely
clog the IPC queue, preventing any events from being processed at all.

The short-term solution implemented here is to simply alternate between
scry requests and events when building/sending a work batch.

To accomplish this, we separate scry requests into their own queue. We
keep this in pier.c, and pass them on to lord.c interleaved with regular
events in _pier_work_send.

The interleaving of regular events (as opposed to doing scries with the
highest priority) complicates the situation around auto-filled scry
cases (for scrying at "the latest timestamp").
To ensure we're always scrying at a sane latest timestamp, we keep a
queue of u3_pico, instead of fully-qualified scry requests. Where
necessary, lord.c fills in the missing path elements right before
sending it over the pipe.
This commit is contained in:
Fang 2020-08-05 01:24:01 +02:00
parent 71a0e61c2e
commit b5a9a7d538
No known key found for this signature in database
GPG Key ID: EB035760C1BBA972
4 changed files with 232 additions and 103 deletions

View File

@ -381,6 +381,36 @@
*/
typedef void (*u3_peek_cb)(void*, u3_noun);
/* u3_pico_type: kinds of proto-peek
*/
typedef enum {
u3_pico_full = 0,
u3_pico_mine = 1,
u3_pico_last = 2
} u3_pico_type;
/* u3_pico: proto-peek
*/
typedef struct _u3_pico {
struct _u3_pico* nex_u; // next in queue
void* ptr_v; // context
u3_peek_cb fun_f; // callback
u3_noun gan; // leakset
u3_pico_type typ_e; // type-tagged
union { //
u3_noun ful; // full: /care/beam
struct { // mine:
c3_m car_m; // care
u3_noun pax; // /desk/case/path
} min_u; //
struct { // last:
c3_m car_m; // care
u3_atom des; // desk
u3_noun pax; // /path
} las_u;
};
} u3_pico;
/* u3_peek: namespace read request
*/
typedef struct _u3_peek {
@ -603,6 +633,10 @@
u3_play* pay_u; // recompute
u3_work* wok_u; // work
};
struct {
u3_pico* ent_u;
u3_pico* ext_u;
} pec_u;
// XX remove
c3_s por_s; // UDP port
u3_save* sav_u; // autosave
@ -958,35 +992,11 @@
void
u3_lord_play(u3_lord* god_u, u3_info fon_u);
/* u3_lord_peek(): read namespace.
/* u3_lord_peek_pico(): read namespace, injecting what's missing.
*/
void
u3_lord_peek(u3_lord* god_u,
u3_noun gan,
u3_noun ful,
void* ptr_v,
u3_peek_cb fun_f);
/* u3_lord_peek_mine(): read namespace, injecting ship.
*/
void
u3_lord_peek_mine(u3_lord* god_u,
u3_noun gan,
c3_m car_m,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f);
/* u3_lord_peek_last(): read namespace, injecting ship and case.
*/
void
u3_lord_peek_last(u3_lord* god_u,
u3_noun gan,
c3_m car_m,
u3_atom des,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f);
u3_lord_peek_pico(u3_lord* god_u,
u3_pico* pic_u);
/** Filesystem (new api).
**/
@ -1199,6 +1209,38 @@
void
u3_newt_mojo_stop(u3_mojo* moj_u, u3_moor_bail bal_f);
/** Pier scries.
**/
/* u3_pier_peek(): read namespace.
*/
void
u3_pier_peek(u3_pier* pir_u,
u3_noun gan,
u3_noun ful,
void* ptr_v,
u3_peek_cb fun_f);
/* u3_pier_peek_mine(): read namespace, injecting ship.
*/
void
u3_pier_peek_mine(u3_pier* pir_u,
u3_noun gan,
c3_m car_m,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f);
/* u3_pier_peek_last(): read namespace, injecting ship and case.
*/
void
u3_pier_peek_last(u3_pier* pir_u,
u3_noun gan,
c3_m car_m,
u3_atom des,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f);
/** Pier control.
**/
/* u3_pier_exit(): trigger a gentle shutdown.

View File

@ -831,7 +831,7 @@ _ames_recv_cb(uv_udp_t* wax_u,
u3dc("scot", 'p', u3i_chubs(2, rec_d)),
u3i_string("forward-lane"),
u3_nul);
u3_lord_peek_last(sam_u->pir_u->god_u, u3_nul, c3_s2('a', 'x'),
u3_pier_peek_last(sam_u->pir_u, u3_nul, c3_s2('a', 'x'),
u3_nul, pax, pac_u, _ames_lane_scry_cb);
}
}
@ -1143,7 +1143,7 @@ u3_ames_io_init(u3_pier* pir_u)
// scry the protocol version out of arvo
//
u3_lord_peek_last(pir_u->god_u, u3_nul, c3_s2('a', 'x'), u3_nul,
u3_pier_peek_last(pir_u, u3_nul, c3_s2('a', 'x'), u3_nul,
u3nt(u3i_string("protocol"), u3i_string("version"), u3_nul),
sam_u, _ames_prot_scry_cb);

View File

@ -803,92 +803,53 @@ _lord_writ_plan(u3_lord* god_u, u3_writ* wit_u)
_lord_writ_send(god_u, wit_u);
}
/* u3_lord_peek(): read namespace.
/* u3_lord_peek_pico(): read namespace, injecting what's missing.
*/
void
u3_lord_peek(u3_lord* god_u,
u3_noun gan,
u3_noun ful,
void* ptr_v,
u3_peek_cb fun_f)
u3_lord_peek_pico(u3_lord* god_u,
u3_pico* pic_u)
{
u3_writ* wit_u = _lord_writ_new(god_u);
wit_u->typ_e = u3_writ_peek;
wit_u->pek_u = c3_calloc(sizeof(*wit_u->pek_u));
wit_u->pek_u->ptr_v = ptr_v;
wit_u->pek_u->fun_f = fun_f;
wit_u->pek_u->ptr_v = pic_u->ptr_v;
wit_u->pek_u->fun_f = pic_u->fun_f;
wit_u->pek_u->now = u3_time_in_tv(&wit_u->tim_u);
wit_u->pek_u->gan = gan;
wit_u->pek_u->ful = ful;
wit_u->pek_u->gan = pic_u->gan;
// XX cache check
// construct the full scry path
//
switch ( pic_u->typ_e ) {
default: c3_assert(0);
_lord_writ_plan(god_u, wit_u);
}
case u3_pico_full: {
wit_u->pek_u->ful = pic_u->ful;
} break;
/* u3_lord_peek_mine(): read namespace, injecting ship (our).
*/
void
u3_lord_peek_mine(u3_lord* god_u,
u3_noun gan,
c3_m car_m,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f)
{
u3_writ* wit_u = _lord_writ_new(god_u);
wit_u->typ_e = u3_writ_peek;
wit_u->pek_u = c3_calloc(sizeof(*wit_u->pek_u));
wit_u->pek_u->ptr_v = ptr_v;
wit_u->pek_u->fun_f = fun_f;
wit_u->pek_u->now = u3_time_in_tv(&wit_u->tim_u);
wit_u->pek_u->gan = gan;
case u3_pico_mine: {
// XX cache
//
u3_pier* pir_u = god_u->cb_u.ptr_v; // XX do better
u3_noun our = u3dc("scot", 'p', u3i_chubs(2, pir_u->who_d));
wit_u->pek_u->ful = u3nt(pic_u->min_u.car_m, our, pic_u->min_u.pax);
} break;
{
// XX cache
//
u3_pier* pir_u = god_u->cb_u.ptr_v; // XX do better
u3_noun our = u3dc("scot", 'p', u3i_chubs(2, pir_u->who_d));
wit_u->pek_u->ful = u3nt(car_m, our, pax);
case u3_pico_last: {
// XX cache
//
u3_pier* pir_u = god_u->cb_u.ptr_v; // XX do better
u3_noun our = u3dc("scot", 'p', u3i_chubs(2, pir_u->who_d));
u3_noun cas = u3dc("scot", c3__da, u3k(wit_u->pek_u->now));
wit_u->pek_u->ful = u3nc(pic_u->las_u.car_m,
u3nq(our,
pic_u->las_u.des,
cas,
pic_u->las_u.pax));
} break;
}
// XX cache check
//
_lord_writ_plan(god_u, wit_u);
}
/* u3_lord_peek_last(): read namespace, injecting ship (our) and case (now).
*/
void
u3_lord_peek_last(u3_lord* god_u,
u3_noun gan,
c3_m car_m,
u3_atom des,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f)
{
u3_writ* wit_u = _lord_writ_new(god_u);
wit_u->typ_e = u3_writ_peek;
wit_u->pek_u = c3_calloc(sizeof(*wit_u->pek_u));
wit_u->pek_u->ptr_v = ptr_v;
wit_u->pek_u->fun_f = fun_f;
wit_u->pek_u->now = u3_time_in_tv(&wit_u->tim_u);
wit_u->pek_u->gan = gan;
{
// XX cache
//
u3_pier* pir_u = god_u->cb_u.ptr_v; // XX do better
u3_noun our = u3dc("scot", 'p', u3i_chubs(2, pir_u->who_d));
u3_noun cas = u3dc("scot", c3__da, u3k(wit_u->pek_u->now));
wit_u->pek_u->ful = u3nc(car_m, u3nq(our, des, cas, pax));
}
// NB, won't be cached, result shouldn't be
// XX cache check, unless last
//
_lord_writ_plan(god_u, wit_u);
}

View File

@ -26,6 +26,44 @@
#undef VERBOSE_PIER
/* _pier_peek_new(): add a new u3_pico to the peek queue
*/
static u3_pico*
_pier_peek_new(u3_pier* pir_u)
{
u3_pico* pic_u = c3_calloc(sizeof(*pic_u));
if (!pir_u->pec_u.ent_u) {
c3_assert( !pir_u->pec_u.ext_u );
pir_u->pec_u.ent_u = pir_u->pec_u.ext_u = pic_u;
}
else {
pir_u->pec_u.ent_u->nex_u = pic_u;
pir_u->pec_u.ent_u = pic_u;
}
return pic_u;
}
/* _pier_peek_next(): pop u3_pico off of peek queue
*/
static u3_pico*
_pier_peek_next(u3_pier* pir_u)
{
u3_pico* pic_u = pir_u->pec_u.ext_u;
if (pic_u) {
pir_u->pec_u.ext_u = pic_u->nex_u;
if (!pir_u->pec_u.ext_u) {
pir_u->pec_u.ent_u = 0;
}
pic_u->nex_u = 0;
}
return pic_u;
}
/* _pier_work_send(): send new events for processing
*/
static void
@ -60,13 +98,32 @@ _pier_work_send(u3_work* wok_u)
{
u3_ovum* egg_u;
u3_noun ovo;
u3_pico* pic_u;
while ( len_w-- && car_u && (egg_u = u3_auto_next(car_u, &ovo)) ) {
while ( len_w-- && car_u && (egg_u = u3_auto_next(car_u, &ovo)) ) {
u3_lord_work(god_u, egg_u, ovo);
// queue events depth first
//
car_u = egg_u->car_u;
// interleave scry requests
//
if ( len_w
&& (pic_u = _pier_peek_next(pir_u)) )
{
len_w--;
u3_lord_peek_pico(god_u, pic_u);
c3_free(pic_u);
}
}
// if there's room left in the batch, fill it up with remaining scries
//
while ( len_w-- && (pic_u = _pier_peek_next(pir_u)) )
{
u3_lord_peek_pico(god_u, pic_u);
c3_free(pic_u);
}
}
}
@ -353,6 +410,75 @@ u3_pier_spin(u3_pier* pir_u)
}
}
/* u3_pier_peek(): read namespace.
*/
void
u3_pier_peek(u3_pier* pir_u,
u3_noun gan,
u3_noun ful,
void* ptr_v,
u3_peek_cb fun_f)
{
u3_pico* pic_u = _pier_peek_new(pir_u);
pic_u->ptr_v = ptr_v;
pic_u->fun_f = fun_f;
pic_u->gan = gan;
//
pic_u->typ_e = u3_pico_full;
pic_u->ful = ful;
u3_pier_spin(pir_u);
}
/* u3_pier_peek_mine(): read namespace, injecting ship.
*/
void
u3_pier_peek_mine(u3_pier* pir_u,
u3_noun gan,
c3_m car_m,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f)
{
u3_pico* pic_u = _pier_peek_new(pir_u);
pic_u->ptr_v = ptr_v;
pic_u->fun_f = fun_f;
pic_u->gan = gan;
//
pic_u->typ_e = u3_pico_mine;
pic_u->min_u.car_m = car_m;
pic_u->min_u.pax = pax;
u3_pier_spin(pir_u);
}
/* u3_pier_peek_last(): read namespace, injecting ship and case.
*/
void
u3_pier_peek_last(u3_pier* pir_u,
u3_noun gan,
c3_m car_m,
u3_atom des,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f)
{
u3_pico* pic_u = _pier_peek_new(pir_u);
pic_u->ptr_v = ptr_v;
pic_u->fun_f = fun_f;
pic_u->gan = gan;
//
pic_u->typ_e = u3_pico_last;
pic_u->las_u.car_m = car_m;
pic_u->las_u.des = des;
pic_u->las_u.pax = pax;
u3_pier_spin(pir_u);
}
/* _pier_work_init(): begin processing new events
*/
static void
@ -1024,7 +1150,7 @@ _pier_on_lord_live(void* ptr_v)
// run the requested scry, jam to disk, then exit
//
u3l_log("pier: scry\n");
u3_lord_peek_last(god_u, u3_nul, u3k(car), u3k(dek), u3k(pax),
u3_pier_peek_last(pir_u, u3_nul, u3k(car), u3k(dek), u3k(pax),
pir_u, _pier_on_scry_done);
}
u3z(pex);