From 81ff98c5d41c965bc8b02ae05105c88fb4a24b99 Mon Sep 17 00:00:00 2001 From: Joe Bryan Date: Thu, 25 Jun 2020 11:35:58 -0700 Subject: [PATCH] vere/king/serf: adds and enables event timeouts --- pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs | 8 ++--- pkg/urbit/include/noun/manage.h | 2 +- pkg/urbit/include/vere/serf.h | 2 +- pkg/urbit/include/vere/vere.h | 6 ++-- pkg/urbit/noun/manage.c | 38 +++++++++++--------- pkg/urbit/vere/auto.c | 4 +-- pkg/urbit/vere/lord.c | 5 +-- pkg/urbit/vere/ward.c | 4 +-- pkg/urbit/worker/serf.c | 38 +++++++++++--------- 9 files changed, 59 insertions(+), 48 deletions(-) diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs index d7ae9e4751..7321c13005 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Serf/IPC.hs @@ -25,7 +25,7 @@ == == [%peek now=date lyc=gang pat=path] [%play eve=@ lit=(list ?((pair date ovum) *))] - [%work job=(pair date ovum)] + [%work mil=@ job=(pair date ovum)] == :: +plea: from serf to king :: @@ -122,7 +122,7 @@ data Writ = WLive Live | WPeek Wen Gang Path | WPlay EventId [Noun] - | WWork Wen Ev + | WWork Atom Wen Ev deriving (Show) data Plea @@ -493,7 +493,7 @@ swim serf = do Nothing -> do pure (SerfState eve mug) Just (wen, evn) -> do - io (sendWrit serf (WWork wen evn)) + io (sendWrit serf (WWork 0 wen evn)) io (recvWork serf) >>= \case WBail goofs -> do throwIO (BailDuringReplay eve goofs) @@ -636,7 +636,7 @@ processWork serf maxSize q onResp spin = do now <- Time.now let cb = onResp now evErr atomically $ modifyTVar' vInFlight (:|> (ev, cb)) - sendWrit serf (WWork now ev) + sendWrit serf (WWork 0 now ev) loop vInFlight vDone {-| diff --git a/pkg/urbit/include/noun/manage.h b/pkg/urbit/include/noun/manage.h index 7da08e6292..3f8203b7c0 100644 --- a/pkg/urbit/include/noun/manage.h +++ b/pkg/urbit/include/noun/manage.h @@ -66,7 +66,7 @@ ** Produces [%$ result] or [%error (list tank)]. */ u3_noun - u3m_soft(c3_w sec_w, u3_funk fun_f, u3_noun arg); + u3m_soft(c3_w mil_w, u3_funk fun_f, u3_noun arg); /* u3m_soft_slam: top-level call. */ diff --git a/pkg/urbit/include/vere/serf.h b/pkg/urbit/include/vere/serf.h index 9fb6ef0c60..e9fa790ea3 100644 --- a/pkg/urbit/include/vere/serf.h +++ b/pkg/urbit/include/vere/serf.h @@ -52,7 +52,7 @@ /* u3_serf_work(): apply event, producing effects. */ u3_noun - u3_serf_work(u3_serf* sef_u, u3_noun job); + u3_serf_work(u3_serf* sef_u, c3_w mil_w, u3_noun job); /* u3_serf_post(): update serf state post-writ. */ diff --git a/pkg/urbit/include/vere/vere.h b/pkg/urbit/include/vere/vere.h index 8b6eedb4b6..1dc30f981a 100644 --- a/pkg/urbit/include/vere/vere.h +++ b/pkg/urbit/include/vere/vere.h @@ -331,7 +331,7 @@ */ typedef struct _u3_ovum { void* vod_p; // context - c3_l msc_l; // ms to timeout + c3_w mil_w; // timeout ms u3_noun tar; // target (in arvo) u3_noun wir; // wire u3_noun cad; // card @@ -744,7 +744,7 @@ /* u3_ovum_init: initialize an unlinked potential event */ u3_ovum* - u3_ovum_init(c3_l msc_l, + u3_ovum_init(c3_w mil_w, u3_noun tar, u3_noun wir, u3_noun cad); @@ -820,7 +820,7 @@ */ u3_ovum* u3_auto_plan(u3_auto* car_u, - c3_l msc_l, + c3_w mil_w, u3_noun tar, u3_noun wir, u3_noun cad); diff --git a/pkg/urbit/noun/manage.c b/pkg/urbit/noun/manage.c index bf87bd96a3..df9042028a 100644 --- a/pkg/urbit/noun/manage.c +++ b/pkg/urbit/noun/manage.c @@ -68,10 +68,10 @@ /* u3m_soft_top(): top-level safety wrapper. */ u3_noun - u3m_soft_top(c3_w sec_w, // timer seconds + u3m_soft_top(c3_w mil_w, // timer ms c3_w pad_w, // base memory pad u3_funk fun_f, - u3_noun arg); + u3_noun arg); static sigjmp_buf u3_Signal; @@ -323,10 +323,10 @@ _cm_signal_recover(c3_l sig_l, u3_noun arg) } } -/* _cm_signal_deep(): start deep processing; set timer for sec_w or 0. +/* _cm_signal_deep(): start deep processing; set timer for [mil_w] or 0. */ static void -_cm_signal_deep(c3_w sec_w) +_cm_signal_deep(c3_w mil_w) { // disable outer system signal handling // @@ -347,15 +347,19 @@ _cm_signal_deep(c3_w sec_w) u3H->rod_u.bug.mer = u3i_string("emergency buffer"); } - if ( sec_w ) { + if ( mil_w ) { struct itimerval itm_u; timerclear(&itm_u.it_interval); - itm_u.it_value.tv_sec = sec_w; - itm_u.it_value.tv_usec = 0; + itm_u.it_value.tv_sec = (mil_w / 1000); + itm_u.it_value.tv_usec = 1000 * (mil_w % 1000); - setitimer(ITIMER_VIRTUAL, &itm_u, 0); - signal(SIGVTALRM, _cm_signal_handle_alrm); + if ( setitimer(ITIMER_VIRTUAL, &itm_u, 0) ) { + u3l_log("loom: set timer failed %s\r\n", strerror(errno)); + } + else { + signal(SIGVTALRM, _cm_signal_handle_alrm); + } } u3t_boot(); @@ -379,7 +383,9 @@ _cm_signal_done() timerclear(&itm_u.it_interval); timerclear(&itm_u.it_value); - setitimer(ITIMER_VIRTUAL, &itm_u, 0); + if ( setitimer(ITIMER_VIRTUAL, &itm_u, 0) ) { + u3l_log("loom: clear timer failed %s\r\n", strerror(errno)); + } } // restore outer system signal handling @@ -927,17 +933,17 @@ u3m_water(c3_w* low_w, c3_w* hig_w) /* u3m_soft_top(): top-level safety wrapper. */ u3_noun -u3m_soft_top(c3_w sec_w, // timer seconds +u3m_soft_top(c3_w mil_w, // timer ms c3_w pad_w, // base memory pad u3_funk fun_f, - u3_noun arg) + u3_noun arg) { u3_noun why, pro; c3_l sig_l; /* Enter internal signal regime. */ - _cm_signal_deep(0); + _cm_signal_deep(mil_w); if ( 0 != (sig_l = sigsetjmp(u3_Signal, 1)) ) { // reinitialize trace state @@ -1210,13 +1216,13 @@ u3m_grab(u3_noun som, ...) // terminate with u3_none ** Produces [0 product] or [%error (list tank)], top last. */ u3_noun -u3m_soft(c3_w sec_w, +u3m_soft(c3_w mil_w, u3_funk fun_f, - u3_noun arg) + u3_noun arg) { u3_noun why; - why = u3m_soft_top(sec_w, (1 << 20), fun_f, arg); // 2MB pad + why = u3m_soft_top(mil_w, (1 << 20), fun_f, arg); // 2MB pad if ( 0 == u3h(why) ) { return why; diff --git a/pkg/urbit/vere/auto.c b/pkg/urbit/vere/auto.c index 137ca01e38..d163d576fc 100644 --- a/pkg/urbit/vere/auto.c +++ b/pkg/urbit/vere/auto.c @@ -23,12 +23,12 @@ */ u3_ovum* u3_auto_plan(u3_auto* car_u, - c3_l msc_l, + c3_w mil_w, u3_noun tar, u3_noun wir, u3_noun cad) { - u3_ovum *egg_u = u3_ovum_init(msc_l, tar, wir, cad); + u3_ovum *egg_u = u3_ovum_init(mil_w, tar, wir, cad); egg_u->car_u = car_u; diff --git a/pkg/urbit/vere/lord.c b/pkg/urbit/vere/lord.c index bce742d3d6..268bb1bf12 100644 --- a/pkg/urbit/vere/lord.c +++ b/pkg/urbit/vere/lord.c @@ -31,7 +31,7 @@ == == [%peek now=date lyc=gang pat=path] [%play eve=@ lit=(list ?((pair date ovum) *))] - [%work job=(pair date ovum)] + [%work mil=@ job=(pair date ovum)] == :: +plea: from serf to king :: @@ -666,7 +666,8 @@ _lord_writ_jam(u3_lord* god_u, u3_writ* wit_u) default: c3_assert(0); case u3_writ_work: { - msg = u3nc(c3__work, u3k(wit_u->wok_u.job)); + u3_noun mil = u3i_words(1, &wit_u->wok_u.egg_u->mil_w); + msg = u3nt(c3__work, mil, u3k(wit_u->wok_u.job)); } break; case u3_writ_peek: { diff --git a/pkg/urbit/vere/ward.c b/pkg/urbit/vere/ward.c index e47ccae767..9df12dbd00 100644 --- a/pkg/urbit/vere/ward.c +++ b/pkg/urbit/vere/ward.c @@ -128,7 +128,7 @@ u3_gift_free(u3_gift *gif_u) /* u3_ovum_init: initialize an unlinked potential event */ u3_ovum* -u3_ovum_init(c3_l msc_l, +u3_ovum_init(c3_w mil_w, u3_noun tar, u3_noun wir, u3_noun cad) @@ -136,7 +136,7 @@ u3_ovum_init(c3_l msc_l, u3_ovum* egg_u = c3_malloc(sizeof(*egg_u)); egg_u->car_u = 0; egg_u->vod_p = 0; - egg_u->msc_l = msc_l; + egg_u->mil_w = mil_w; egg_u->tar = tar; egg_u->wir = wir; egg_u->cad = cad; diff --git a/pkg/urbit/worker/serf.c b/pkg/urbit/worker/serf.c index 77d87d0ede..76825d4acf 100644 --- a/pkg/urbit/worker/serf.c +++ b/pkg/urbit/worker/serf.c @@ -32,7 +32,7 @@ == == [%peek now=date lyc=gang pat=path] [%play eve=@ lit=(list ?((pair date ovum) *))] - [%work job=(pair date ovum)] + [%work mil=@ job=(pair date ovum)] == :: +plea: from serf to king :: @@ -60,13 +60,7 @@ questions: - %play - expect lifecycle on [%ripe ... eve=0 mug=0] - eve identifies failed event on [%play @ %bail ...] -- %pack - - could just be [%save %full ...] followed by a restart -- %mass - - is technically a query of the serf directly -- milliseconds - - in $writ for timeouts - - in $plea for measurement +- %mass is technically a query of the serf directly - duct or vane stack for spinner */ @@ -481,7 +475,7 @@ _serf_make_crud(u3_noun job, u3_noun dud) /* _serf_poke(): RETAIN */ static u3_noun -_serf_poke(u3_serf* sef_u, c3_c* cap_c, u3_noun job) +_serf_poke(u3_serf* sef_u, c3_c* cap_c, c3_w mil_w, u3_noun job) { u3_noun now, ovo, wen, gon; u3x_cell(job, &now, &ovo); @@ -507,7 +501,7 @@ _serf_poke(u3_serf* sef_u, c3_c* cap_c, u3_noun job) } #endif - gon = u3m_soft(0, u3v_poke, u3k(ovo)); + gon = u3m_soft(mil_w, u3v_poke, u3k(ovo)); #ifdef U3_EVENT_TIME_DEBUG { @@ -545,7 +539,7 @@ _serf_poke(u3_serf* sef_u, c3_c* cap_c, u3_noun job) /* _serf_work(): apply event, capture effects. */ static u3_noun -_serf_work(u3_serf* sef_u, u3_noun job) +_serf_work(u3_serf* sef_u, c3_w mil_w, u3_noun job) { u3_noun gon; c3_w pre_w = u3a_open(u3R); @@ -555,7 +549,7 @@ _serf_work(u3_serf* sef_u, u3_noun job) c3_assert( sef_u->sen_d == sef_u->dun_d); sef_u->sen_d++; - gon = _serf_poke(sef_u, "work", job); // retain + gon = _serf_poke(sef_u, "work", mil_w, job); // retain // event accepted // @@ -578,7 +572,7 @@ _serf_work(u3_serf* sef_u, u3_noun job) // job = _serf_make_crud(job, dud); - gon = _serf_poke(sef_u, "crud", job); // retain + gon = _serf_poke(sef_u, "crud", mil_w, job); // retain // error notification accepted // @@ -608,7 +602,7 @@ _serf_work(u3_serf* sef_u, u3_noun job) /* u3_serf_work(): apply event, producing effects. */ u3_noun -u3_serf_work(u3_serf* sef_u, u3_noun job) +u3_serf_work(u3_serf* sef_u, c3_w mil_w, u3_noun job) { c3_t tac_t = ( 0 != u3_Host.tra_u.fil_u ); c3_c lab_c[2048]; @@ -635,7 +629,7 @@ u3_serf_work(u3_serf* sef_u, u3_noun job) // c3_assert( 0 != sef_u->mug_l); - pro = u3nc(c3__work, _serf_work(sef_u, job)); + pro = u3nc(c3__work, _serf_work(sef_u, mil_w, job)); if ( tac_t ) { u3t_event_trace(lab_c, 'E'); @@ -1034,8 +1028,18 @@ u3_serf_writ(u3_serf* sef_u, u3_noun wit, u3_noun* pel) } break; case c3__work: { - *pel = u3_serf_work(sef_u, u3k(com)); - ret_o = c3y; + u3_noun job, tim; + c3_w mil_w; + + if ( (c3n == u3r_cell(com, &tim, &job)) || + (c3n == u3r_safe_word(tim, &mil_w)) ) + { + ret_o = c3n; + } + else { + *pel = u3_serf_work(sef_u, mil_w, u3k(job)); + ret_o = c3y; + } } break; } }