mirror of
synced 2024-12-14 17:41:33 +03:00
Merge pull request #3174 from urbit/m/stateless-forward
ames.c: stateless forwarding
This commit is contained in:
@ -26,6 +26,7 @@
# define c3__at c3_s2('a','t')
# define c3__atom c3_s4('a','t','o','m')
# define c3__auth c3_s4('a','u','t','h')
# define c3__ax c3_s2('a','x')
# define c3__axe c3_s3('a','x','e')
# define c3__axil c3_s4('a','x','i','l')
# define c3__axis c3_s4('a','x','i','s')
@ -381,6 +381,36 @@
typedef void (*u3_peek_cb)(void*, u3_noun);
/* u3_pico_type: kinds of proto-peek
typedef enum {
u3_pico_full = 0,
u3_pico_mine = 1,
u3_pico_last = 2
} u3_pico_type;
/* u3_pico: proto-peek
typedef struct _u3_pico {
struct _u3_pico* nex_u; // next in queue
void* ptr_v; // context
u3_peek_cb fun_f; // callback
u3_noun gan; // leakset
u3_pico_type typ_e; // type-tagged
union { //
u3_noun ful; // full: /care/beam
struct { // mine:
c3_m car_m; // care
u3_noun pax; // /desk/case/path
} min_u; //
struct { // last:
c3_m car_m; // care
u3_atom des; // desk
u3_noun pax; // /path
} las_u;
} u3_pico;
/* u3_peek: namespace read request
typedef struct _u3_peek {
@ -603,6 +633,10 @@
u3_play* pay_u; // recompute
u3_work* wok_u; // work
struct {
u3_pico* ent_u;
u3_pico* ext_u;
} pec_u;
// XX remove
c3_s por_s; // UDP port
u3_save* sav_u; // autosave
@ -759,6 +793,16 @@
u3_ovum_free(u3_ovum *egg_u);
/* u3_pico_init(): initialize a scry request struct
/* u3_pico_free(): dispose a scry request struct
u3_pico_free(u3_pico* pic_u);
/* u3_mcut_char(): measure/cut character.
@ -958,35 +1002,10 @@
u3_lord_play(u3_lord* god_u, u3_info fon_u);
/* u3_lord_peek(): read namespace.
/* u3_lord_peek(): read namespace, injecting what's missing.
u3_lord_peek(u3_lord* god_u,
u3_noun gan,
u3_noun ful,
void* ptr_v,
u3_peek_cb fun_f);
/* u3_lord_peek_mine(): read namespace, injecting ship.
u3_lord_peek_mine(u3_lord* god_u,
u3_noun gan,
c3_m car_m,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f);
/* u3_lord_peek_last(): read namespace, injecting ship and case.
u3_lord_peek_last(u3_lord* god_u,
u3_noun gan,
c3_m car_m,
u3_atom des,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f);
u3_lord_peek(u3_lord* god_u, u3_pico* pic_u);
/** Filesystem (new api).
@ -1199,6 +1218,38 @@
u3_newt_mojo_stop(u3_mojo* moj_u, u3_moor_bail bal_f);
/** Pier scries.
/* u3_pier_peek(): read namespace.
u3_pier_peek(u3_pier* pir_u,
u3_noun gan,
u3_noun ful,
void* ptr_v,
u3_peek_cb fun_f);
/* u3_pier_peek_mine(): read namespace, injecting ship.
u3_pier_peek_mine(u3_pier* pir_u,
u3_noun gan,
c3_m car_m,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f);
/* u3_pier_peek_last(): read namespace, injecting ship and case.
u3_pier_peek_last(u3_pier* pir_u,
u3_noun gan,
c3_m car_m,
u3_atom des,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f);
/** Pier control.
/* u3_pier_exit(): trigger a gentle shutdown.
@ -29,22 +29,59 @@
/* u3_ames: ames networking.
typedef struct _u3_ames { // packet network state
u3_auto car_u; // driver
u3_auto car_u; // driver
u3_pier* pir_u; // pier
union { //
uv_udp_t wax_u; //
uv_handle_t had_u; //
uv_udp_t wax_u; //
uv_handle_t had_u; //
}; //
c3_d who_d[2]; // identity
c3_o fak_o; // fake keys
c3_s por_s; // public IPv4 port
c3_c* dns_c; // domain XX multiple/fallback
c3_d dop_d; // drop count
c3_d fal_d; // crash count
c3_w imp_w[256]; // imperial IPs
time_t imp_t[256]; // imperial IP timestamps
c3_o imp_o[256]; // imperial print status
c3_c* dns_c; // domain XX multiple/fallback
c3_d dop_d; // drop count
c3_d fal_d; // crash count
c3_w imp_w[256]; // imperial IPs
time_t imp_t[256]; // imperial IP timestamps
c3_o imp_o[256]; // imperial print status
c3_o see_o; // can scry
c3_o fit_o; // filtering active
c3_y ver_y; // protocol version
c3_d vet_d; // version mismatches filtered
c3_d mut_d; // invalid mugs filtered
struct _u3_panc* pac_u; // packets pending forwards
c3_d foq_d; // forward queue size
c3_d fow_d; // forwarded count
c3_d fod_d; // forwards dropped count
} u3_ames;
/* u3_head: ames packet header
typedef struct _u3_head {
c3_y ver_y; // protocol version
c3_l mug_l; // truncated mug hash of u3_body
c3_y sac_y; // sender class
c3_y rac_y; // receiver class
c3_o enc_o; // encrypted?
} u3_head;
/* u3_body: ames packet body
typedef struct _u3_body {
c3_d sen_d[2]; // sender
c3_d rec_d[2]; // receiver
c3_w con_w; // jam size
c3_y* con_y; // (jam [origin content])
} u3_body;
/* u3_panc: deconstructed incoming packet
typedef struct _u3_panc {
u3_ames* sam_u; // ames backpointer
struct _u3_panc* pre_u; // previous packet
struct _u3_panc* nex_u; // next packet
u3_lane ore_u; // origin lane
u3_head hed_u; // header
u3_body bod_u; // body
} u3_panc;
/* _ames_alloc(): libuv buffer allocator.
static void
@ -70,6 +107,34 @@ _ames_pact_free(u3_pact* pac_u)
/* _ames_panc_free(): remove references, lose refcounts and free struct
static void
_ames_panc_free(u3_panc* pac_u) {
if (0 != pac_u->nex_u) {
pac_u->nex_u->pre_u = pac_u->pre_u;
if (0 != pac_u->pre_u) {
pac_u->pre_u->nex_u = pac_u->nex_u;
} else {
c3_assert(pac_u == pac_u->sam_u->pac_u);
pac_u->sam_u->pac_u = pac_u->nex_u;
/* _ca_mug_body(): truncated mug hash of bytes
static c3_l
_ca_mug_body(c3_w len_w, c3_y* byt_y)
// mask off ((1 << 20) - 1)
return u3r_mug_bytes(byt_y, len_w) & 0xfffff;
/* _ames_send_cb(): send callback.
static void
@ -232,6 +297,108 @@ u3_ames_encode_lane(u3_lane lan) {
return u3ke_jam(u3nt(c3__ipv4, u3i_words(1, &lan.pip_w), lan.por_s));
/* _ames_lane_from_sockaddr(): sockaddr_in to lane struct
static u3_lane
_ames_lane_from_sockaddr(struct sockaddr_in* add_u)
u3_lane lan_u;
lan_u.por_s = ntohs(add_u->sin_port);
lan_u.pip_w = ntohl(add_u->sin_addr.s_addr);
return lan_u;
/* _ames_serialize_packet(): u3_panc to atom, updating the origin lane if dop_o
** (retains pac_u)
static u3_noun
_ames_serialize_packet(u3_panc* pac_u, c3_o dop_o)
c3_y sen_y = 2 << pac_u->hed_u.sac_y;
c3_y rec_y = 2 << pac_u->hed_u.rac_y;
c3_o nal_o = c3n;
// update the body's lane, if desired
if (c3y == dop_o) {
// unpack (jam [(unit lane) body])
u3_noun lon, bod;
//NOTE we checked for cue safety in _ames_recv_cb
u3_noun old = u3ke_cue(u3i_bytes(pac_u->bod_u.con_w, pac_u->bod_u.con_y));
u3x_cell(old, &lon, &bod);
u3k(lon); u3k(bod);
// only replace the lane if it was ~
//NOTE this sets an opaque lane even in the "sender is galaxy" case,
// but that doesn't matter: ames.hoon ignores origin in that case,
// always using the appropriate galaxy lane instead.
if (u3_nul == lon) {
lon = u3nt(u3_nul, c3n, u3_ames_encode_lane(pac_u->ore_u));
nal_o = c3y;
u3_noun jam = u3ke_jam(u3nc(lon, bod));
pac_u->bod_u.con_w = u3r_met(3, jam);
pac_u->bod_u.con_y = c3_malloc(pac_u->bod_u.con_w);
u3r_bytes(0, pac_u->bod_u.con_w, pac_u->bod_u.con_y, jam);
else {
u3z(lon); u3z(bod);
// serialize the packet
u3_noun pac;
// start with the body
u3_body* bod_u = &pac_u->bod_u;
c3_y* pac_y = c3_malloc(4 + sen_y + rec_y + bod_u->con_w);
u3_atom sen = u3i_chubs(2, bod_u->sen_d);
u3_atom rec = u3i_chubs(2, bod_u->rec_d);
u3r_bytes(0, sen_y, pac_y + 4, sen);
u3r_bytes(0, rec_y, pac_y + 4 + sen_y, rec);
u3z(sen); u3z(rec);
memcpy(pac_y + 4 + sen_y + rec_y, bod_u->con_y, bod_u->con_w);
// if we updated the origin lane, we need to update the mug too
if (c3y == nal_o) {
pac_u->hed_u.mug_l = _ca_mug_body(sen_y + rec_y + bod_u->con_w,
pac_y + 4);
// now we can serialize the head
u3_head* hed_u = &pac_u->hed_u;
c3_w hed_w = hed_u->ver_y
| (hed_u->mug_l << 3)
| (hed_u->sac_y << 23)
| (hed_u->rac_y << 25)
| (hed_u->enc_o << 27);
// XX assumes little-endian
memcpy(pac_y, &hed_w, 4);
pac = u3i_bytes(4 + sen_y + rec_y + bod_u->con_w, pac_y);
return pac;
/* _ames_czar(): galaxy address resolution.
static void
@ -435,6 +602,77 @@ _ames_hear_bail(u3_ovum* egg_u, u3_noun lud)
/* _ames_put_packet(): add packet to queue, drop old packets on pressure
static void
_ames_put_packet(u3_ames* sam_u,
u3_noun msg,
u3_lane lan_u)
u3_noun wir = u3nc(c3__ames, u3_nul);
u3_noun cad = u3nt(c3__hear, u3nc(c3n, u3_ames_encode_lane(lan_u)), msg);
u3_ovum_init(0, c3__a, wir, cad)),
0, 0, _ames_hear_bail);
/* _ames_forward(): forward pac_u onto the (list lane) las, then free pac_u
static void
_ames_forward(u3_panc* pac_u, u3_noun las)
if ( 0 == (pac_u->sam_u->fow_d % 10000) ) {
u3l_log("ames: forwarded %" PRIu64 " total\n", pac_u->sam_u->fow_d);
u3_noun los = las;
u3_noun pac = _ames_serialize_packet(pac_u, c3y);
while (u3_nul != las) {
_ames_ef_send(pac_u->sam_u, u3k(u3h(las)), u3k(pac));
las = u3t(las);
u3z(los); u3z(pac);
/* _ames_lane_scry_cb(): learn lane to forward packet on
static void
_ames_lane_scry_cb(void* vod_p, u3_noun nun)
u3_panc* pac_u = vod_p;
u3_weak las = u3r_at(7, nun);
// if scry fails, remember we can't scry, and just inject the packet
if (u3_none == las) {
u3l_log("ames: giving up scry\n");
pac_u->sam_u->see_o = c3n;
_ames_serialize_packet(pac_u, c3n),
// if there is a lane, forward the packet on it
else if (u3_nul != las) {
_ames_forward(pac_u, u3k(las));
// if there is no lane, drop the packet
/* _ames_recv_cb(): receive callback.
static void
@ -445,39 +683,161 @@ _ames_recv_cb(uv_udp_t* wax_u,
unsigned flg_i)
u3_ames* sam_u = wax_u->data;
c3_o pas_o = c3y;
c3_y* byt_y = (c3_y*)buf_u->base;
c3_y* bod_y = byt_y + 4;
u3_head hed_u;
// data present, and protocol version in header matches 0
// ensure a sane message size
// XX inflexible, scry version out of ames
if (4 >= nrd_i) {
pas_o = c3n;
// unpack the packet header
if ( (0 < nrd_i)
&& (0 == (0x7 & *((c3_w*)buf_u->base))) )
else {
c3_w hed_w = (byt_y[0] << 0)
| (byt_y[1] << 8)
| (byt_y[2] << 16)
| (byt_y[3] << 24);
hed_u.ver_y = hed_w & 0x7;
hed_u.mug_l = (hed_w >> 3) & 0xfffff; //NOTE ((1 << 20) - 1)
hed_u.sac_y = (hed_w >> 23) & 0x3;
hed_u.rac_y = (hed_w >> 25) & 0x3;
hed_u.enc_o = (hed_w >> 27) & 0x1;
// ensure the protocol version matches ours
if ( c3y == pas_o
&& (c3y == sam_u->fit_o)
&& (sam_u->ver_y != hed_u.ver_y) )
u3_noun wir = u3nc(c3__ames, u3_nul);
u3_noun cad;
pas_o = c3n;
u3_noun msg = u3i_bytes((c3_w)nrd_i, (c3_y*)buf_u->base);
u3_noun lan;
struct sockaddr_in* add_u = (struct sockaddr_in *)adr_u;
u3_lane lan_u;
lan_u.por_s = ntohs(add_u->sin_port);
lan_u.pip_w = ntohl(add_u->sin_addr.s_addr);
lan = u3_ames_encode_lane(lan_u);
cad = u3nt(c3__hear, u3nc(c3n, lan), msg);
if ( 0 == (sam_u->vet_d % 100) ) {
u3l_log("ames: %" PRIu64 " dropped for version mismatch\n", sam_u->vet_d);
u3_ovum_init(0, c3__a, wir, cad)),
0, 0, _ames_hear_bail);
// ensure the mug is valid
if ( c3y == pas_o
&& (hed_u.mug_l != _ca_mug_body(nrd_i - 4, bod_y)) )
pas_o = c3n;
if ( 0 == (sam_u->mut_d % 100) ) {
u3l_log("ames: %" PRIu64 " dropped for invalid mug\n", sam_u->mut_d);
// unpack the body
c3_y sen_y = 2 << hed_u.sac_y;
c3_y rec_y = 2 << hed_u.rac_y;
c3_d sen_d[2];
c3_d rec_d[2];
c3_w con_w = nrd_i - 4 - sen_y - rec_y;
c3_y* con_y = NULL;
if (c3y == pas_o) {
u3_noun sen = u3i_bytes(sen_y, bod_y);
u3_noun rec = u3i_bytes(rec_y, bod_y + sen_y);
u3r_chubs(0, 2, rec_d, rec);
u3r_chubs(0, 2, sen_d, sen);
u3z(sen); u3z(rec);
con_y = c3_malloc(con_w);
memcpy(con_y, bod_y + sen_y + rec_y, con_w);
// ensure the content is cue-able
u3_noun pro = u3m_soft(0, u3ke_cue, u3i_bytes(con_w, con_y));
pas_o = (u3_blip == u3h(pro)) ? c3y : c3n;
// if we can scry,
// and we are not the recipient,
// we might want to forward statelessly
if ( c3y == pas_o
&& c3y == sam_u->see_o
&& ( (rec_d[0] != sam_u->pir_u->who_d[0])
|| (rec_d[1] != sam_u->pir_u->who_d[1]) ) )
pas_o = c3n;
// if the queue is full, and we can't forward synchronously,
// just drop the packet
//TODO drop oldest item in forward queue in favor of this one.
// ames.c doesn't/shouldn't know about the shape of scry events,
// so can't pluck these out of the event queue like it does in
// _ames_cap_queue. as such, blocked on u3_lord_peek_cancel or w/e.
if ( (1000 < sam_u->foq_d)
&& !(rec_d[1] == 0 && (256 > rec_d[0])) )
if ( 0 == (sam_u->fod_d % 100000) ) {
u3l_log("ames: dropped %" PRIu64 " forwards total\n", sam_u->fod_d);
// otherwise, proceed with forwarding
else {
// store the packet details for later processing
u3_panc* pac_u = c3_calloc(sizeof(*pac_u));
pac_u->sam_u = sam_u;
pac_u->hed_u = hed_u;
pac_u->bod_u.sen_d[0] = sen_d[0];
pac_u->bod_u.sen_d[1] = sen_d[1];
pac_u->bod_u.rec_d[0] = rec_d[0];
pac_u->bod_u.rec_d[1] = rec_d[1];
pac_u->bod_u.con_w = con_w;
pac_u->bod_u.con_y = con_y;
pac_u->ore_u = _ames_lane_from_sockaddr((struct sockaddr_in *)adr_u);
if (0 != sam_u->pac_u) {
pac_u->nex_u = sam_u->pac_u;
sam_u->pac_u->pre_u = pac_u;
sam_u->pac_u = pac_u;
// if the recipient is a galaxy, their lane is always &+~gax
if ( (rec_d[1] == 0) && (256 > rec_d[0]) ) {
_ames_forward(pac_u, u3nc(u3nc(c3y, (c3_y)rec_d[0]), u3_nul));
// otherwise, if there's space in the queue, scry the lane out of ames
else {
u3_noun pax = u3nq(u3i_string("peers"),
u3dc("scot", 'p', u3i_chubs(2, rec_d)),
u3_pier_peek_last(sam_u->pir_u, u3_nul, c3__ax,
u3_nul, pax, pac_u, _ames_lane_scry_cb);
// if we passed the filter, inject the packet
if (c3y == pas_o) {
u3_lane ore_u = _ames_lane_from_sockaddr((struct sockaddr_in *)adr_u);
u3_noun msg = u3i_bytes((c3_w)nrd_i, (c3_y*)buf_u->base);
_ames_put_packet(sam_u, msg, ore_u);
@ -488,13 +848,13 @@ _ames_recv_cb(uv_udp_t* wax_u,
static void
_ames_io_start(u3_ames* sam_u)
c3_s por_s = sam_u->por_s;
u3_noun who = u3i_chubs(2, sam_u->who_d);
c3_s por_s = sam_u->pir_u->por_s;
u3_noun who = u3i_chubs(2, sam_u->pir_u->who_d);
u3_noun rac = u3do("clan:title", u3k(who));
c3_i ret_i;
if ( c3__czar == rac ) {
c3_y num_y = (c3_y)sam_u->who_d[0];
c3_y num_y = (c3_y)sam_u->pir_u->who_d[0];
c3_s zar_s = _ames_czar_port(num_y);
if ( 0 == por_s ) {
@ -537,14 +897,14 @@ _ames_io_start(u3_ames* sam_u)
uv_udp_getsockname(&sam_u->wax_u, (struct sockaddr *)&add_u, &add_i);
sam_u->por_s = ntohs(add_u.sin_port);
sam_u->pir_u->por_s = ntohs(add_u.sin_port);
if ( c3y == u3_Host.ops_u.net ) {
u3l_log("ames: live on %d\n", sam_u->por_s);
u3l_log("ames: live on %d\n", sam_u->pir_u->por_s);
else {
u3l_log("ames: live on %d (localhost only)\n", sam_u->por_s);
u3l_log("ames: live on %d (localhost only)\n", sam_u->pir_u->por_s);
uv_udp_recv_start(&sam_u->wax_u, _ames_alloc, _ames_recv_cb);
@ -574,7 +934,7 @@ _ames_ef_turf(u3_ames* sam_u, u3_noun tuf)
else if ( (c3n == sam_u->fak_o) && (0 == sam_u->dns_c) ) {
else if ( (c3n == sam_u->pir_u->fak_o) && (0 == sam_u->dns_c) ) {
u3l_log("ames: turf: no domains\n");
@ -585,6 +945,32 @@ _ames_ef_turf(u3_ames* sam_u, u3_noun tuf)
/* _ames_prot_scry_cb(): receive protocol version
static void
_ames_prot_scry_cb(void* vod_p, u3_noun nun)
u3_ames* sam_u = vod_p;
u3_weak ver = u3r_at(7, nun);
if (u3_none == ver) {
// assume protocol version 0
sam_u->ver_y = 0;
else if ( (c3n == u3a_is_cat(ver))
|| (7 < ver) ) {
u3m_p("ames: strange protocol", nun);
sam_u->ver_y = 0;
else {
sam_u->ver_y = ver;
sam_u->fit_o = c3y;
/* _ames_io_talk(): start receiving ames traffic.
static void
@ -601,6 +987,12 @@ _ames_io_talk(u3_auto* car_u)
u3_auto_plan(car_u, u3_ovum_init(0, c3__a, wir, cad));
// scry the protocol version out of arvo
u3_pier_peek_last(car_u->pir_u, u3_nul, c3__ax, u3_nul,
u3nt(u3i_string("protocol"), u3i_string("version"), u3_nul),
sam_u, _ames_prot_scry_cb);
/* _ames_kick_newt(): apply packet network outputs.
@ -693,6 +1085,14 @@ static void
_ames_exit_cb(uv_handle_t* had_u)
u3_ames* sam_u = had_u->data;
u3_panc* pac_u = sam_u->pac_u;
while (0 != pac_u) {
u3_panc* nex_u = pac_u->nex_u;
pac_u = nex_u;
@ -711,8 +1111,13 @@ static void
_ames_io_info(u3_auto* car_u)
u3_ames* sam_u = (u3_ames*)car_u;
u3l_log(" dropped: %" PRIu64 "\n", sam_u->dop_d);
u3l_log(" crashed: %" PRIu64 "\n", sam_u->fal_d);
u3l_log(" dropped: %" PRIu64 "\n", sam_u->dop_d);
u3l_log(" forwards dropped: %" PRIu64 "\n", sam_u->fod_d);
u3l_log(" forwards pending: %" PRIu64 "\n", sam_u->foq_d);
u3l_log(" forwarded: %" PRIu64 "\n", sam_u->fow_d);
u3l_log(" filtered (ver): %" PRIu64 "\n", sam_u->vet_d);
u3l_log(" filtered (mug): %" PRIu64 "\n", sam_u->mut_d);
u3l_log(" crashed: %" PRIu64 "\n", sam_u->fal_d);
/* u3_ames_io_init(): initialize ames I/O.
@ -721,22 +1126,21 @@ u3_auto*
u3_ames_io_init(u3_pier* pir_u)
u3_ames* sam_u = c3_calloc(sizeof(*sam_u));
sam_u->who_d[0] = pir_u->who_d[0];
sam_u->who_d[1] = pir_u->who_d[1];
sam_u->por_s = pir_u->por_s;
sam_u->fak_o = pir_u->fak_o;
sam_u->pir_u = pir_u;
sam_u->dop_d = 0;
sam_u->see_o = c3y;
sam_u->fit_o = c3n;
sam_u->foq_d = 0;
c3_assert( !uv_udp_init(u3L, &sam_u->wax_u) );
sam_u->wax_u.data = sam_u;
// Disable networking for fake ships
if ( c3y == sam_u->fak_o ) {
if ( c3y == sam_u->pir_u->fak_o ) {
u3_Host.ops_u.net = c3n;
u3_auto* car_u = &sam_u->car_u;
car_u->nam_m = c3__ames;
car_u->liv_o = c3n;
@ -349,12 +349,12 @@ _lord_plea_peek_bail(u3_lord* god_u, u3_peek* pek_u, u3_noun dud)
u3_pier_punt_goof("peek", dud);
pek_u->fun_f(pek_u->ptr_v, u3_nul);
/* _lord_plea_peek_done(): hear serf %peek %done
@ -803,92 +803,52 @@ _lord_writ_plan(u3_lord* god_u, u3_writ* wit_u)
_lord_writ_send(god_u, wit_u);
/* u3_lord_peek(): read namespace.
/* u3_lord_peek(): read namespace, injecting what's missing.
u3_lord_peek(u3_lord* god_u,
u3_noun gan,
u3_noun ful,
void* ptr_v,
u3_peek_cb fun_f)
u3_lord_peek(u3_lord* god_u, u3_pico* pic_u)
u3_writ* wit_u = _lord_writ_new(god_u);
wit_u->typ_e = u3_writ_peek;
wit_u->pek_u = c3_calloc(sizeof(*wit_u->pek_u));
wit_u->pek_u->ptr_v = ptr_v;
wit_u->pek_u->fun_f = fun_f;
wit_u->pek_u->ptr_v = pic_u->ptr_v;
wit_u->pek_u->fun_f = pic_u->fun_f;
wit_u->pek_u->now = u3_time_in_tv(&wit_u->tim_u);
wit_u->pek_u->gan = gan;
wit_u->pek_u->ful = ful;
wit_u->pek_u->gan = u3k(pic_u->gan);
// XX cache check
// construct the full scry path
switch ( pic_u->typ_e ) {
default: c3_assert(0);
_lord_writ_plan(god_u, wit_u);
case u3_pico_full: {
wit_u->pek_u->ful = u3k(pic_u->ful);
} break;
/* u3_lord_peek_mine(): read namespace, injecting ship (our).
u3_lord_peek_mine(u3_lord* god_u,
u3_noun gan,
c3_m car_m,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f)
u3_writ* wit_u = _lord_writ_new(god_u);
wit_u->typ_e = u3_writ_peek;
wit_u->pek_u = c3_calloc(sizeof(*wit_u->pek_u));
wit_u->pek_u->ptr_v = ptr_v;
wit_u->pek_u->fun_f = fun_f;
wit_u->pek_u->now = u3_time_in_tv(&wit_u->tim_u);
wit_u->pek_u->gan = gan;
case u3_pico_mine: {
// XX cache
u3_pier* pir_u = god_u->cb_u.ptr_v; // XX do better
u3_noun our = u3dc("scot", 'p', u3i_chubs(2, pir_u->who_d));
wit_u->pek_u->ful = u3nt(pic_u->min_u.car_m, our, u3k(pic_u->min_u.pax));
} break;
// XX cache
u3_pier* pir_u = god_u->cb_u.ptr_v; // XX do better
u3_noun our = u3dc("scot", 'p', u3i_chubs(2, pir_u->who_d));
wit_u->pek_u->ful = u3nt(car_m, our, pax);
case u3_pico_last: {
// XX cache
u3_pier* pir_u = god_u->cb_u.ptr_v; // XX do better
u3_noun our = u3dc("scot", 'p', u3i_chubs(2, pir_u->who_d));
u3_noun cas = u3dc("scot", c3__da, u3k(wit_u->pek_u->now));
wit_u->pek_u->ful = u3nc(pic_u->las_u.car_m,
} break;
// XX cache check
_lord_writ_plan(god_u, wit_u);
/* u3_lord_peek_last(): read namespace, injecting ship (our) and case (now).
u3_lord_peek_last(u3_lord* god_u,
u3_noun gan,
c3_m car_m,
u3_atom des,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f)
u3_writ* wit_u = _lord_writ_new(god_u);
wit_u->typ_e = u3_writ_peek;
wit_u->pek_u = c3_calloc(sizeof(*wit_u->pek_u));
wit_u->pek_u->ptr_v = ptr_v;
wit_u->pek_u->fun_f = fun_f;
wit_u->pek_u->now = u3_time_in_tv(&wit_u->tim_u);
wit_u->pek_u->gan = gan;
// XX cache
u3_pier* pir_u = god_u->cb_u.ptr_v; // XX do better
u3_noun our = u3dc("scot", 'p', u3i_chubs(2, pir_u->who_d));
u3_noun cas = u3dc("scot", c3__da, u3k(wit_u->pek_u->now));
wit_u->pek_u->ful = u3nc(car_m, u3nq(our, des, cas, pax));
// NB, won't be cached, result shouldn't be
// XX cache check, unless last
_lord_writ_plan(god_u, wit_u);
@ -26,6 +26,42 @@
/* _pier_peek_plan(): add a u3_pico to the peek queue
static void
_pier_peek_plan(u3_pier* pir_u, u3_pico* pic_u)
if (!pir_u->pec_u.ent_u) {
c3_assert( !pir_u->pec_u.ext_u );
pir_u->pec_u.ent_u = pir_u->pec_u.ext_u = pic_u;
else {
pir_u->pec_u.ent_u->nex_u = pic_u;
pir_u->pec_u.ent_u = pic_u;
/* _pier_peek_next(): pop u3_pico off of peek queue
static u3_pico*
_pier_peek_next(u3_pier* pir_u)
u3_pico* pic_u = pir_u->pec_u.ext_u;
if (pic_u) {
pir_u->pec_u.ext_u = pic_u->nex_u;
if (!pir_u->pec_u.ext_u) {
pir_u->pec_u.ent_u = 0;
pic_u->nex_u = 0;
return pic_u;
/* _pier_work_send(): send new events for processing
static void
@ -60,13 +96,32 @@ _pier_work_send(u3_work* wok_u)
u3_ovum* egg_u;
u3_noun ovo;
u3_pico* pic_u;
while ( len_w-- && car_u && (egg_u = u3_auto_next(car_u, &ovo)) ) {
while ( len_w && car_u && (egg_u = u3_auto_next(car_u, &ovo)) ) {
u3_lord_work(god_u, egg_u, ovo);
// queue events depth first
car_u = egg_u->car_u;
// interleave scry requests
if ( len_w && (pic_u = _pier_peek_next(pir_u)) )
u3_lord_peek(god_u, pic_u);
// if there's room left in the batch, fill it up with remaining scries
while ( len_w-- && (pic_u = _pier_peek_next(pir_u)) )
u3_lord_peek(god_u, pic_u);
@ -353,6 +408,75 @@ u3_pier_spin(u3_pier* pir_u)
/* u3_pier_peek(): read namespace.
u3_pier_peek(u3_pier* pir_u,
u3_noun gan,
u3_noun ful,
void* ptr_v,
u3_peek_cb fun_f)
u3_pico* pic_u = u3_pico_init();
pic_u->ptr_v = ptr_v;
pic_u->fun_f = fun_f;
pic_u->gan = gan;
pic_u->typ_e = u3_pico_full;
pic_u->ful = ful;
_pier_peek_plan(pir_u, pic_u);
/* u3_pier_peek_mine(): read namespace, injecting ship.
u3_pier_peek_mine(u3_pier* pir_u,
u3_noun gan,
c3_m car_m,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f)
u3_pico* pic_u = u3_pico_init();
pic_u->ptr_v = ptr_v;
pic_u->fun_f = fun_f;
pic_u->gan = gan;
pic_u->typ_e = u3_pico_mine;
pic_u->min_u.car_m = car_m;
pic_u->min_u.pax = pax;
_pier_peek_plan(pir_u, pic_u);
/* u3_pier_peek_last(): read namespace, injecting ship and case.
u3_pier_peek_last(u3_pier* pir_u,
u3_noun gan,
c3_m car_m,
u3_atom des,
u3_noun pax,
void* ptr_v,
u3_peek_cb fun_f)
u3_pico* pic_u = u3_pico_init();
pic_u->ptr_v = ptr_v;
pic_u->fun_f = fun_f;
pic_u->gan = gan;
pic_u->typ_e = u3_pico_last;
pic_u->las_u.car_m = car_m;
pic_u->las_u.des = des;
pic_u->las_u.pax = pax;
_pier_peek_plan(pir_u, pic_u);
/* _pier_work_init(): begin processing new events
static void
@ -1024,7 +1148,7 @@ _pier_on_lord_live(void* ptr_v)
// run the requested scry, jam to disk, then exit
u3l_log("pier: scry\n");
u3_lord_peek_last(god_u, u3_nul, u3k(car), u3k(dek), u3k(pax),
u3_pier_peek_last(pir_u, u3_nul, u3k(car), u3k(dek), u3k(pax),
pir_u, _pier_on_scry_done);
@ -168,6 +168,42 @@ u3_ovum_free(u3_ovum *egg_u)
/* u3_pico_init(): initialize a scry request struct
u3_pico* pic_u = c3_calloc(sizeof(*pic_u));
return pic_u;
/* u3_pico_free(): dispose a scry request struct
u3_pico_free(u3_pico* pic_u)
switch ( pic_u->typ_e ) {
default: c3_assert(0);
case u3_pico_full: {
} break;
case u3_pico_mine: {
} break;
case u3_pico_last: {
} break;
/* u3_mcut_char(): measure/cut character.
Reference in New Issue
Block a user