From f508a569f8203679960d5a8a3cf25c44d46aec6b Mon Sep 17 00:00:00 2001 From: yosoyubik Date: Tue, 21 Feb 2023 13:38:03 +0100 Subject: [PATCH] ames: add core chapters to peer-core --- pkg/arvo/sys/vane/ames.hoon | 196 +++++++++++++++++++----------------- 1 file changed, 104 insertions(+), 92 deletions(-) diff --git a/pkg/arvo/sys/vane/ames.hoon b/pkg/arvo/sys/vane/ames.hoon index 683a54a775..9605039e26 100644 --- a/pkg/arvo/sys/vane/ames.hoon +++ b/pkg/arvo/sys/vane/ames.hoon @@ -3184,6 +3184,9 @@ |= [=peer-state =channel] =* veb veb.bug.channel |% + :: + +| %helpers + :: ++ peer-core . ++ pe-emit |=(move peer-core(event-core (emit +<))) ++ abet @@ -3197,6 +3200,17 @@ |= [verb=? print=(trap tape)] ^+ same (ev-trace verb her.channel print) + :: + :: +got-duct: look up $duct by .bone, asserting already bound + :: + ++ got-duct + |= =bone + ^- ^duct + ~| %dangling-bone^her.channel^bone + (~(got by by-bone.ossuary.peer-state) bone) + :: + +| %tasks + :: ++ on-heed peer-core(heeds.peer-state (~(put in heeds.peer-state) duct)) ++ on-jilt peer-core(heeds.peer-state (~(del in heeds.peer-state) duct)) :: +update-qos: update and maybe print connection status @@ -3221,59 +3235,6 @@ ?. ?=(?(%dead %unborn) -.qos.peer-state) peer-core check-clog - :: +check-clog: notify clients if peer has stopped responding - :: - ++ check-clog - ^+ peer-core - :: - :: Only look at response bones. Request bones are unregulated, - :: since requests tend to be much smaller than responses. - :: - =/ pumps=(list message-pump-state) - %+ murn ~(tap by snd.peer-state) - |= [=bone =message-pump-state] - ?: =(0 (end 0 bone)) - ~ - `u=message-pump-state - :: - =/ clogged=? - |^ &(nuf-messages nuf-memory) - :: +nuf-messages: are there enough messages to mark as clogged? - :: - ++ nuf-messages - =| num=@ud - |- ^- ? - ?~ pumps | - =. num - ;: add num - (sub [next current]:i.pumps) - ~(wyt in unsent-messages.i.pumps) - == - ?: (gte num msg.cong.ames-state) - & - $(pumps t.pumps) - :: +nuf-memory: is enough memory used to mark as clogged? - :: - ++ nuf-memory - =| mem=@ud - |- ^- ? - ?~ pumps | - =. mem - %+ add - %- ~(rep in unsent-messages.i.pumps) - |=([a=@ b=_mem] (add b (met 3 a))) - ?~ unsent-fragments.i.pumps 0 - (met 3 fragment.i.unsent-fragments.i.pumps) - ?: (gte mem mem.cong.ames-state) - & - $(pumps t.pumps) - -- - :: if clogged, notify client vane - :: - ?. clogged - peer-core - %+ roll ~(tap in heeds.peer-state) - |=([d=^duct core=_peer-core] (pe-emit:core d %give %clog her.channel)) :: +on-hear-shut-packet: handle receipt of ack or message fragment :: ++ on-hear-shut-packet @@ -3330,39 +3291,6 @@ == check-clog peer-core - :: +dedup-message: replace with any existing copy of this message - :: - ++ dedup-message - |= =message-blob - ^+ message-blob - ?: (lte (met 13 message-blob) 1) - message-blob - =/ peers-l=(list [=ship =ship-state]) ~(tap by peers.ames-state) - |- ^+ message-blob - =* peer-loop $ - ?~ peers-l - message-blob - ?. ?=(%known -.ship-state.i.peers-l) - peer-loop(peers-l t.peers-l) - =/ snd-l=(list [=bone =message-pump-state]) - ~(tap by snd.ship-state.i.peers-l) - |- ^+ message-blob - =* bone-loop $ - ?~ snd-l - peer-loop(peers-l t.peers-l) - =/ blob-l=(list ^message-blob) - ~(tap to unsent-messages.message-pump-state.i.snd-l) - |- ^+ message-blob - =* blob-loop $ - ?^ blob-l - ?: =(i.blob-l message-blob) - i.blob-l - blob-loop(blob-l t.blob-l) - ?~ unsent-fragments.message-pump-state.i.snd-l - bone-loop(snd-l t.snd-l) - ?: =(message-blob fragment.i.unsent-fragments.message-pump-state.i.snd-l) - `@`fragment.i.unsent-fragments.message-pump-state.i.snd-l - bone-loop(snd-l t.snd-l) :: +on-wake: handle timer expiration :: ++ on-wake @@ -3437,6 +3365,95 @@ :: maybe resend some timed out packets :: (run-message-pump bone %wake ~) + :: + +| %implementation + :: +dedup-message: replace with any existing copy of this message + :: + ++ dedup-message + |= =message-blob + ^+ message-blob + ?: (lte (met 13 message-blob) 1) + message-blob + =/ peers-l=(list [=ship =ship-state]) ~(tap by peers.ames-state) + |- ^+ message-blob + =* peer-loop $ + ?~ peers-l + message-blob + ?. ?=(%known -.ship-state.i.peers-l) + peer-loop(peers-l t.peers-l) + =/ snd-l=(list [=bone =message-pump-state]) + ~(tap by snd.ship-state.i.peers-l) + |- ^+ message-blob + =* bone-loop $ + ?~ snd-l + peer-loop(peers-l t.peers-l) + =/ blob-l=(list ^message-blob) + ~(tap to unsent-messages.message-pump-state.i.snd-l) + |- ^+ message-blob + =* blob-loop $ + ?^ blob-l + ?: =(i.blob-l message-blob) + i.blob-l + blob-loop(blob-l t.blob-l) + ?~ unsent-fragments.message-pump-state.i.snd-l + bone-loop(snd-l t.snd-l) + ?: =(message-blob fragment.i.unsent-fragments.message-pump-state.i.snd-l) + `@`fragment.i.unsent-fragments.message-pump-state.i.snd-l + bone-loop(snd-l t.snd-l) + + :: +check-clog: notify clients if peer has stopped responding + :: + ++ check-clog + ^+ peer-core + :: + :: Only look at response bones. Request bones are unregulated, + :: since requests tend to be much smaller than responses. + :: + =/ pumps=(list message-pump-state) + %+ murn ~(tap by snd.peer-state) + |= [=bone =message-pump-state] + ?: =(0 (end 0 bone)) + ~ + `u=message-pump-state + :: + =/ clogged=? + |^ &(nuf-messages nuf-memory) + :: +nuf-messages: are there enough messages to mark as clogged? + :: + ++ nuf-messages + =| num=@ud + |- ^- ? + ?~ pumps | + =. num + ;: add num + (sub [next current]:i.pumps) + ~(wyt in unsent-messages.i.pumps) + == + ?: (gte num msg.cong.ames-state) + & + $(pumps t.pumps) + :: +nuf-memory: is enough memory used to mark as clogged? + :: + ++ nuf-memory + =| mem=@ud + |- ^- ? + ?~ pumps | + =. mem + %+ add + %- ~(rep in unsent-messages.i.pumps) + |=([a=@ b=_mem] (add b (met 3 a))) + ?~ unsent-fragments.i.pumps 0 + (met 3 fragment.i.unsent-fragments.i.pumps) + ?: (gte mem mem.cong.ames-state) + & + $(pumps t.pumps) + -- + :: if clogged, notify client vane + :: + ?. clogged + peer-core + %+ roll ~(tap in heeds.peer-state) + |=([d=^duct core=_peer-core] (pe-emit:core d %give %clog her.channel)) :: +send-shut-packet: fire encrypted packet at rcvr and maybe sponsors :: ++ send-shut-packet @@ -3482,13 +3499,8 @@ ~> %slog.0^leaf/"ames: recork {}" =/ =plea [%$ /flow [%cork ~]] (on-memo i.boz plea %plea) - :: +got-duct: look up $duct by .bone, asserting already bound :: - ++ got-duct - |= =bone - ^- ^duct - ~| %dangling-bone^her.channel^bone - (~(got by by-bone.ossuary.peer-state) bone) + +| %internals :: +run-message-pump: process $message-pump-task and its effects :: ++ run-message-pump