From 47813a189aaf69d82344f72dd9eb9f159b6118fa Mon Sep 17 00:00:00 2001 From: yosoyubik Date: Sun, 23 May 2021 15:49:55 +0200 Subject: [PATCH] naive: hook up subscription to azimuth for %tx diffs --- pkg/arvo/app/aggregator.hoon | 111 +++++++++++++++++++++++------------ 1 file changed, 72 insertions(+), 39 deletions(-) diff --git a/pkg/arvo/app/aggregator.hoon b/pkg/arvo/app/aggregator.hoon index e093d779c8..a59997b061 100644 --- a/pkg/arvo/app/aggregator.hoon +++ b/pkg/arvo/app/aggregator.hoon @@ -17,8 +17,6 @@ :: on %tx diff from naive, remove the matching tx from the frozen group. :: ::TODO remaining general work: -:: - hook up subscription to azimuth for %tx diffs -:: - hook up thread updates/results :: - hook up timer callbacks :: - cache state, upate after every azimuth %fact :: - properly support private key changes @@ -62,6 +60,7 @@ [%setkey pk=@] [%endpoint endpoint=@t] [%nonce nonce=@ud] + [%subs ~] ::TODO contract address, chain..? == :: @@ -95,7 +94,8 @@ ^- (quip card _this) ::TODO set default frequency and endpoint? =. frequency ~h1 - [~ this] + :_ this + [%pass /azimuth %agent [our.bowl %azimuth] %watch /(scot %p our.bowl)]~ :: ++ on-save !>(state) ++ on-load @@ -196,45 +196,73 @@ |= [=wire =sign:agent:gall] ^- (quip card _this) ~& wire+wire - ?. ?=([%send @t *] wire) + |^ + ?: ?=([%send @t *] wire) + (process-thread i.t.wire sign) + ?. =(%azimuth -.wire) (on-agent:def wire sign) - ?- -.sign - %poke-ack - ?~ p.sign - %- (slog leaf+"Thread started successfully" ~) - [~ this] - %- (slog leaf+"{(trip dap.bowl)} couldn't start thread" u.p.sign) - :_ this - [(leave:spider:do wire)]~ + (process-azimuth-update wire sign) :: - %watch-ack - ?~ p.sign - [~ this] - =/ =tank leaf+"{(trip dap.bowl)} couldn't start listen to thread" - %- (slog tank u.p.sign) - [~ this] - :: - %kick - [~ this] - :: - %fact - ?+ p.cage.sign (on-agent:def wire sign) - %thread-fail - =+ !<([=term =tang] q.cage.sign) - %- (slog leaf+"{(trip dap.bowl)} failed" leaf+ tang) - =^ cards state - (on-thread-result:do (rash i.t.wire dem) %.n^'thread failed') - [cards this] + ++ process-thread + |= [nonce=@t =sign:agent:gall] + ^- (quip card _this) + ?- -.sign + %poke-ack + ?~ p.sign + %- (slog leaf+"Thread started successfully" ~) + [~ this] + %- (slog leaf+"{(trip dap.bowl)} couldn't start thread" u.p.sign) + :_ this + [(leave:spider:do wire)]~ :: - %thread-done - ~& ['all submitted to' t.wire] - :: is aggregator/send thread expected to maybe return an error? - =+ !<(result=(each @ud @t) q.cage.sign) - =^ cards state - (on-thread-result:do (rash i.t.wire dem) result) - [cards this] + %watch-ack + ?~ p.sign + [~ this] + =/ =tank leaf+"{(trip dap.bowl)} couldn't start listen to thread" + %- (slog tank u.p.sign) + [~ this] + :: + %kick + [~ this] + :: + %fact + ?+ p.cage.sign (on-agent:def wire sign) + %thread-fail + =+ !<([=term =tang] q.cage.sign) + %- (slog leaf+"{(trip dap.bowl)} failed" leaf+ tang) + =^ cards state + (on-thread-result:do (rash nonce dem) %.n^'thread failed') + [cards this] + :: + %thread-done + =+ !<(result=(each @ud @t) q.cage.sign) + =^ cards state + (on-thread-result:do (rash nonce dem) result) + [cards this] + == == - == + :: TODO: not tested + :: + ++ process-azimuth-update + |= [=^wire =sign:agent:gall] + ^- (quip card _this) + ?+ -.sign [~ this] + %watch-ack + ?~ p.sign [~ this] + =/ =tank leaf+"{(trip dap.bowl)} couldn't start listen to %azimuth" + %- (slog tank u.p.sign) + [~ this] + :: + %fact + ?+ p.cage.sign (on-agent:def wire sign) + %azimuth-udiffs + =+ !<(=diff:naive q.cage.sign) + =^ cards state + (on-naive-diff:do diff) + [cards this] + == + == + -- -- :: |_ =bowl:gall @@ -256,8 +284,9 @@ |= [=wire thread=term arg=vase] ^- (list card) =/ tid=@ta (rap 3 thread '--' (scot %uv eny.bowl) ~) + =/ args [~ `tid thread arg] :~ [%pass wire %agent [our.bowl %spider] %watch /thread-result/[tid]] - [%pass wire %agent [our.bowl %spider] %poke %spider-start !>([~ `tid thread arg])] + [%pass wire %agent [our.bowl %spider] %poke %spider-start !>(args)] == :: ++ leave @@ -339,6 +368,10 @@ %config [~ state(frequency frequency.action)] %nonce [~ state(next-nonce nonce.action)] %endpoint [~ state(endpoint endpoint.action)] + :: + %subs + :_ state + [%pass /azimuth %agent [our.bowl %azimuth] %watch /(scot %p our.bowl)]~ :: %setkey ::TODO what about existing sending entries?