vere: cache lanes for stateless forwarding

While stateless forwarding doesn't need to touch disk, there's still
overhead in needing to communicate with the serf over IPC. By caching
lanes, we get to skip the IPC pipeline, and can respond to forwarding
requests synchronously.

We include timestamps alongside the entries in the cache, and consider
entries older than two minutes as stale.

The cache is capped at roughly 100MB of memory use. Further commentary
is provided inline.

Previously, ~nus would drop 3 forward requests for every one it
fulfilled. Now, it seems able to keep up with demand, only dropping
forwards shortly after boot, while the cache isn't primed yet.
This commit is contained in:
Fang 2020-08-29 17:17:45 +02:00
parent 024c16cd97
commit 5f77200d0d
No known key found for this signature in database
GPG Key ID: EB035760C1BBA972

View File

@ -45,6 +45,7 @@
c3_d saw_d; // successive scry failures
c3_o fit_o; // filtering active
c3_y ver_y; // protocol version
u3p(u3h_root) lax_p; // lane scry cache
c3_d vet_d; // version mismatches filtered
c3_d mut_d; // invalid mugs filtered
struct _u3_panc* pac_u; // packets pending forwards
@ -309,6 +310,43 @@ _ames_lane_from_sockaddr(struct sockaddr_in* add_u)
return lan_u;
}
/* _ames_lane_into_cache(): put las for who into cache, including timestamp
**
**   transfers ownership of [who] and [las]; the timestamp lets the
**   getter judge freshness on retrieval.
*/
void
_ames_lane_into_cache(u3p(u3h_root) lax_p, u3_noun who, u3_noun las) {
  struct timeval now_tv;

  //  pair the lane with the current time, and store the cell under who.
  //  the constructed value is transferred to the hashtable.
  //
  gettimeofday(&now_tv, 0);
  u3h_put(lax_p, who, u3nc(las, u3_time_in_tv(&now_tv)));

  //  u3h_put retains its key, so release our reference to who
  //
  u3z(who);
}
/* _ames_lane_from_cache(): retrieve lane for who from cache, if any & fresh
**
**   transfers ownership of [who]. produces u3_none on a cache miss or
**   a stale entry; otherwise produces the cached lane with its own
**   reference (caller must u3z).
*/
u3_weak
_ames_lane_from_cache(u3p(u3h_root) lax_p, u3_noun who) {
//  look up the cached [lane timestamp] pair.
//  NOTE(review): u3h_git appears to return a borrowed reference (no
//  transfer), hence the u3k below before returning the lane — confirm
//  against the u3h API.
//
u3_weak lac = u3h_git(lax_p, who);
if (u3_none != lac) {
struct timeval tim_tv;
gettimeofday(&tim_tv, 0);
u3_noun now = u3_time_in_tv(&tim_tv);
//  den: the @da timestamp stored alongside the lane
//
u3_noun den = u3t(lac);
// consider entries older than 2 minutes stale, ignore them
//
//  den is copied (u3k) because u3_time_gap_ms presumably consumes
//  both arguments, including the freshly-built now — TODO confirm
//
if (120000 > u3_time_gap_ms(u3k(den), now)) {
//  fresh: return a copy of the lane half of the cached pair
//
lac = u3k(u3h(lac));
} else {
//  stale: report a miss; the entry itself is left in the table
//
lac = u3_none;
}
}
u3z(who);
return lac;
}
/* _ames_serialize_packet(): u3_panc to atom, updating the origin lane if dop_o
** (retains pac_u)
*/
@ -627,7 +665,7 @@ static void
_ames_forward(u3_panc* pac_u, u3_noun las)
{
pac_u->sam_u->fow_d++;
if ( 0 == (pac_u->sam_u->fow_d % 10000) ) {
if ( 0 == (pac_u->sam_u->fow_d % 1000000) ) {
u3l_log("ames: forwarded %" PRIu64 " total\n", pac_u->sam_u->fow_d);
}
@ -670,6 +708,13 @@ _ames_lane_scry_cb(void* vod_p, u3_noun nun)
if (0 < pac_u->sam_u->saw_d) {
pac_u->sam_u->saw_d--;
}
// cache the scry result for later use
//
_ames_lane_into_cache(pac_u->sam_u->lax_p,
u3i_chubs(2, pac_u->bod_u.rec_d),
u3k(las));
// if there is no lane, drop the packet
//
if (u3_nul == las) {
@ -784,7 +829,19 @@ _ames_recv_cb(uv_udp_t* wax_u,
{
pas_o = c3n;
// if the queue is full, and we can't forward synchronously,
u3_weak lac;
// if the recipient is a galaxy, their lane is always &+~gax
//
if ( (rec_d[1] == 0) && (256 > rec_d[0]) ) {
lac = u3nc(c3y, (c3_y)rec_d[0]);
}
// otherwise, try to get the lane from cache
//
else {
lac = _ames_lane_from_cache(sam_u->lax_p, u3i_chubs(2, rec_d));
}
// if we don't know the lane, and the scry queue is full,
// just drop the packet
//
//TODO drop oldest item in forward queue in favor of this one.
@ -792,16 +849,21 @@ _ames_recv_cb(uv_udp_t* wax_u,
// so can't pluck these out of the event queue like it does in
// _ames_cap_queue. as such, blocked on u3_lord_peek_cancel or w/e.
//
if ( (1000 < sam_u->foq_d)
&& !(rec_d[1] == 0 && (256 > rec_d[0])) )
if ( (u3_none == lac) && (1000 < sam_u->foq_d) )
{
c3_free(con_y);
sam_u->fod_d++;
if ( 0 == (sam_u->fod_d % 100000) ) {
if ( 0 == (sam_u->fod_d % 10000) ) {
u3l_log("ames: dropped %" PRIu64 " forwards total\n", sam_u->fod_d);
}
}
// if we know there's no lane, drop the packet
//
else if (u3_nul == lac) {
c3_free(con_y);
u3z(lac);
}
// otherwise, proceed with forwarding
//
else {
@ -826,12 +888,12 @@ _ames_recv_cb(uv_udp_t* wax_u,
}
sam_u->pac_u = pac_u;
// if the recipient is a galaxy, their lane is always &+~gax
// if we already know the lane, just forward
//
if ( (rec_d[1] == 0) && (256 > rec_d[0]) ) {
_ames_forward(pac_u, u3nc(u3nc(c3y, (c3_y)rec_d[0]), u3_nul));
if (u3_none != lac) {
_ames_forward(pac_u, lac);
}
// otherwise, if there's space in the queue, scry the lane out of ames
// otherwise, there's space in the scry queue; scry the lane out of ames
//
else {
u3_noun pax = u3nq(u3i_string("peers"),
@ -1106,6 +1168,8 @@ _ames_exit_cb(uv_handle_t* had_u)
pac_u = nex_u;
}
u3h_free(sam_u->lax_p);
c3_free(sam_u);
}
@ -1131,6 +1195,7 @@ _ames_io_info(u3_auto* car_u)
u3l_log(" filtered (ver): %" PRIu64 "\n", sam_u->vet_d);
u3l_log(" filtered (mug): %" PRIu64 "\n", sam_u->mut_d);
u3l_log(" crashed: %" PRIu64 "\n", sam_u->fal_d);
u3l_log(" cached lanes: %u\n", u3to(u3h_root, sam_u->lax_p)->use_w);
}
/* u3_ames_io_init(): initialize ames I/O.
@ -1145,6 +1210,23 @@ u3_ames_io_init(u3_pier* pir_u)
sam_u->fit_o = c3n;
sam_u->foq_d = 0;
//NOTE some numbers on memory usage for the lane cache
//
// assuming we store:
// a (list lane) with 1 item, 1+8 + 1 + (6*2) = 22 words
// and a @da as timestamp, 5 words
// consed together, 6 words
// with worst-case (128-bit) @p keys, 8 words
// and an additional cell for the k-v pair, 6 words
// that makes for a per-entry memory use of 47 words => 188 bytes
//
// the 500k entries below would take about 94mb (in the worst case, but
// not accounting for hashtable overhead).
// we could afford more, but 500k entries is more than we'll likely use
// in the near future.
//
sam_u->lax_p = u3h_new_cache(500000);
c3_assert( !uv_udp_init(u3L, &sam_u->wax_u) );
sam_u->wax_u.data = sam_u;