::  Ames extends Arvo's %pass/%give move semantics across the network.
::
::  Ames receives packets as Arvo events and emits packets as Arvo
::  effects. The runtime is responsible for transferring the bytes in
::  an Ames packet across a physical network to another ship.
::
::  The runtime tells Ames which physical address a packet came from,
::  represented as an opaque atom. Ames can emit a packet effect to
::  one of those opaque atoms or to the Urbit address of a galaxy
::  (root node), which the runtime is responsible for translating to a
::  physical address. One runtime implementation sends UDP packets
::  using IPv4 addresses for ships and DNS lookups for galaxies, but
::  other implementations may overlay over other kinds of networks.
::
::  A local vane can pass Ames a %plea request message. Ames
::  transmits the message over the wire to the peer ship's Ames, which
::  passes the message to the destination vane.
::
::  Once the peer has processed the %plea message, it sends a
::  message-acknowledgment packet over the wire back to the local
::  Ames. This ack can either be positive to indicate the request was
::  processed, or negative to indicate the request failed, in which
::  case it's called a "nack". (Don't confuse Ames nacks with TCP
::  nacks, which are a different concept).
::
::  When the local Ames receives either a positive message-ack or a
::  combination of a nack and naxplanation (explained in more detail
::  below), it gives an %done move to the local vane that had
::  requested the original %plea message be sent.
::
::  A local vane can give Ames zero or more %boon response messages in
::  response to a %plea, on the same duct that Ames used to pass the
::  %plea to the vane. Ames transmits a %boon over the wire to the
::  peer's Ames, which gives it to the destination vane on the same
::  duct the vane had used to pass the original %plea to Ames.
::
::  %boon messages are acked automatically by the receiver Ames. They
::  cannot be nacked, and Ames only uses the ack internally, without
::  notifying the client vane that gave Ames the %boon.
::
::  If the Arvo event that completed receipt of a %boon message
::  crashes, Ames instead sends the client vane a %lost message
::  indicating the %boon was missed.
::
::  %plea messages can be nacked, in which case the peer will send
::  both a message-nack packet and a naxplanation message, which is
::  sent in a way that does not interfere with normal operation. The
::  naxplanation is sent as a full Ames message, instead of just a
::  packet, because the contained error information can be arbitrarily
::  large. A naxplanation can only give rise to a positive ack --
::  never ack an ack, and never nack a naxplanation.
::
::  Ames guarantees a total ordering of messages within a "flow",
::  identified in other vanes by a duct and over the wire by a "bone":
::  an opaque number. Each flow has a FIFO queue of %plea requests
::  from the requesting ship to the responding ship and a FIFO queue
::  of %boon's in the other direction.
::
::  Message order across flows is not specified and may vary based on
::  network conditions.
::
::  Ames guarantees that a message will only be delivered once to the
::  destination vane.
::
::  Ames encrypts every message using symmetric-key encryption by
::  performing an elliptic curve Diffie-Hellman using our private key
::  and the public key of the peer. For ships in the Jael PKI
::  (public-key infrastructure), Ames looks up the peer's public key
::  from Jael. Comets (128-bit ephemeral addresses) are not
::  cryptographic assets and must self-attest over Ames by sending a
::  single self-signed packet containing their public key.
::
::  When a peer suffers a continuity breach, Ames removes all
::  messaging state related to it. Ames does not guarantee that all
::  messages will be fully delivered to the now-stale peer. From
::  Ames's perspective, the newly restarted peer is a new ship.
::  Ames's guarantees are not maintained across a breach.
::
::  A vane can pass Ames a %heed $task to request Ames track a peer's
::  responsiveness. If our %boon's to it start backing up locally,
::  Ames will give a %clog back to the requesting vane containing the
::  unresponsive peer's urbit address. This interaction does not use
::  ducts as unique keys. Stop tracking a peer by sending Ames a
::  %jilt $task.
::
::  Debug output can be adjusted using %sift and %spew $task's.
::
::TODO  fine
::  - receiving packets:
::    +on-hear (1st) -> +on-hear-packet -> %fine
::  - sending packets:
::    +on-plea
::    -> +make-peer-core (make a function kind of like +on-memo)
::    -> call +on-pump-send kind of like how +run-message-pump does
::    (assuming as event, scry just stateless)
::
!:
=,  ames
=*  point               point:jael
=*  public-keys-result  public-keys-result:jael
::  veb: verbosity flags
::
=/  veb-all-off
  :*  snd=`?`%.n  ::  sending packets
      rcv=`?`%.n  ::  receiving packets
      odd=`?`%.n  ::  unusual events
      msg=`?`%.n  ::  message-level events
      ges=`?`%.n  ::  congestion control
      for=`?`%.n  ::  packet forwarding
      rot=`?`%.n  ::  routing attempts
      kay=`?`%.n  ::  is ok/not responding
      fin=`?`%.n  ::  remote-scry
  ==
=/  packet-size  13
=>
~%  %ames  ..part  ~
|%
+|  %helpers
::  +trace: print if .verb is set and we're tracking .ship
::
++  trace
  |=  [mode=?(%ames %fine) verb=? =ship ships=(set ship) print=(trap tape)]
  ^+  same
  ?.  verb
    same
  ?.  =>  [ship=ship ships=ships in=in]
      ~+  |(=(~ ships) (~(has in ships) ship))
    same
  (slog leaf/"{(trip mode)}: {(scow %p ship)}: {(print)}" ~)
::  +qos-update-text: notice text for if connection state changes
::
++  qos-update-text
  |=  [=ship mode=?(%ames %fine) old=qos new=qos k=? ships=(set ship)]
  ^-  (unit tape)
  ::
  =+  trace=(cury trace mode)
  ?+  [-.old -.new]  ~
    [%unborn %live]  `"; {(scow %p ship)} is your neighbor"
    [%dead %live]    ((trace k ship ships |.("is ok")) ~)
    [%live %dead]    ((trace k ship ships |.("not responding still trying")) ~)
    [%unborn %dead]  ((trace k ship ships |.("not responding still trying")) ~)
    [%live %unborn]  `"; {(scow %p ship)} has sunk"
    [%dead %unborn]  `"; {(scow %p ship)} has sunk"
  ==
::  +lte-packets: yes if a is before b
::
++  lte-packets
  |=  [a=live-packet-key b=live-packet-key]
  ^-  ?
  ::
  ?:  (lth message-num.a message-num.b)
    %.y
  ?:  (gth message-num.a message-num.b)
    %.n
  (lte fragment-num.a fragment-num.b)
::  +split-message: split message into kilobyte-sized fragments
::
::    We don't literally split it here since that would allocate many
::    large atoms with no structural sharing. Instead, each
::    static-fragment has the entire message and a counter. In
::    +encrypt, we interpret this to get the actual fragment.
::
++  split-message
  ~/  %split-message
  |=  [=message-num =message-blob]
  ^-  (list static-fragment)
  ::
  =/  num-fragments=fragment-num  (met packet-size message-blob)
  =|  counter=@
  ::
  |-  ^-  (list static-fragment)
  ?:  (gte counter num-fragments)
    ~
  ::
  :-  [message-num num-fragments counter `@`message-blob]
  $(counter +(counter))
::  +assemble-fragments: concatenate fragments into a $message
::
++  assemble-fragments
  ~/  %assemble-fragments
  |=  [num-fragments=fragment-num fragments=(map fragment-num fragment)]
  ^-  *
  ::
  =|  sorted=(list fragment)
  =.  sorted
    =/  index=fragment-num  0
    |-  ^+  sorted
    ?:  =(index num-fragments)
      sorted
    $(index +(index), sorted [(~(got by fragments) index) sorted])
  ::
  (cue (rep packet-size (flop sorted)))
::  +jim: caching +jam
::
++  jim  |=(n=* ~+((jam n)))
++  spit
  |=  =path
  ^-  [pat=@t wid=@ud]
  =+  pat=(spat path)
  =+  wid=(met 3 pat)
  ?>  (lte wid 384)
  [pat wid]
::
::  +bind-duct: find or make new $bone for .duct in .ossuary
::
++  bind-duct
  |=  [=ossuary =duct]
  ^+  [next-bone.ossuary ossuary]
  ::
  ?^  existing=(~(get by by-duct.ossuary) duct)
    [u.existing ossuary]
  ::
  :-  next-bone.ossuary
  :+  (add 4 next-bone.ossuary)
    (~(put by by-duct.ossuary) duct next-bone.ossuary)
  (~(put by by-bone.ossuary) next-bone.ossuary duct)
::  +make-bone-wire: encode ship, rift and bone in wire for sending to vane
::
++  make-bone-wire
  |=  [her=ship =rift =bone]
  ^-  wire
  ::
  /bone/(scot %p her)/(scot %ud rift)/(scot %ud bone)
::  +parse-bone-wire: decode ship, bone and rift from wire from local vane
::
++  parse-bone-wire
  |=  =wire
  ^-  %-  unit
      $%  [%old her=ship =bone]
          [%new her=ship =rift =bone]
      ==
  ?.  ?|  ?=([%bone @ @ @ ~] wire)
          ?=([%bone @ @ ~] wire)
      ==
    ::  ignore malformed wires
    ::
    ~
  ?+    wire  ~
      [%bone @ @ ~]
    `[%old `@p`(slav %p i.t.wire) `@ud`(slav %ud i.t.t.wire)]
  ::
      [%bone @ @ @ ~]
    %-  some
    :^    %new
        `@p`(slav %p i.t.wire)
      `@ud`(slav %ud i.t.t.wire)
    `@ud`(slav %ud i.t.t.t.wire)
  ==
::  +make-pump-timer-wire: construct wire for |packet-pump timer
::
++  make-pump-timer-wire
  |=  [her=ship =bone]
  ^-  wire
  /pump/(scot %p her)/(scot %ud bone)
::  +parse-pump-wire: parse .her and .bone from |packet-pump wire
::
++  parse-pump-wire
  |=  [ship=@ bone=@]
  ^-  (unit [%pump her=^ship =^bone])
  ?~  ship=`(unit @p)`(slaw %p ship)
    ~
  ?~  bone=`(unit @ud)`(slaw %ud bone)
    ~
  `pump/[u.ship u.bone]
::
++  parse-fine-wire
  |=  [ship=@ =wire]
  ^-  (unit [%fine her=^ship =^wire])
  ?~  ship=`(unit @p)`(slaw %p ship)
    ~
  `fine/[u.ship wire]
::  +derive-symmetric-key: $symmetric-key from $private-key and $public-key
::
::    Assumes keys have a tag on them like the result of the |ex:crub core.
::
++  derive-symmetric-key
  ~/  %derive-symmetric-key
  |=  [=public-key =private-key]
  ^-  symmetric-key
  ::
  ?>  =('b' (end 3 public-key))
  =.  public-key  (rsh 8 (rsh 3 public-key))
  ::
  ?>  =('B' (end 3 private-key))
  =.  private-key  (rsh 8 (rsh 3 private-key))
  ::
  `@`(shar:ed:crypto public-key private-key)
::  +encode-keys-packet: create key request $packet
::
++  encode-keys-packet
  ~/  %encode-keys-packet
  |=  [sndr=ship rcvr=ship sndr-life=life]
  ^-  shot
  :*  [sndr rcvr]
      &
      &
      (mod sndr-life 16)
      `@`1
      origin=~
      content=`@`%keys
  ==
::
++  response-size  13  ::  1kb
::  +sift-roar: assemble scry response fragments into full message
::
++  sift-roar
  |=  [total=@ud hav=(list have)]
  ^-  roar
  =/  mes=@
    %+  rep  response-size
    %+  turn  (flop hav)
    |=  =have
    dat.have
  =+  sig=(end 9 mes)
  :-  sig
  =+  dat=(rsh 9 mes)
  ?~  dat  ~
  ~|  [%fine %response-not-cask]
  ;;((cask) (cue dat))
::  +welt: like +weld but first argument is reversed
::  TODO: move to hoon.hoon
++  welt
  ~/  %welt
  |*  [a=(list) b=(list)]
  =>  .(a ^.(homo a), b ^.(homo b))
  |-  ^+  b
  ?~  a  b
  $(a t.a, b [i.a b])
::  +etch-hunk: helper core to serialize a $hunk
::
++  etch-hunk
  |=  [=ship =life =acru:ames]
  |%
  ::
  +|  %helpers
  ::  +show-meow: prepare $meow for printing
  ::
  ++  show-meow
    |=  =meow
    :*  sig=`@q`(mug sig.meow)
        num=num.meow
        siz=siz.meow
        dat=`@q`(mug dat.meow)
    ==
  ::
  ++  make-meow
    |=  [=path mes=@ num=@ud]
    ^-  meow
    =/  tot  (met 13 mes)
    =/  dat  (cut 13 [(dec num) 1] mes)
    =/  wid  (met 3 dat)
    :*  sig=(sign-fra path num dat)           ::  fragment signature
        num=tot                               ::  number of fragments
        siz=?:(=(num tot) (met 3 dat) 1.024)  ::  fragment byte width
        dat=dat                               ::  response data fragment
    ==
  ::
  ++  etch-meow
    |=  =meow
    ^-  @uxmeow
    %+  can  3
    :~  64^sig.meow
        4^num.meow
        2^siz.meow
        (met 3 dat.meow)^dat.meow
    ==
  +|  %keys
  ::
  ++  sign  sigh:as:acru
  ++  sign-fra
    |=  [=path fra=@ud dat=@ux]
    ::~>  %bout.[1 %sign-fra]
    (sign (jam path fra dat))
  ::
  ++  full
    |=  [=path data=$@(~ (cask))]
    =/  buf  (jam ship life path data)
    ::=/  nam  (crip "sign-full {<(met 3 buf)>}")
    ::~>  %bout.[1 nam]
    (sign buf)
  ::
  +|  %serialization
  ::
  ++  etch
    |=  [=path =hunk data=$@(~ (cask))]
    ^-  (list @uxmeow)
    =/  mes=@
      =/  sig=@  (full path data)
      ?~  data  sig
      (mix sig (lsh 9 (jam data)))
    ::(cat 9 sig (jam data))
    ::
    =/  las  (met 13 mes)
    =/  tip  (dec (add [lop len]:hunk))
    =/  top  (min las tip)
    =/  num  lop.hunk
    ?>  (lte num top)
    =|  res=(list @uxmeow)
    |-  ^+  res
    ?:  =(num top)
      =-  (flop - res)
      (etch-meow (make-meow path mes num))
    $(num +(num), res :_(res (etch-meow (make-meow path mes num))))
  --
::  +etch-open-packet: convert $open-packet attestation to $shot
::
++  etch-open-packet
  ~/  %etch-open-packet
  |=  [pac=open-packet =acru:ames]
  ^-  shot
  :*  [sndr rcvr]:pac
      req=&
      sam=&
      (mod sndr-life.pac 16)
      (mod rcvr-life.pac 16)
      origin=~
      content=`@`(sign:as:acru (jam pac))
  ==
::  +sift-open-packet: decode comet attestation into an $open-packet
::
++  sift-open-packet
  ~/  %sift-open-packet
  |=  [=shot our=ship our-life=@]
  ^-  open-packet
  ::  deserialize and type-check packet contents
  ::
  =+  ;;  [signature=@ signed=@]  (cue content.shot)
  =+  ;;  =open-packet  (cue signed)
  ::  assert .our and .her and lives match
  ::
  ?>  .=       sndr.open-packet  sndr.shot
  ?>  .=       rcvr.open-packet  our
  ?>  .=  sndr-life.open-packet  1
  ?>  .=  rcvr-life.open-packet  our-life
  ::  only a star can sponsor a comet
  ::
  ?>  =(%king (clan:title (^sein:title sndr.shot)))
  =/  crub  (com:nu:crub:crypto public-key.open-packet)
  ::  comet public-key must hash to its @p address
  ::
  ?>  =(sndr.shot fig:ex:crub)
  ::  verify signature
  ::
  ?>  (safe:as:crub signature signed)
  open-packet
::  +etch-shut-packet: encrypt and packetize a $shut-packet
::
++  etch-shut-packet
  ~/  %etch-shut-packet
  ::  TODO add rift to signed messages to prevent replay attacks?
  ::
  |=  $:  =shut-packet
          =symmetric-key
          sndr=ship
          rcvr=ship
          sndr-life=@
          rcvr-life=@
      ==
  ^-  shot
  ::
  =?    meat.shut-packet
      ?&  ?=(%& -.meat.shut-packet)
          (gth (met packet-size fragment.p.meat.shut-packet) 1)
      ==
    %_    meat.shut-packet
        fragment.p
      (cut packet-size [[fragment-num 1] fragment]:p.meat.shut-packet)
    ==
  ::
  =/  vec  ~[sndr rcvr sndr-life rcvr-life]
  =/  [siv=@uxH len=@ cyf=@ux]
    (~(en sivc:aes:crypto (shaz symmetric-key) vec) (jam shut-packet))
  ::
  :*  ^=       dyad  [sndr rcvr]
      ^=        req  ?=(%& -.meat.shut-packet)
      ^=        sam  &
      ^=  sndr-tick  (mod sndr-life 16)
      ^=  rcvr-tick  (mod rcvr-life 16)
      ^=     origin  ~
      ^=    content  :(mix siv (lsh 7 len) (lsh [3 18] cyf))
  ==
::  +sift-shut-packet: decrypt a $shut-packet from a $shot
::
++  sift-shut-packet
  ~/  %sift-shut-packet
  |=  [=shot =symmetric-key sndr-life=@ rcvr-life=@]
  ^-  shut-packet
  ?.  =(sndr-tick.shot (mod sndr-life 16))
    ~|  ames-sndr-tick+sndr-tick.shot  !!
  ?.  =(rcvr-tick.shot (mod rcvr-life 16))
    ~|  ames-rcvr-tick+rcvr-tick.shot  !!
  =/  siv  (end 7 content.shot)
  =/  len  (end 4 (rsh 7 content.shot))
  =/  cyf  (rsh [3 18] content.shot)
  ~|  ames-decrypt+[[sndr rcvr origin]:shot len siv]
  =/  vec  ~[sndr.shot rcvr.shot sndr-life rcvr-life]
  ;;  shut-packet  %-  cue  %-  need
  (~(de sivc:aes:crypto (shaz symmetric-key) vec) siv len cyf)
::  ordered map for tracking remote scry requests
::
++  orm  ((on @ud keen-state) lte)
::
++  is-peer-dead
  |=  [now=@da =peer-state]
  ^+  peer-state
  =/  expiry=@da  (add ~s30 last-contact.qos.peer-state)
  =?  -.qos.peer-state  (gte now expiry)
    %dead
  peer-state
::
++  update-peer-route
  |=  [peer=ship =peer-state]
  ^+  peer-state
  ::  If the peer is not responding, mark the .lane.route as
  ::  indirect. The next packets we emit will be sent to the
  ::  receiver's sponsorship chain in case the receiver's
  ::  transport address has changed and this lane is no longer
  ::  valid.
  ::
  ::  If .peer is a galaxy, the lane will always remain direct.
  ::
  ?.  ?&  ?=(%dead -.qos.peer-state)
          ?=(^ route.peer-state)
          direct.u.route.peer-state
          !=(%czar (clan:title peer))
      ==
    peer-state
  peer-state(direct.u.route %.n)
::
+|  %atomics
::
+$  private-key  @uwprivatekey
+$  signature    @uwsignature
+$  byuts        [wid=@ud dat=@ux]
::
+|  %kinetics
::  $channel: combined sender and receiver identifying data
::
+$  channel
  $:  [our=ship her=ship]
      now=@da
      ::  our data, common to all dyads
      ::
      $:  =our=life
          crypto-core=acru:ames
          =bug
      ==
      ::  her data, specific to this dyad
      ::
      $:  =symmetric-key
          =her=life
          =her=rift
          =her=public-key
          her-sponsor=ship
  ==  ==
::  $open-packet: unencrypted packet payload, for comet self-attestation
::
::    This data structure gets signed and jammed to form the .content
::    field of a $shot.
::
::    TODO add rift to prevent replay attacks
::
+$  open-packet
  $:  =public-key
      sndr=ship
      =sndr=life
      rcvr=ship
      =rcvr=life
  ==
::  $shut-packet: encrypted packet payload
::
+$  shut-packet
  $:  =bone
      =message-num
      meat=(each fragment-meat ack-meat)
  ==
::  $fragment-meat: contents of a message-fragment packet
::
+$  fragment-meat
  $:  num-fragments=fragment-num
      =fragment-num
      =fragment
  ==
::  $ack-meat: contents of an acknowledgment packet; fragment or message
::
::    Fragment acks reference the $fragment-num of the target packet.
::
::    Message acks contain a success flag .ok, which is %.n in case of
::    negative acknowledgment (nack), along with .lag that describes the
::    time it took to process the message. .lag is zero if the message
::    was processed during a single Arvo event. At the moment, .lag is
::    always zero.
::
+$  ack-meat  (each fragment-num [ok=? lag=@dr])
::  $naxplanation: nack trace; explains which message failed and why
::
+$  naxplanation  [=message-num =error]
::
+|  %statics
::
::  $ames-state: state for entire vane
::
::    peers:       states of connections to other ships
::    unix-duct:   handle to give moves to unix
::    life:        our $life; how many times we've rekeyed
::    crypto-core: interface for encryption and signing
::    bug:         debug printing configuration
::    snub:        blocklist for incoming packets
::    cong:        parameters for marking a flow as clogged
::
+$  ames-state
  $:  peers=(map ship ship-state)
      =unix=duct
      =life
      =rift
      crypto-core=acru:ames
      =bug
      snub=[form=?(%allow %deny) ships=(set ship)]
      cong=[msg=@ud mem=@ud]
  ==
::
+$  azimuth-state  [=symmetric-key =life =rift =public-key sponsor=ship]
+$  ames-state-4   ames-state-5
+$  ames-state-5
  $:  peers=(map ship ship-state-5)
      =unix=duct
      =life
      crypto-core=acru-12
      bug=bug-9
  ==
::
+$  ship-state-4  ship-state-5
+$  ship-state-5
  $%  [%alien alien-agenda-12]
      [%known peer-state-5]
  ==
::
+$  peer-state-5
  $:  azimuth-state
      route=(unit [direct=? =lane])
      =qos
      =ossuary
      snd=(map bone message-pump-state)
      rcv=(map bone message-sink-state)
      nax=(set [=bone =message-num])
      heeds=(set duct)
  ==
::
+$  bug-9
  $:  veb=_[`?`%.n `?`%.n `?`%.n `?`%.n `?`%.n `?`%.n `?`%.n]
      ships=(set ship)
  ==
::
+$  ames-state-6
  $:  peers=(map ship ship-state-6)
      =unix=duct
      =life
      crypto-core=acru-12
      bug=bug-9
  ==
::
+$  ship-state-6
  $%  [%alien alien-agenda-12]
      [%known peer-state-6]
  ==
::
+$  peer-state-6
  $:  azimuth-state
      route=(unit [direct=? =lane])
      =qos
      =ossuary
      snd=(map bone message-pump-state)
      rcv=(map bone message-sink-state)
      nax=(set [=bone =message-num])
      heeds=(set duct)
  ==
::
+$  ames-state-7
  $:  peers=(map ship ship-state-7)
      =unix=duct
      =life
      crypto-core=acru-12
      bug=bug-9
  ==
::
+$  ames-state-8
  $:  peers=(map ship ship-state-7)
      =unix=duct
      =life
      crypto-core=acru-12
      bug=bug-9
      corks=(set wire)
  ==
::
+$  ames-state-9
  $:  peers=(map ship ship-state-7)
      =unix=duct
      =life
      crypto-core=acru-12
      bug=bug-9
      corks=(set wire)
      snub=(set ship)
  ==
::
+$  ames-state-10
  $:  peers=(map ship ship-state-7)
      =unix=duct
      =life
      crypto-core=acru-12
      bug=bug-12
      corks=(set wire)
      snub=(set ship)
  ==
::
+$  ship-state-7
  $%  [%alien alien-agenda-12]
      [%known peer-state-7]
  ==
::
+$  peer-state-7
  $:  azimuth-state
      route=(unit [direct=? =lane])
      =qos
      =ossuary
      snd=(map bone message-pump-state)
      rcv=(map bone message-sink-state)
      nax=(set [=bone =message-num])
      heeds=(set duct)
      closing=(set bone)
      corked=(set bone)
      krocs=(set bone)
  ==
::
+$  ames-state-11
  $:  peers=(map ship ship-state-7)
      =unix=duct
      =life
      crypto-core=acru-12
      bug=bug-12
      corks=(set wire)
      snub=(set ship)
      cong=[msg=@ud mem=@ud]
  ==
::
+$  queued-event-11
  $%  [%call =duct wrapped-task=(hobo task-11)]
      [%take =wire =duct =sign]
  ==
::
+$  task-11
  $%  [%snub ships=(list ship)]
      $<(%snub task)
  ==
::
+$  ames-state-12
  $:  peers=(map ship ship-state-12)
      =unix=duct
      =life
      crypto-core=acru-12
      bug=bug-12
      snub=[form=?(%allow %deny) ships=(set ship)]
      cong=[msg=@ud mem=@ud]
  ==
::
+$  ship-state-12
  $%  [%alien alien-agenda-12]
      [%known peer-state-12]
  ==
::
+$  alien-agenda-12
  $:  messages=(list [=duct =plea])
      packets=(set =blob)
      heeds=(set duct)
  ==
::
+$  peer-state-12
  $:  azimuth-state
      route=(unit [direct=? =lane])
      =qos
      =ossuary
      snd=(map bone message-pump-state)
      rcv=(map bone message-sink-state)
      nax=(set [=bone =message-num])
      heeds=(set duct)
      closing=(set bone)
      corked=(set bone)
  ==
::
+$  bug-12
  $:  veb=_[`?`%.n `?`%.n `?`%.n `?`%.n `?`%.n `?`%.n `?`%.n `?`%.n]
      ships=(set ship)
  ==
::
++  acru-12  $_  ^?
  |%
  ++  as  ^?
    |%  ++  seal  |~([a=pass b=@] *@)
        ++  sign  |~(a=@ *@)
        ++  sure  |~(a=@ *(unit @))
        ++  tear  |~([a=pass b=@] *(unit @))
    --
  ++  de  |~([a=@ b=@] *(unit @))
  ++  dy  |~([a=@ b=@] *@)
  ++  en  |~([a=@ b=@] *@)
  ++  ex  ^?
    |%  ++  fig  *@uvH
        ++  pac  *@uvG
        ++  pub  *pass
        ++  sec  *ring
    --
  ++  nu  ^?
    |%  ++  pit  |~([a=@ b=@] ^?(..nu))
        ++  nol  |~(a=ring ^?(..nu))
        ++  com  |~(a=pass ^?(..nu))
    --
  --
::  $bug: debug printing configuration
::
::    veb: verbosity toggles
::    ships: identity filter; if ~, print for all
::
+$  bug
  $:  veb=_veb-all-off
      ships=(set ship)
  ==
::
+|  %dialectics
::
::  $move: output effect; either request or response
::
+$  move  [=duct card=(wind note gift)]
::  $queued-event: event to be handled after initial boot completes
::
+$  queued-event
  $%  [%call =duct wrapped-task=(hobo task)]
      [%take =wire =duct =sign]
  ==
::  $note: request to other vane
::
::    Ames passes a %plea note to another vane when it receives a
::    message on a "forward flow" from a peer, originally passed from
::    one of the peer's vanes to the peer's Ames.
::
::    Ames passes a %plea to itself to handle internal %cork and %pine moves.
::    Ames passes a %private-keys to Jael to request our private keys.
::    Ames passes a %public-keys to Jael to request a peer's public
::    keys.
::
+$  note
  $~  [%b %wait *@da]
  $%  $:  %b
      $%  [%wait date=@da]
          [%rest date=@da]
      ==  ==
      $:  %c
      $%  $>(%warp task:clay)
      ==  ==
      $:  %d
      $%  [%flog flog:dill]
      ==  ==
      $:  %j
      $%  [%private-keys ~]
          [%public-keys ships=(set ship)]
          [%turf ~]
          [%ruin ships=(set ship)]
      ==  ==
      $:  @tas
      $%  [%plea =ship =plea]
  ==  ==  ==
::  $sign: response from other vane
::
+$  sign
  $~  [%behn %wake ~]
  $%  $:  %behn
      $%  $>(%wake gift:behn)
      ==  ==
      $:  %jael
      $%  [%private-keys =life vein=(map life ring)]
          [%public-keys =public-keys-result]
          [%turf turfs=(list turf)]
      ==  ==
      $:  @tas
      $%  [%done error=(unit error)]
          [%boon payload=*]
  ==  ==  ==
::  $message-pump-task: job for |message-pump
::
::    %memo: packetize and send application-level message
::    %hear: handle receipt of ack on fragment or message
::    %near: handle receipt of naxplanation
::    %prod: reset congestion control
::    %wake: handle timer firing
::
+$  message-pump-task
  $%  [%memo =message-blob]
      [%hear =message-num =ack-meat]
      [%near =naxplanation]
      [%prod ~]
      [%wake ~]
  ==
::  $packet-pump-task: job for |packet-pump
::
::    %hear: deal with a packet acknowledgment
::    %done: deal with message acknowledgment
::    %halt: finish event, possibly updating timer
::    %wake: handle timer firing
::    %prod: reset congestion control
::
+$  packet-pump-task
  $%  [%hear =message-num =fragment-num]
      [%done =message-num lag=@dr]
      [%halt ~]
      [%wake current=message-num]
      [%prod ~]
  ==
::  $message-sink-task: job for |message-sink
::
::    %done: receive confirmation from vane of processing or failure
::    %drop: clear .message-num from .nax.state
::    %hear: handle receiving a message fragment packet
::      .ok: %.y unless previous failed attempt
::
+$  message-sink-task
  $%  [%done ok=?]
      [%drop =message-num]
      [%hear =lane =shut-packet ok=?]
  ==
--
::  external vane interface
::
|=  our=ship
::  larval ames, before %born sets .unix-duct; wraps adult ames core
::
=<  =*  adult-gate  .
    =|  queued-events=(qeu queued-event)
    =|  $=  cached-state
        %-  unit
        $%  [%5 ames-state-5]
            [%6 ames-state-6]
            [%7 ames-state-7]
            [%8 ames-state-8]
            [%9 ames-state-9]
            [%10 ames-state-10]
            [%11 ames-state-11]
            [%12 ames-state-12]
            [%13 ^ames-state]
        ==
    ::
    |=  [now=@da eny=@ rof=roof]
    =*  larval-gate  .
    =*  adult-core  (adult-gate +<)
    =<  |%
        ++  call  ^call
        ++  load  ^load
        ++  scry  ^scry
        ++  stay  ^stay
        ++  take  ^take
        --
    |%
    ++  larval-core  .
    ::  +call: handle request $task
    ::
    ++  call
      |=  [=duct dud=(unit goof) wrapped-task=(hobo task)]
      ::
      =/  =task  ((harden task) wrapped-task)
      ::  reject larval error notifications
      ::
      ?^  dud
        ~|(%ames-larval-call-dud (mean tang.u.dud))
      ::  before processing events, make sure we have state loaded
      ::
      =^  molt-moves  larval-core  molt
      ::
      ?:  &(!=(~ unix-duct.ames-state.adult-gate) =(~ queued-events))
        =^  moves  adult-gate  (call:adult-core duct dud task)
        ~>  %slog.0^leaf/"ames: metamorphosis"
        [(weld molt-moves moves) adult-gate]
      ::  drop incoming packets until we metamorphose
      ::
      ?:  ?=(%hear -.task)
        [~ larval-gate]
      ::  %born: set .unix-duct and start draining .queued-events
      ::
      ?:  ?=(%born -.task)
        ::  process %born using wrapped adult ames
        ::
        =^  moves  adult-gate  (call:adult-core duct dud task)
        =.  moves  (weld molt-moves moves)
        ::  kick off a timer to process the first of .queued-events
        ::
        =.  moves  :_(moves [duct %pass /larva %b %wait now])
        [moves larval-gate]
      ::  any other event: enqueue it until we have a .unix-duct
      ::
      ::    XX what to do with errors?
      ::
      =.  queued-events  (~(put to queued-events) %call duct task)
      [~ larval-gate]
    ::  +take: handle response $sign
    ::
    ++  take
      |=  [=wire =duct dud=(unit goof) =sign]
      ?^  dud
        ~|(%ames-larval-take-dud (mean tang.u.dud))
      ::
      =^  molt-moves  larval-core  molt
      ::
      ?:  &(!=(~ unix-duct.ames-state.adult-gate) =(~ queued-events))
        =^  moves  adult-gate  (take:adult-core wire duct dud sign)
        ~>  %slog.0^leaf/"ames: metamorphosis"
        [(weld molt-moves moves) adult-gate]
      ::  enqueue event if not a larval drainage timer
      ::
      ?.  =(/larva wire)
        =.  queued-events  (~(put to queued-events) %take wire duct sign)
        [~ larval-gate]
      ::  larval event drainage timer; pop and process a queued event
      ::
      ?.  ?=([%behn %wake *] sign)
        ~>  %slog.0^leaf/"ames: larva: strange sign"
        [~ larval-gate]
      ::  if crashed, print, dequeue, and set next drainage timer
      ::
      ?^  error.sign
        ::  .queued-events should never be ~ here, but if it is, don't crash
        ::
        ?:  =(~ queued-events)
          =/  =tang  [leaf/"ames: cursed metamorphosis" u.error.sign]
          =/  moves  [duct %pass /larva-crash %d %flog %crud %larva tang]~
          [moves adult-gate]
        ::  dequeue and discard crashed event
        ::
        =.  queued-events  +:~(get to queued-events)
        ::  .queued-events has been cleared; metamorphose
        ::
        ?~  queued-events
          ~>  %slog.0^leaf/"ames: metamorphosis"
          [~ adult-gate]
        ::  set timer to drain next event
        ::
        =/  moves
          =/  =tang  [leaf/"ames: larva: drain crash" u.error.sign]
          :~  [duct %pass /larva-crash %d %flog %crud %larva tang]
              [duct %pass /larva %b %wait now]
          ==
        [moves larval-gate]
      ::  normal drain timer; dequeue and run event
      ::
      =^  first-event  queued-events  ~(get to queued-events)
      =^  moves  adult-gate
        ?-  -.first-event
          %call  (call:adult-core [duct ~ wrapped-task]:+.first-event)
          %take  (take:adult-core [wire duct ~ sign]:+.first-event)
        ==
      =.  moves  (weld molt-moves moves)
      ::  .queued-events has been cleared; done!
      ::
      ?~  queued-events
        ~>  %slog.0^leaf/"ames: metamorphosis"
        [moves adult-gate]
      ::  set timer to drain next event
      ::
      =.  moves  :_(moves [duct %pass /larva %b %wait now])
      [moves larval-gate]
    ::  lifecycle arms; mostly pass-throughs to the contained adult ames
    ::
    ++  scry  scry:adult-core
    ++  stay  [%13 %larva queued-events ames-state.adult-gate]
    ++  load
      |=  $=  old
          $%  $:  %4
              $%  $:  %larva
                      events=(qeu queued-event)
                      state=ames-state-4
                  ==
                  [%adult state=ames-state-4]
              ==  ==
              $:  %5
              $%  $:  %larva
                      events=(qeu queued-event)
                      state=ames-state-5
                  ==
                  [%adult state=ames-state-5]
              ==  ==
              $:  %6
              $%  $:  %larva
                      events=(qeu queued-event)
                      state=ames-state-6
                  ==
                  [%adult state=ames-state-6]
              ==  ==
              $:  %7
              $%  $:  %larva
                      events=(qeu queued-event)
                      state=ames-state-7
                  ==
                  [%adult state=ames-state-7]
              ==  ==
              $:  %8
              $%  $:  %larva
                      events=(qeu queued-event)
                      state=ames-state-8
                  ==
                  [%adult state=ames-state-8]
              ==  ==
              $:  %9
              $%  $:  %larva
                      events=(qeu queued-event)
                      state=ames-state-9
                  ==
                  [%adult state=ames-state-9]
              ==  ==
              $:  %10
              $%  $:  %larva
                      events=(qeu queued-event)
                      state=ames-state-10
                  ==
                  [%adult state=ames-state-10]
              ==  ==
              $:  %11
              $%  $:  %larva
                      events=(qeu queued-event-11)
                      state=ames-state-11
                  ==
                  [%adult state=ames-state-11]
              ==  ==
              $:  %12
              $%  $:  %larva
                      events=(qeu queued-event)
                      state=ames-state-12
                  ==
                  [%adult state=ames-state-12]
              ==  ==
              $:  %13
              $%  $:  %larva
                      events=(qeu queued-event)
                      state=_ames-state.adult-gate
                  ==
                  [%adult state=_ames-state.adult-gate]
          ==  ==  ==
      ?-    old
          [%4 %adult *]
        $(old [%5 %adult (state-4-to-5:load:adult-core state.old)])
      ::
          [%4 %larva *]
        =.  state.old  (state-4-to-5:load:adult-core state.old)
        $(-.old %5)
      ::
          [%5 %adult *]
        =.  cached-state  `[%5 state.old]
        ~>  %slog.0^leaf/"ames: larva reload"
        larval-gate
      ::
          [%5 %larva *]
        ~>  %slog.0^leaf/"ames: larva: load"
        =.  queued-events  events.old
        larval-gate
      ::
          [%6 %adult *]
        =.  cached-state  `[%6 state.old]
        ~>  %slog.0^leaf/"ames: larva reload"
        larval-gate
      ::
          [%6 %larva *]
        ~>  %slog.0^leaf/"ames: larva: load"
        =.  queued-events  events.old
        larval-gate
      ::
          [%7 %adult *]
        =.  cached-state  `[%7 state.old]
        ~>  %slog.0^leaf/"ames: larva reload"
        larval-gate
      ::
          [%7 %larva *]
        ~>  %slog.0^leaf/"ames: larva: load"
        =.  queued-events  events.old
        larval-gate
      ::
          [%8 %adult *]
        =.  cached-state  `[%8 state.old]
        ~>  %slog.0^leaf/"ames: larva reload"
        larval-gate
      ::
          [%8 %larva *]
        ~>  %slog.0^leaf/"ames: larva: load"
        =.  queued-events  events.old
        larval-gate
      ::
          [%9 %adult *]
        =.  cached-state  `[%9 state.old]
        ~>  %slog.0^leaf/"ames: larva reload"
        larval-gate
      ::
          [%9 %larva *]
        ~>  %slog.0^leaf/"ames: larva: load"
        =.  queued-events  events.old
        larval-gate
      ::
          [%10 %adult *]
        =.  cached-state  `[%10 state.old]
        ~>  %slog.0^leaf/"ames: larva reload"
        larval-gate
      ::
          [%10 %larva *]
        ~>  %slog.1^leaf/"ames: larva: load"
        =.  queued-events  events.old
        larval-gate
      ::
          [%11 %adult *]
        =.  cached-state  `[%11 state.old]
        ~>  %slog.0^leaf/"ames: larva reload"
        larval-gate
      ::
          [%11 %larva *]
        ~>  %slog.1^leaf/"ames: larva: load"
        =.  queued-events
          ::  "+rep:in on a +qeu looks strange, but works fine."
          ::
          %-  ~(rep in events.old)
          |=  [e=queued-event-11 q=(qeu queued-event)]
          %-  ~(put to q)  ^-  queued-event
          ?.  ?=(%call -.e)  e
          =/  task=task-11  ((harden task-11) wrapped-task.e)
          %=    e
              wrapped-task
            ?.(?=(%snub -.task) task [%snub %deny ships.task])
          ==
        larval-gate
      ::
          [%12 %adult *]
        =.  cached-state  `[%12 state.old]
        ~>  %slog.0^leaf/"ames: larva reload"
        larval-gate
      ::
          [%12 %larva *]
        ~>  %slog.1^leaf/"ames: larva: load"
        =.  queued-events  events.old
        larval-gate
      ::
          [%13 %adult *]
        (load:adult-core %13 state.old)
      ::
          [%13 %larva *]
        ~>  %slog.1^leaf/"ames: larva: load"
        =.  queued-events  events.old
        =.  adult-gate  (load:adult-core %13 state.old)
        larval-gate
      ::
      ==
    ::  +molt: re-evolve to adult-ames
    ::
    ++  molt
      ^-  (quip move _larval-core)
      ?~  cached-state
        [~ larval-core]
      ~>  %slog.0^leaf/"ames: molt"
      =?  u.cached-state  ?=(%5 -.u.cached-state)
        6+(state-5-to-6:load:adult-core +.u.cached-state)
      =?  u.cached-state  ?=(%6 -.u.cached-state)
        7+(state-6-to-7:load:adult-core +.u.cached-state)
      =^  moz  u.cached-state
        ?.  ?=(%7 -.u.cached-state)
          [~ u.cached-state]
        ~>  %slog.0^leaf/"ames: init daily recork timer"
        :-  [[/ames]~ %pass /recork %b %wait `@da`(add now ~d1)]~
        8+(state-7-to-8:load:adult-core +.u.cached-state)
      =?  u.cached-state  ?=(%8 -.u.cached-state)
        9+(state-8-to-9:load:adult-core +.u.cached-state)
      =?  u.cached-state  ?=(%9 -.u.cached-state)
        10+(state-9-to-10:load:adult-core +.u.cached-state)
      =?  u.cached-state  ?=(%10 -.u.cached-state)
        11+(state-10-to-11:load:adult-core +.u.cached-state)
      =?  u.cached-state  ?=(%11 -.u.cached-state)
        12+(state-11-to-12:load:adult-core +.u.cached-state)
      =?  u.cached-state  ?=(%12 -.u.cached-state)
        13+(state-12-to-13:load:adult-core +.u.cached-state)
      ?>  ?=(%13 -.u.cached-state)
      =.  ames-state.adult-gate  +.u.cached-state
      [moz larval-core(cached-state ~)]
    --
::
=>
::  |ev: inner event-handling core
::
~%  %per-event  ..trace  ~
|%
++  ev
  =|  moves=(list move)
  ~%  %event-gate  ..ev  ~
  |=  [[now=@da eny=@ rof=roof] =duct =ames-state]
  =*  veb  veb.bug.ames-state
  =|  cork-bone=(unit bone)  ::  modified by +on-kroc
  ~%  %event-core  ..$  ~
  |%
  +|  %helpers
  ::
  ++  event-core  .
  ++  abet  [(flop moves) ames-state]
  ++  emit  |=(=move event-core(moves [move moves]))
  ++  emil  |=(mos=(list move) event-core(moves (weld (flop mos) moves)))
  ++  channel-state  [life crypto-core bug]:ames-state
  ++  trace-fine  (cury trace %fine)
  ++  trace-ames  (cury trace %ames)
  ++  ev-trace
    |=  [verb=? =ship print=(trap tape)]
    ^+  same
    (trace-ames verb ship ships.bug.ames-state print)
  ::  +get-peer-state: lookup .her state or ~
  ::
  ++  get-peer-state
    |=  her=ship
    ^-  (unit peer-state)
    ::
    =-  ?.(?=([~ %known *] -) ~ `+.u)
    (~(get by peers.ames-state) her)
  ::  +got-peer-state: lookup .her state or crash
  ::
  ++  got-peer-state
    |=  her=ship
    ^-  peer-state
    ::
    ~|  %freaky-alien^her
    =-  ?>(?=(%known -<) ->)
    (~(got by peers.ames-state) her)
  ::  +gut-peer-state: lookup .her state or default
  ::
  ++  gut-peer-state
    |=  her=ship
    ^-  peer-state
    =/  ship-state  (~(get by peers.ames-state) her)
    ?.  ?=([~ %known *] ship-state)
      *peer-state
    +.u.ship-state
  ::
  +|  %tasks
  ::  +on-take-done: handle notice from vane that it processed a message
  ::
  ++  on-take-done
    |=  [=wire error=(unit error)]
    ^+  event-core
    ?:  ?=([%fine ?(%pine %unsub) *] wire)
      ?~  error
        event-core
      %-  (slog leaf/"fine: {} {(spud +>.wire)}" ~)
      (emit duct %give %miss +>.wire)
    ?~  parsed=(parse-bone-wire wire)
      ::  no-op
      ::
      ~>  %slog.0^leaf/"ames: dropping malformed wire: {(spud wire)}"
      event-core
    ?>  ?=([@ her=ship *] u.parsed)
    =*  her  her.u.parsed
    =/  peer-core  (abed-got:pe her)
    |^
    ?:  ?&  ?=([%new *] u.parsed)
            (lth rift.u.parsed rift.peer-state.peer-core)
        ==
      ::  ignore events from an old rift
      ::
      %-  %^  ev-trace  odd.veb  her
          |.("dropping old rift wire: {(spud wire)}")
      event-core
    =/  =bone
      ?-(u.parsed [%new *] bone.u.parsed, [%old *] bone.u.parsed)
    =?  peer-core  ?=([%old *] u.parsed)
      %-  %^  ev-trace  odd.veb  her
          |.("parsing old wire: {(spud wire)}")
      peer-core
    ::  relay the vane ack to the foreign peer
    ::
    =<  abet
    ?~(error (send-ack bone) (send-nack bone u.error))
    ::
    ::  if processing succeeded, send positive ack packet and exit
    ::
    ++  send-ack
      |=  =bone
      ^+  peer-core
      ::  handle cork only deals with bones that are in closing
      ::
      (handle-cork:abet:(call:(abed:mi:peer-core bone) %done ok=%.y) bone)
    ::  failed; send message nack packet
    ::
    ++  send-nack
      |=  [=bone =^error]
      ^+  peer-core
      =.  peer-core  abet:(call:(abed:mi:peer-core bone) %done ok=%.n)
      =.  event-core  abet:peer-core  ::  XX extraneous?
      ::  construct nack-trace message, referencing .failed $message-num
      ::
      =/  failed=message-num
        last-acked:(~(got by rcv.peer-state.peer-core) bone)
      =/  =naxplanation  [failed error]
      =/  =message-blob  (jam naxplanation)
      ::  send nack-trace message on associated .nack-bone
      ::
      =/  nack-bone=^bone  (mix 0b10 bone)
      abet:(call:(abed:mu:(abed-got:pe her) nack-bone) %memo message-blob)
    --
  ::  +on-sift: handle request to filter debug output by ship
  ::
  ++  on-sift
    |=  ships=(list ship)
    ^+  event-core
    =.  ships.bug.ames-state  (sy ships)
    event-core
  ::  +on-snub: handle request to change ship blacklist
  ::
  ++  on-snub
    |=  [form=?(%allow %deny) ships=(list ship)]
    ^+  event-core
    =.  snub.ames-state  [form (sy ships)]
    event-core
  ::  +on-spew: handle request to set verbosity toggles on debug output
  ::
  ++  on-spew
    |=  verbs=(list verb)
    ^+  event-core
    ::  start from all %.n's, then flip requested toggles
    ::
    =.  veb.bug.ames-state
      %+  roll  verbs
      |=  [=verb acc=_veb-all-off]
      ^+  veb.bug.ames-state
      ?-  verb
        %snd  acc(snd %.y)
        %rcv  acc(rcv %.y)
        %odd  acc(odd %.y)
        %msg  acc(msg %.y)
        %ges  acc(ges %.y)
        %for  acc(for %.y)
        %rot  acc(rot %.y)
        %kay  acc(kay %.y)
        %fin  acc(fin %.y)
      ==
    event-core
  ::  +on-prod: re-send a packet per flow to each of .ships
  ::
  ++  on-prod
    |=  ships=(list ship)
    ^+  event-core
    =?  ships  =(~ ships)  ~(tap in ~(key by peers.ames-state))
    |^  ^+  event-core
    ?~  ships  event-core
    $(ships t.ships, event-core (prod-peer i.ships))
    ::
    ++  prod-peer
      |=  her=ship
      ^+  event-core
      =/  par  (get-peer-state her)
      ?~  par  event-core
      =/  peer-core  (abed-peer:pe her u.par)
      =/  bones  ~(tap in ~(key by snd.u.par))
      |-  ^+  event-core
      ?~  bones  abet:peer-core
      =.  peer-core  abet:(call:(abed:mu:peer-core i.bones) %prod ~)
      $(bones t.bones)
    --
  ::  +on-cong: adjust congestion control parameters
  ::
  ++  on-cong
    |=  [msg=@ud mem=@ud]
    ^+  event-core
    =.  cong.ames-state  msg^mem
    event-core
  ::  +on-stir: recover from timer desync, setting new timers as needed
  ::
  ::    .arg is unused, meant to ease future debug commands
  ::
  ++  on-stir
    |=  arg=@t
    ^+  event-core
    =/  want=(set [@da ^duct])
      %-  ~(rep by peers.ames-state)
      |=  [[who=ship s=ship-state] acc=(set [@da ^duct])]
      ?.  ?=(%known -.s)  acc
      %-  ~(rep by snd.+.s)
      |=  [[b=bone m=message-pump-state] acc=_acc]
      =*  tim  next-wake.packet-pump-state.m
      ?~  tim  acc
      %-  ~(put in acc)
      [u.tim `^duct`~[ames+(make-pump-timer-wire who b) /ames]]
    =.  want
      (~(put in want) (add now ~d1) ~[/ames/recork /ames])
    ::
    =/  have
      %-  ~(gas in *(set [@da ^duct]))
      =/  tim
        ;;  (list [@da ^duct])
        =<  q.q  %-  need  %-  need
        (rof ~ %bx [[our %$ da+now] /debug/timers])
      (skim tim |=([@da hen=^duct] ?=([[%ames ?(%pump %recork) *] *] hen)))
    ::
    ::  set timers for flows that should have one set but don't
    ::
    =.  event-core
      %-  ~(rep in (~(dif in want) have))
      |=  [[wen=@da hen=^duct] this=_event-core]
      ?>  ?=([^ *] hen)
      (emit:this ~[/ames] %pass t.i.hen %b %wait wen)
    ::
    ::  cancel timers for flows that have one set but shouldn't
    ::
    %-  ~(rep in (~(dif in have) want))
    |=  [[wen=@da hen=^duct] this=_event-core]
    ?>  ?=([^ *] hen)
    (emit:this t.hen %pass t.i.hen %b %rest wen)
  ::  +on-crud: handle event failure; print to dill
  ::
  ++  on-crud
    |=  =error
    ^+  event-core
    (emit duct %pass /crud %d %flog %crud error)
  ::  +on-heed: handle request to track .ship's responsiveness
  ::
  ++  on-heed
    |=  =ship
    ^+  event-core
    =/  ship-state  (~(get by peers.ames-state) ship)
    ?:  ?=([~ %known *] ship-state)
      abet:on-heed:(abed-peer:pe ship +.u.ship-state)
    %+  enqueue-alien-todo  ship
    |=  todos=alien-agenda
    todos(heeds (~(put in heeds.todos) duct))
  ::  +on-jilt: handle request to stop tracking .ship's responsiveness
  ::
  ++  on-jilt
    |=  =ship
    ^+  event-core
    =/  ship-state  (~(get by peers.ames-state) ship)
    ?:  ?=([~ %known *] ship-state)
      abet:on-jilt:(abed-peer:pe ship +.u.ship-state)
    %+  enqueue-alien-todo  ship
    |=  todos=alien-agenda
    todos(heeds (~(del in heeds.todos) duct))
  ::  +on-hear: handle raw packet receipt
  ::
  ++  on-hear
    |=  [l=lane b=blob d=(unit goof)]
    ^+  event-core
    =/  =shot  (sift-shot b)
    ?:  sam.shot  (on-hear-packet l shot d)
    ?:  req.shot  ~|([%fine %request-events-forbidden] !!)
    ?^  d
      ::TODO  handle
      ~&  [%fine %done-goofed mote.u.d]
      %-  (slog tang.u.d)
      event-core
    ::  TODO no longer true
    ::NOTE  we only send requests to ships we know,
    ::      so we should only get responses from ships we know.
    ::      below we assume sndr.shot is a known peer.
abet:(on-hear-fine:(abed-got:pe sndr.shot) l shot) :: +on-hear-packet: handle mildly processed packet receipt :: ++ on-hear-packet ~/ %on-hear-packet |= [=lane =shot dud=(unit goof)] ^+ event-core %- (ev-trace rcv.veb sndr.shot |.("received packet")) :: ?: =(our sndr.shot) event-core ?: .= =(%deny form.snub.ames-state) (~(has in ships.snub.ames-state) sndr.shot) %- (ev-trace rcv.veb sndr.shot |.("snubbed")) event-core :: %. +< :: ?. =(our rcvr.shot) on-hear-forward :: ?: =(%keys content.shot) on-hear-keys ?: ?& ?=(%pawn (clan:title sndr.shot)) !?=([~ %known *] (~(get by peers.ames-state) sndr.shot)) == on-hear-open on-hear-shut :: +on-hear-forward: maybe forward a packet to someone else :: :: Note that this performs all forwarding requests without :: filtering. Any protection against DDoS amplification will be :: provided by Vere. :: ++ on-hear-forward ~/ %on-hear-forward |= [=lane =shot dud=(unit goof)] ^+ event-core %- %^ ev-trace for.veb sndr.shot |.("forward: {} -> {}") :: set .origin.shot if it doesn't have one, re-encode, and send :: =? origin.shot &(?=(~ origin.shot) !=(%czar (clan:title sndr.shot))) ?: ?=(%& -.lane) ~ ?. (lte (met 3 p.lane) 6) ~| ames-lane-size+p.lane !! `p.lane :: =/ =blob (etch-shot shot) (send-blob & rcvr.shot blob) :: +on-hear-keys: handle receipt of attestation request :: ++ on-hear-keys ~/ %on-hear-keys |= [=lane =shot dud=(unit goof)] =+ %^ ev-trace msg.veb sndr.shot |.("requested attestation") ?.
=(%pawn (clan:title our)) event-core (send-blob | sndr.shot (attestation-packet sndr.shot 1)) :: +on-hear-open: handle receipt of plaintext comet self-attestation :: ++ on-hear-open ~/ %on-hear-open |= [=lane =shot dud=(unit goof)] ^+ event-core =+ %^ ev-trace msg.veb sndr.shot |.("got attestation") :: assert the comet can't pretend to be a moon or other address :: ?> ?=(%pawn (clan:title sndr.shot)) :: if we already know .sndr, ignore duplicate attestation :: =/ ship-state (~(get by peers.ames-state) sndr.shot) ?: ?=([~ %known *] ship-state) event-core :: =/ =open-packet (sift-open-packet shot our life.ames-state) :: add comet as an %alien if we haven't already :: =? peers.ames-state ?=(~ ship-state) (~(put by peers.ames-state) sndr.shot %alien *alien-agenda) :: upgrade comet to %known via on-publ-full :: =. event-core =/ crypto-suite=@ud 1 =/ keys (my [sndr-life.open-packet crypto-suite public-key.open-packet]~) =/ =point :* ^= rift 0 ^= life sndr-life.open-packet ^= keys keys ^= sponsor `(^sein:title sndr.shot) == (on-publ / [%full (my [sndr.shot point]~)]) :: manually add the lane to the peer state :: =. peers.ames-state =/ =peer-state (gut-peer-state sndr.shot) =. route.peer-state `[direct=%.n lane] (~(put by peers.ames-state) sndr.shot %known peer-state) :: event-core :: +on-hear-shut: handle receipt of encrypted packet :: ++ on-hear-shut ~/ %on-hear-shut |= [=lane =shot dud=(unit goof)] ^+ event-core =/ sndr-state (~(get by peers.ames-state) sndr.shot) :: If we don't know them, ask Jael for their keys. If they're a :: comet, this will also cause us to request a self-attestation :: from the sender. The packet itself is dropped; we can assume it :: will be resent. :: ?. 
?=([~ %known *] sndr-state) (enqueue-alien-todo sndr.shot |=(alien-agenda +<)) :: decrypt packet contents using symmetric-key.channel :: :: If we know them, we have a $channel with them, which we've :: populated with a .symmetric-key derived from our private key :: and their public key using elliptic curve Diffie-Hellman. :: =/ =peer-state +.u.sndr-state =/ =channel [[our sndr.shot] now channel-state -.peer-state] ~| %ames-crash-on-packet-from^her.channel =/ =shut-packet (sift-shut-packet shot [symmetric-key her-life our-life]:channel) :: non-galaxy: update route with heard lane or forwarded lane :: =? route.peer-state !=(%czar (clan:title her.channel)) :: if new packet is direct, use that. otherwise, if the new :: and old lanes are indirect, use the new one. if the new lane :: is indirect but the old lane is direct, then if the lanes are :: identical, don't mark it indirect; if they're not identical, :: use the new lane and mark it indirect. :: :: if you mark lane as indirect because you got an indirect :: packet even though you already had a direct identical lane, :: then delayed forwarded packets will come later and reset to :: indirect, so you're unlikely to get a stable direct route :: (unless the forwarder goes offline for a while). :: :: conversely, if you don't accept indirect routes with different :: lanes, then if your lane is stale and they're trying to talk :: to you, your acks will go to the stale lane, and you'll never :: time it out unless you reach out to them. this manifests as :: needing to |hi or dotpost to get a response when the other :: ship has changed lanes.
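      ::
      ::  the cases above as a table (a sketch read off the branch that
      ::  follows, not an authoritative spec; "old" is the route
      ::  currently stored in .route.peer-state):
      ::
      ::    origin.shot   old route                   new route
      ::    ~             any                         [direct=%.y lane]
      ::    [~ origin]    direct, lane is |+origin    old route, unchanged
      ::    [~ origin]    direct, any other lane      [direct=%.n |+origin]
      ::    [~ origin]    indirect or ~               [direct=%.n |+origin]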
:: ?: ?=(~ origin.shot) `[direct=%.y lane] ?: ?=([~ %& *] route.peer-state) ?: =(lane.u.route.peer-state |+u.origin.shot) route.peer-state `[direct=%.n |+u.origin.shot] `[direct=%.n |+u.origin.shot] :: perform peer-specific handling of packet :: =< abet (~(on-hear-shut-packet pe peer-state channel) [lane shut-packet dud]) :: +on-take-boon: receive request to give message to peer :: ++ on-take-boon |= [=wire payload=*] ^+ event-core ?: ?=([%fine %pine @ *] wire) ?~ her=(slaw %p i.t.t.wire) :: XX use ev-trace? =/ =tape "; fine dropping malformed wire {}" (emit duct %pass /parse-wire %d %flog %text tape) abet:(on-pine-boon:(abed-got:pe u.her) t.t.t.wire payload) :: ?~ parsed=(parse-bone-wire wire) ~> %slog.0^leaf/"ames: dropping malformed wire: {(spud wire)}" event-core :: ?> ?=([@ her=ship *] u.parsed) =* her her.u.parsed =/ peer-core (abed-got:pe her) :: ?: ?& ?=([%new *] u.parsed) (lth rift.u.parsed rift.peer-state.peer-core) == :: ignore events from an old rift :: %- %^ ev-trace odd.veb her |.("dropping old rift wire: {(spud wire)}") event-core =/ =bone ?-(u.parsed [%new *] bone.u.parsed, [%old *] bone.u.parsed) =? peer-core ?=([%old *] u.parsed) %- %^ ev-trace odd.veb her |.("parsing old wire: {(spud wire)}") peer-core abet:(on-memo:peer-core bone payload %boon) :: +on-plea: handle request to send message :: ++ on-plea |= [=ship =plea] ^+ event-core :: since flow kill goes like: :: client vane cork task -> client ames pass cork as plea -> :: -> server ames sinks plea -> server ames +on-plea (we are here); :: if it's %cork plea passed to ames from its sink, :: give %done and process flow closing after +on-take-done call :: ?: =([%a /close ~] plea) (emit duct %give %done ~) :: ?: ?=([%a [%cone ~] *] plea) :: sent to ourselves from ha-plea, after sinking a remote /pine plea :: =+ ;;(blk=balk payload.plea) ?> &(=(our her.blk) =(%c van.blk) ?=(^ spr.blk)) =+ nom=(en-roof:balk blk(car %w, cas da/now, spr `path`~[i.spr.blk])) :: ?+ cage=(rof ~ nom) !! 
:: XX: crashing correct behaviour? [~ ~] (emit duct %give %done `miss/"{<(en-path:balk blk)>}") :: [~ ~ *] =+ !<(=cass:clay q.u.u.cage) =~ (emit duct %give %boon ud:cass) (emit duct %give %done ~) == == :: .plea is from local vane to foreign ship :: =/ ship-state (~(get by peers.ames-state) ship) :: ?. ?=([~ %known *] ship-state) ~& %on-plea %+ enqueue-alien-todo ship |= todos=alien-agenda todos(messages [[duct plea] messages.todos]) :: =/ =peer-state +.u.ship-state =/ =channel [[our ship] now channel-state -.peer-state] :: =^ =bone ossuary.peer-state (bind-duct ossuary.peer-state duct) %- %^ ev-trace msg.veb ship |. ^- tape =/ sndr [our our-life.channel] =/ rcvr [ship her-life.channel] "plea {}" abet:(~(on-memo pe [peer-state channel]) bone plea %plea) :: +on-cork: handle request to kill a flow :: ++ on-cork |= =ship ^+ event-core =/ =plea [%$ /flow [%cork ~]] =/ ship-state (~(get by peers.ames-state) ship) ?. ?=([~ %known *] ship-state) %+ enqueue-alien-todo ship |= todos=alien-agenda todos(messages [[duct plea] messages.todos]) =/ =peer-state +.u.ship-state =/ =channel [[our ship] now channel-state -.peer-state] :: =/ [=bone ossuary=_ossuary.peer-state] ?^ cork-bone [u.cork-bone ossuary.peer-state] (bind-duct ossuary.peer-state duct) =. ossuary.peer-state ossuary :: ?. (~(has by by-bone.ossuary.peer-state) bone) %. event-core %^ ev-trace odd.veb ship |.("trying to cork {}, not in the ossuary, ignoring") :: =. closing.peer-state (~(put in closing.peer-state) bone) %- %^ ev-trace msg.veb ship |. ^- tape =/ sndr [our our-life.channel] =/ rcvr [ship her-life.channel] "cork plea {}" abet:(~(on-memo pe [peer-state channel]) bone plea %plea) :: +on-kroc: cork all stale flows from failed subscriptions :: ++ on-kroc |= dry=? ^+ event-core :: no-op :: ?: & %.(event-core (slog leaf/"ames: %kroc task not ready" ~)) :: =; [corks=@ core=_event-core] ?. 
dry core %.(core (slog leaf/"ames: #{} flows can be corked" ~)) :: %+ roll ~(tap by peers.ames-state) |= [[=ship =ship-state] corks=@ core=_event-core] ?. ?=(%known -.ship-state) corks^core =/ =peer-state:ames ?>(?=(%known -.ship-state) +.ship-state) =/ subs=(jar path [bone sub-nonce=@]) %+ roll ~(tap by snd.peer-state) |= $: [=forward=bone message-pump-state:ames] subs=(jar path [bone sub-nonce=@]) == ?: (~(has in closing.peer-state) forward-bone) %. subs %^ ev-trace &(dry odd.veb) ship |. %+ weld "stale flow bone={} in closing, " "#{<~(wyt in live:packet-pump-state)>} packets retrying" ?~ duct=(~(get by by-bone.ossuary.peer-state) forward-bone) subs ?. ?=([* [%gall %use sub=@ @ %out @ @ nonce=@ pub=@ *] *] u.duct) subs =/ =wire i.t.u.duct =/ nonce=(unit @) (rush (snag 7 wire) dem) %- ~(add ja subs) :: 0 for old pre-nonce subscriptions :: :_ [forward-bone ?~(nonce 0 u.nonce)] ?~ nonce wire :: don't include the sub-nonce in the key :: (weld (scag 7 wire) (slag 8 wire)) %+ roll ~(tap by subs) |= [[=wire flows=(list [bone sub-nonce=@])] corks=_corks core=_core] :: %- tail %+ roll (sort flows |=([[@ n=@] [@ m=@]] (lte n m))) |= [[=bone nonce=@] resubs=_(lent flows) corks=_corks core=_core] =/ app=term ?>(?=([%gall %use sub=@ *] wire) i.t.t.wire) =/ =path (slag 7 wire) =/ log=tape "[bone={} agent={} nonce={}] {}" =; corkable=? =? corks corkable +(corks) =? core &(corkable !dry) (%*(on-cork core cork-bone `bone) ship) (dec resubs)^corks^core :: checks if this is a stale re-subscription :: ?. =(resubs 1) %. & (ev-trace &(dry odd.veb) ship |.((weld "stale %watch plea " log))) :: the current subscription can be safely corked if there :: is a flow with a naxplanation ack on a backward bone :: =+ backward-bone=(mix 0b10 bone) ?. =(2 (mod backward-bone 4)) | ?~ (~(get by rcv.peer-state) backward-bone) | %. 
& (ev-trace &(dry odd.veb) ship |.((weld "failed %watch plea " log))) :: +on-take-wake: receive wakeup or error notification from behn :: ++ on-take-wake |= [=wire error=(unit tang)] ^+ event-core ?: ?=([%alien @ ~] wire) :: if we haven't received an attestation, ask again :: ?^ error %- (slog 'ames: attestation timer failed' u.error) event-core ?~ ship=`(unit @p)`(slaw %p i.t.wire) %- (slog leaf+"ames: got timer for strange wire: {}" ~) event-core =/ ship-state (~(get by peers.ames-state) u.ship) ?: ?=([~ %known *] ship-state) event-core (request-attestation u.ship) :: ?. ?=([%recork ~] wire) =/ res=(unit ?([%fine her=ship =^wire] [%pump her=ship =bone])) ?+ wire ~| %ames-wire-timer^wire !! [%pump ship=@ bone=@ ~] (parse-pump-wire &2.wire &3.wire) [%fine %behn %wake @ *] (parse-fine-wire &4.wire t.t.t.t.wire) == ?~ res %- (slog leaf+"ames: got timer for strange wire: {}" ~) event-core :: =/ state=(unit peer-state) (get-peer-state her.u.res) ?~ state %. event-core %- slog [leaf+"ames: got timer for strange ship: {}, ignoring" ~] :: =/ peer-core (abed-peer:pe her.u.res u.state) ?- -.u.res %pump abet:(on-wake:peer-core bone.u.res error) %fine abet:fi-abet:fi-take-wake:(abed:fi:peer-core wire.u.res) == :: =. event-core (emit duct %pass /recork %b %wait `@da`(add now ~d1)) :: ?^ error %- (slog 'ames: recork timer failed' u.error) event-core :: recork up to one bone per peer :: =/ pez ~(tap by peers.ames-state) |- ^+ event-core ?~ pez event-core =+ [her sat]=i.pez ?. ?=(%known -.sat) $(pez t.pez) $(pez t.pez, event-core abet:recork-one:(abed-peer:pe her +.sat)) :: +on-init: first boot; subscribe to our info from jael :: ++ on-init ^+ event-core :: =~ (emit duct %pass /turf %j %turf ~) (emit duct %pass /private-keys %j %private-keys ~) == :: +on-priv: set our private key to jael's response :: ++ on-priv |= [=life vein=(map life private-key)] ^+ event-core :: =/ =private-key (~(got by vein) life) =. life.ames-state life =. 
crypto-core.ames-state (nol:nu:crub:crypto private-key) :: recalculate each peer's symmetric key :: =/ our-private-key sec:ex:crypto-core.ames-state =. peers.ames-state %- ~(run by peers.ames-state) |= =ship-state ^+ ship-state :: ?. ?=(%known -.ship-state) ship-state :: =/ =peer-state +.ship-state =. symmetric-key.peer-state (derive-symmetric-key public-key.+.ship-state our-private-key) :: [%known peer-state] :: event-core :: +on-publ: update pki data for peer or self :: ++ on-publ |= [=wire =public-keys-result] ^+ event-core :: |^ ^+ event-core :: ?- public-keys-result [%diff @ %rift *] (on-publ-rift [who to.diff]:public-keys-result) :: [%diff @ %keys *] (on-publ-rekey [who to.diff]:public-keys-result) :: [%diff @ %spon *] (on-publ-sponsor [who to.diff]:public-keys-result) :: [%full *] (on-publ-full points.public-keys-result) :: [%breach *] (on-publ-breach who.public-keys-result) == :: +on-publ-breach: handle continuity breach of .ship; wipe its state :: :: Abandon all pretense of continuity and delete all messaging state :: associated with .ship, including sent and unsent messages. :: Also cancel all timers related to .ship. :: ++ on-publ-breach |= =ship ^+ event-core :: =/ ship-state (~(get by peers.ames-state) ship) :: we shouldn't be hearing about ships we don't care about :: ?~ ship-state ~> %slog.0^leaf/"ames: breach unknown {}" event-core :: if an alien breached, this doesn't affect us :: ?: ?=([~ %alien *] ship-state) ~> %slog.0^leaf/"ames: breach alien {}" event-core ~> %slog.0^leaf/"ames: breach peer {}" :: a peer breached; drop messaging state :: =/ =peer-state +.u.ship-state =/ old-qos=qos qos.peer-state :: cancel all timers related to .ship :: =. 
event-core %+ roll ~(tap by snd.peer-state) |= [[=snd=bone =message-pump-state] core=_event-core] ^+ core :: ?~ next-wake=next-wake.packet-pump-state.message-pump-state core :: note: copies +on-pump-rest:message-pump :: =/ wire (make-pump-timer-wire ship snd-bone) =/ duct ~[/ames] (emit:core duct %pass wire %b %rest u.next-wake) :: reset all peer state other than pki data :: =. +.peer-state +:*^peer-state :: print change to quality of service, if any :: =/ text=(unit tape) %^ qos-update-text ship %ames [old-qos qos.peer-state kay.veb ships.bug.ames-state] :: =? event-core ?=(^ text) (emit duct %pass /qos %d %flog %text u.text) :: reinitialize galaxy route if applicable :: =? route.peer-state =(%czar (clan:title ship)) `[direct=%.y lane=[%& ship]] :: =. peers.ames-state (~(put by peers.ames-state) ship [%known peer-state]) :: event-core :: +on-publ-rekey: handle new key for peer :: :: TODO: assert .crypto-suite compatibility :: ++ on-publ-rekey |= $: =ship =life crypto-suite=@ud =public-key == ^+ event-core :: =/ ship-state (~(get by peers.ames-state) ship) ?. ?=([~ %known *] ship-state) =| =point =. life.point life =. keys.point (my [life crypto-suite public-key]~) =. sponsor.point `(^^sein:title rof our now ship) :: (on-publ-full (my [ship point]~)) :: =/ =peer-state +.u.ship-state =/ =private-key sec:ex:crypto-core.ames-state =. symmetric-key.peer-state (derive-symmetric-key public-key private-key) :: =. life.peer-state life =. public-key.peer-state public-key :: =. peers.ames-state (~(put by peers.ames-state) ship %known peer-state) event-core :: +on-publ-sponsor: handle new or lost sponsor for peer :: :: TODO: really handle sponsor loss :: ++ on-publ-sponsor |= [=ship sponsor=(unit ship)] ^+ event-core :: ?~ sponsor %- (slog leaf+"ames: {(scow %p ship)} lost sponsor, ignoring" ~) event-core :: =/ state=(unit peer-state) (get-peer-state ship) ?~ state %- (slog leaf+"ames: missing peer-state, ignoring" ~) event-core =. sponsor.u.state u.sponsor =. 
peers.ames-state (~(put by peers.ames-state) ship %known u.state) event-core :: +on-publ-full: handle new pki data for peer(s) :: ++ on-publ-full |= points=(map ship point) ^+ event-core :: => .(points ~(tap by points)) |^ ^+ event-core ?~ points event-core :: =+ ^- [=ship =point] i.points :: ?. (~(has by keys.point) life.point) $(points t.points) :: =/ old-ship-state (~(get by peers.ames-state) ship) :: =. event-core (insert-peer-state ship point) :: =? event-core ?=([~ %alien *] old-ship-state) (meet-alien ship point +.u.old-ship-state) :: $(points t.points) :: ++ meet-alien |= [=ship =point todos=alien-agenda] |^ ^+ event-core :: if we're a comet, send self-attestation packet first :: =? event-core =(%pawn (clan:title our)) (send-blob | ship (attestation-packet ship life.point)) :: save current duct :: =/ original-duct duct :: apply heeds :: =. event-core %+ roll ~(tap in heeds.todos) |= [=^duct core=_event-core] (on-heed:core(duct duct) ship) :: apply outgoing messages, reversing for FIFO order :: =. event-core %+ reel messages.todos |= [[=^duct =plea] core=_event-core] ?: ?=(%$ -.plea) (on-cork:core(duct duct) ship) (on-plea:core(duct duct) ship plea) :: apply outgoing packet blobs :: =. event-core %+ roll ~(tap in packets.todos) |= [=blob core=_event-core] (send-blob:core | ship blob) :: apply remote scry requests :: =. event-core (meet-alien-fine keens.todos %keen) =. event-core (meet-alien-fine pines.todos %pine) :: event-core(duct original-duct) :: ++ meet-alien-fine |= [peens=(jug path ^duct) key=?(%keen %pine)] ^+ event-core =+ peer-core=(abed:pe ship) =< abet ^+ peer-core %- ~(rep by peens) |= [[=path ducts=(set ^duct)] cor=_peer-core] %- ~(rep in ducts) |= [=^duct c=_cor] %.([path duct] ?-(key %pine on-pine:c, %keen on-keen:c)) -- -- :: on-publ-rift: XX :: ++ on-publ-rift |= [=ship =rift] ^+ event-core ?~ ship-state=(~(get by peers.ames-state) ship) :: print error here? 
%rift was probably called before %keys :: ~> %slog.1^leaf/"ames: missing peer-state on-publ-rift" event-core ?: ?=([%alien *] u.ship-state) :: ignore aliens :: event-core =/ =peer-state +.u.ship-state =. rift.peer-state rift =. peers.ames-state (~(put by peers.ames-state) ship %known peer-state) event-core :: ++ insert-peer-state |= [=ship =point] ^+ event-core :: =/ =peer-state (gut-peer-state ship) =/ =public-key pass:(~(got by keys.point) life.point) =/ =private-key sec:ex:crypto-core.ames-state =/ =symmetric-key (derive-symmetric-key public-key private-key) :: =. qos.peer-state [%unborn now] =. life.peer-state life.point =. rift.peer-state rift.point =. public-key.peer-state public-key =. symmetric-key.peer-state symmetric-key =. sponsor.peer-state ?^ sponsor.point u.sponsor.point (^^sein:title rof our now ship) :: automatically set galaxy route, since unix handles lookup :: =? route.peer-state ?=(%czar (clan:title ship)) `[direct=%.y lane=[%& ship]] :: =. peers.ames-state (~(put by peers.ames-state) ship %known peer-state) :: event-core -- :: +on-take-turf: relay %turf move from jael to unix :: ++ on-take-turf |= turfs=(list turf) ^+ event-core :: (emit unix-duct.ames-state %give %turf turfs) :: +on-born: handle unix process restart :: ++ on-born ^+ event-core :: =. unix-duct.ames-state duct :: =/ turfs ;; (list turf) =< q.q %- need %- need (rof ~ %j `beam`[[our %turf %da now] /]) :: (emit unix-duct.ames-state %give %turf turfs) :: +on-vega: handle kernel reload :: ++ on-vega event-core :: +on-trim: handle request to free memory :: :: %ruin comets not seen for six months :: ++ on-trim ::TODO trim fine parts on high prio ^+ event-core =; rui=(set @p) (emit duct %pass /ruin %j %ruin rui) =- (silt (turn - head)) %+ skim ~(tap by peers.ames-state) |= [=ship s=ship-state] ?. 
&(?=(%known -.s) =(%pawn (clan:title ship))) %.n ?& (gth (sub now ~d180) last-contact.qos.s) :: %- ~(any by snd.s) |= m=message-pump-state !=(~ unsent-fragments.m) == :: +| %fine-entry-points :: ++ on-pine |= [=ship =path] ^+ event-core ?< =(our ship) :: XX don't crash? =/ ship-state (~(get by peers.ames-state) ship) ?: ?=([~ %known *] ship-state) abet:(on-pine:(abed-peer:pe ship +.u.ship-state) path duct) %+ enqueue-alien-todo ship |= todos=alien-agenda todos(pines (~(put ju pines.todos) path duct)) :: ++ on-keen |= [=ship =path] ^+ event-core =+ ~:(spit path) :: assert length =/ ship-state (~(get by peers.ames-state) ship) ?: ?=([~ %known *] ship-state) abet:(on-keen:(abed-peer:pe ship +.u.ship-state) path duct) %+ enqueue-alien-todo ship |= todos=alien-agenda todos(keens (~(put ju keens.todos) path duct)) :: ++ on-cancel-scry |= [all=? =ship =path] ^+ event-core ?~ ship-state=(~(get by peers.ames-state) ship) ~|(%cancel-scry-missing-peer^ship^path !!) ?. ?=([~ %known *] ship-state) :: XX delete from alien agenda? %. event-core %^ trace-fine fin.veb ship [ships.bug.ames-state |.("peer still alien, skip cancel-scry")] =+ peer=(abed:pe ship) ?. (~(has by order.scry.peer-state.peer) path) event-core abet:fi-abet:(fi-unsub:(abed:fi:peer path) duct all) :: +| %implementation :: +enqueue-alien-todo: helper to enqueue a pending request :: :: Also requests key and life from Jael on first request. :: If talking to a comet, requests attestation packet. :: ++ enqueue-alien-todo |= [=ship mutate=$-(alien-agenda alien-agenda)] ^+ event-core :: =/ ship-state (~(get by peers.ames-state) ship) :: create a default $alien-agenda on first contact :: =+ ^- [already-pending=? todos=alien-agenda] ?~ ship-state [%.n *alien-agenda] [%.y ?>(?=(%alien -.u.ship-state) +.u.ship-state)] :: mutate .todos and apply to permanent state :: =. todos (mutate todos) =. 
peers.ames-state (~(put by peers.ames-state) ship %alien todos) ?: already-pending event-core :: ?: =(%pawn (clan:title ship)) (request-attestation ship) :: NB: we specifically look for this wire in +public-keys-give in :: Jael. if you change it here, you must change it there. :: (emit duct %pass /public-keys %j %public-keys [n=ship ~ ~]) :: +request-attestation: helper to request attestation from comet :: :: Also sets a timer to resend the request every 30s. :: ++ request-attestation |= =ship ^+ event-core =+ (ev-trace msg.veb ship |.("requesting attestation")) =. event-core (send-blob | ship (sendkeys-packet ship)) =/ =wire /alien/(scot %p ship) (emit duct %pass wire %b %wait (add now ~s30)) :: +send-blob: fire packet at .ship and maybe sponsors :: :: Send to .ship and sponsors until we find a direct lane, :: skipping .our in the sponsorship chain. :: :: If we have no PKI data for a recipient, enqueue the packet and :: request the information from Jael if we haven't already. :: ++ send-blob ~/ %send-blob |= [for=? =ship =blob] :: =/ final-ship ship %- (ev-trace rot.veb final-ship |.("send-blob: to {}")) |- |^ ^+ event-core :: =/ ship-state (~(get by peers.ames-state) ship) :: ?. ?=([~ %known *] ship-state) ?: ?=(%pawn (clan:title ship)) (try-next-sponsor (^sein:title ship)) %+ enqueue-alien-todo ship |= todos=alien-agenda todos(packets (~(put in packets.todos) blob)) :: =/ =peer-state +.u.ship-state :: :: XX routing hack to mimic old ames. :: :: Before removing this, consider: moons when their planet is :: behind a NAT; a planet receiving initial acknowledgment :: from a star; a planet talking to another planet under :: another galaxy.
:: ?: ?| =(our ship) ?& !=(final-ship ship) !=(%czar (clan:title ship)) == == (try-next-sponsor sponsor.peer-state) :: ?: =(our ship) :: if forwarding, don't send to sponsor to avoid loops :: ?: for event-core (try-next-sponsor sponsor.peer-state) :: if forwarding, route must not be stale :: ?: &(for (lth last-contact.qos.peer-state (sub now ~h1))) (try-next-sponsor sponsor.peer-state) :: ?~ route=route.peer-state %- (ev-trace rot.veb final-ship |.("no route to: {}")) (try-next-sponsor sponsor.peer-state) :: %- (ev-trace rot.veb final-ship |.("trying route: {}")) =. event-core (emit unix-duct.ames-state %give %send lane.u.route blob) :: ?: direct.u.route event-core (try-next-sponsor sponsor.peer-state) :: ++ try-next-sponsor |= sponsor=^ship ^+ event-core :: ?: =(ship sponsor) event-core ^$(ship sponsor) -- :: +attestation-packet: generate signed self-attestation for .her :: :: Sent by a comet on first contact with a peer. Not acked. :: ++ attestation-packet |= [her=ship =her=life] ^- blob %- etch-shot %- etch-open-packet :_ crypto-core.ames-state :* ^= public-key pub:ex:crypto-core.ames-state ^= sndr our ^= sndr-life life.ames-state ^= rcvr her ^= rcvr-life her-life == :: +sendkeys-packet: generate a request for a self-attestation. :: :: Sent by non-comets to comets. Not acked. :: ++ sendkeys-packet |= her=ship ^- blob ?> ?=(%pawn (clan:title her)) %- etch-shot (encode-keys-packet our her life.ames-state) :: +| %internals :: +pe: create nested |peer-core for per-peer processing :: ++ pe |_ [=peer-state =channel] +* veb veb.bug.channel her her.channel scry scry.peer-state :: +| %helpers :: ++ peer-core . ++ pe-emit |=(move peer-core(event-core (emit +<))) ++ abed |=(=ship (abed-peer ship (gut-peer-state ship))) ++ abed-got |=(=ship (abed-peer ship (got-peer-state ship))) ++ abed-peer |= [=ship peer=^peer-state] %_ peer-core peer-state peer channel [[our ship] now channel-state -.peer] == :: ++ abet ^+ event-core =. 
peers.ames-state (~(put by peers.ames-state) her %known peer-state) event-core :: ++ pe-trace |= [verb=? print=(trap tape)] ^+ same (ev-trace verb her print) :: :: +got-duct: look up $duct by .bone, asserting already bound :: ++ got-duct |= =bone ^- ^duct ~| %dangling-bone^her^bone (~(got by by-bone.ossuary.peer-state) bone) :: +| %tasks :: ++ on-heed peer-core(heeds.peer-state (~(put in heeds.peer-state) duct)) :: ++ on-jilt peer-core(heeds.peer-state (~(del in heeds.peer-state) duct)) :: +update-qos: update and maybe print connection status :: ++ update-qos |= [mode=?(%ames %fine) =new=qos] ^+ peer-core :: =^ old-qos qos.peer-state [qos.peer-state new-qos] :: if no update worth reporting, we're done :: =/ text %^ qos-update-text her mode [old-qos new-qos kay.veb ships.bug.ames-state] ?~ text peer-core :: print message :: =. peer-core (pe-emit duct %pass /qos %d %flog %text u.text) :: if peer has stopped responding, check if %boon's are backing up :: ?. ?=(?(%dead %unborn) -.qos.peer-state) peer-core check-clog :: +on-hear-shut-packet: handle receipt of ack or message fragment :: ++ on-hear-shut-packet |= [=lane =shut-packet dud=(unit goof)] ^+ peer-core :: update and print connection status :: =. peer-core (update-qos %ames %live last-contact=now) :: =/ =bone bone.shut-packet :: ?: ?=(%& -.meat.shut-packet) =+ ?. &(?=(^ dud) msg.veb) ~ %. ~ %- slog :_ tang.u.dud leaf+"ames: {} fragment crashed {}" abet:(call:(abed:mi bone) %hear lane shut-packet ?=(~ dud)) :: benign ack on corked bone :: ?: (~(has in corked.peer-state) bone) peer-core :: Just try again on error, printing trace :: :: Note this implies that vanes should never crash on %done, :: since we have no way to continue using the flow if they do. :: =+ ?~ dud ~ %. ~ %+ slog leaf+"ames: {} ack crashed {}" ?. 
msg.veb ~ :- >[bone=bone message-num=message-num meat=meat]:shut-packet< tang.u.dud abet:(call:(abed:mu bone) %hear [message-num +.meat]:shut-packet) :: +on-memo: handle request to send message :: ++ on-memo |= [=bone payload=* valence=?(%plea %boon)] ^+ peer-core ?: ?& (~(has in closing.peer-state) bone) !=(payload [%$ /flow %cork ~]) == ~> %slog.0^leaf/"ames: ignoring message on closing bone {}" peer-core ?: (~(has in corked.peer-state) bone) ~> %slog.0^leaf/"ames: ignoring message on corked bone {}" peer-core :: =/ =message-blob (dedup-message (jim payload)) =. peer-core abet:(call:(abed:mu bone) %memo message-blob) :: ?: ?& =(%boon valence) (gte now (add ~s30 last-contact.qos.peer-state)) == check-clog peer-core :: +on-wake: handle timer expiration :: ++ on-wake |= [=bone error=(unit tang)] ^+ peer-core :: if we previously errored out, print and reset timer for later :: :: This really shouldn't happen, but if it does, make sure we :: don't brick either this messaging flow or Behn. :: ?^ error =. peer-core (pe-emit duct %pass /wake-fail %d %flog %crud %ames-wake u.error) :: ?~ message-pump-state=(~(get by snd.peer-state) bone) peer-core =* packet-state packet-pump-state.u.message-pump-state ?~ next-wake.packet-state peer-core :: If we crashed because we woke up too early, assume another :: timer is already set. :: ?: (lth now.channel u.next-wake.packet-state) peer-core :: =/ =wire (make-pump-timer-wire her bone) (pe-emit duct %pass wire %b %wait (add now.channel ~s30)) :: update and print connection state :: =. peer-core (update-qos %ames qos:(is-peer-dead now peer-state)) :: expire direct route if the peer is not responding :: =. peer-state (update-peer-route her peer-state) :: resend comet attestation packet if first message times out :: :: The attestation packet doesn't get acked, so if we tried to :: send a packet but it timed out, maybe they didn't get our :: attestation. 
:: :: Only resend on timeout of packets in the first message we :: send them, since they should remember forever. :: =? event-core ?& ?=(%pawn (clan:title our)) =(1 current:(~(got by snd.peer-state) bone)) == (send-blob | her (attestation-packet [her her-life]:channel)) ?: (~(has in corked.peer-state) bone) :: if the bone was corked the flow doesn't exist anymore :: TODO: clean up corked bones in the peer state when it's _safe_? :: (e.g. if this bone is N blocks behind the next one) :: peer-core :: maybe resend some timed out packets :: abet:(call:(abed:mu bone) %wake ~) :: ++ on-hear-fine |= [=lane =shot] ^+ peer-core ?> =(sndr-tick.shot (mod life.peer-state 16)) :: =/ [=peep =meow] (sift-purr `@ux`content.shot) =/ =path (slag 3 path.peep) ?. (~(has by order.scry) path) ~&(dead-response/peep peer-core) fi-abet:(fi-rcv:(abed:fi path) peep meow lane) :: ++ on-keen |= [=path =^duct] ^+ peer-core ?: (~(has by order.scry) path) ~> %slog.0^leaf/"fine: dupe {(spud path)}" fi-abet:(fi-sub:(abed:fi path) duct) =^ keen-id=@ud seq.scry [seq.scry +(seq.scry)] =. order.scry (~(put by order.scry) path keen-id) =. keens.scry (put:orm keens.scry keen-id *keen-state) fi-abet:(fi-start:(abed:fi path) duct) :: ++ on-pine |= [=path =^duct] ^+ peer-core ?~ blk=(de-part:balk her rift.peer-state life.peer-state path) !! :: XX: %miss ? =/ =wire [%fine %pine (scot %p her) path] (pe-emit duct %pass wire %a %plea her plea=[%$ /pine `*`u.blk]) :: ++ on-pine-boon |= [=path payload=*] ^+ peer-core ?~ blk=(de-part:balk her rift.peer-state life.peer-state path) !! :: XX: %miss ? =+ ;;(case=@ud payload) =. cas.u.blk ud+case (on-keen (slag 3 (en-path:balk u.blk)) duct) :: +| %implementation :: +dedup-message: replace with any existing copy of this message :: ++ dedup-message |= =message-blob ^+ message-blob ?: (lte (met 13 message-blob) 1) message-blob =/ peers-l=(list [=ship =ship-state]) ~(tap by peers.ames-state) |- ^+ message-blob =* peer-loop $ ?~ peers-l message-blob ?. 
?=(%known -.ship-state.i.peers-l) peer-loop(peers-l t.peers-l) =/ snd-l=(list [=bone =message-pump-state]) ~(tap by snd.ship-state.i.peers-l) |- ^+ message-blob =* bone-loop $ ?~ snd-l peer-loop(peers-l t.peers-l) =* unsent-fragments unsent-fragments.message-pump-state.i.snd-l =/ blob-l=(list ^message-blob) ~(tap to unsent-messages.message-pump-state.i.snd-l) |- ^+ message-blob =* blob-loop $ ?^ blob-l ?: =(i.blob-l message-blob) i.blob-l blob-loop(blob-l t.blob-l) ?~ unsent-fragments bone-loop(snd-l t.snd-l) ?: =(message-blob fragment.i.unsent-fragments) `@`fragment.i.unsent-fragments bone-loop(snd-l t.snd-l) :: +check-clog: notify clients if peer has stopped responding :: ++ check-clog ^+ peer-core :: :: Only look at response bones. Request bones are unregulated, :: since requests tend to be much smaller than responses. :: =/ pumps=(list message-pump-state) %+ murn ~(tap by snd.peer-state) |= [=bone =message-pump-state] ?: =(0 (end 0 bone)) ~ `u=message-pump-state :: if clogged, notify client vane :: |^ ?. &(nuf-messages nuf-memory) peer-core %+ roll ~(tap in heeds.peer-state) |=([d=^duct core=_peer-core] (pe-emit:core d %give %clog her)) :: +nuf-messages: are there enough messages to mark as clogged? :: ++ nuf-messages =| num=@ud |- ^- ? ?~ pumps | =. num ;: add num (sub [next current]:i.pumps) ~(wyt in unsent-messages.i.pumps) == ?: (gte num msg.cong.ames-state) & $(pumps t.pumps) :: +nuf-memory: is enough memory used to mark as clogged? :: ++ nuf-memory =| mem=@ud |- ^- ? ?~ pumps | =. 
mem %+ add %- ~(rep in unsent-messages.i.pumps) |=([a=@ b=_mem] (add b (met 3 a))) ?~ unsent-fragments.i.pumps 0 (met 3 fragment.i.unsent-fragments.i.pumps) ?: (gte mem mem.cong.ames-state) & $(pumps t.pumps) -- :: +send-shut-packet: fire encrypted packet at rcvr and maybe sponsors :: ++ send-shut-packet |= =shut-packet ^+ peer-core :: swizzle last bone bit before sending :: :: The peer has the opposite perspective from ours about what :: kind of flow this is (forward/backward), so flip the bit :: here. :: =. event-core %^ send-blob | her %- etch-shot %: etch-shut-packet shut-packet(bone (mix 1 bone.shut-packet)) symmetric-key.channel our her our-life.channel her-life.channel == peer-core :: +recork-one: re-send the next %cork to the peer :: ++ recork-one ^+ peer-core =/ boz (sort ~(tap in closing.peer-state) lte) |- ^+ peer-core ?~ boz peer-core =/ pum=message-pump-state (~(got by snd.peer-state) i.boz) ?. =(next current):pum $(boz t.boz) :: sanity check on the message pump state :: ?. ?& =(~ unsent-messages.pum) =(~ unsent-fragments.pum) =(~ live.packet-pump-state.pum) == ~> %slog.0^leaf/"ames: bad pump state {}" $(boz t.boz) :: no outstanding messages, so send a new %cork :: :: TODO use +trace ~> %slog.0^leaf/"ames: recork {}" =/ =plea [%$ /flow [%cork ~]] (on-memo i.boz plea %plea) :: :: +handle-cork: handle flow kill after server ames has taken %done :: ++ handle-cork |= =bone ^+ peer-core ?. (~(has in closing.peer-state) bone) peer-core =/ =message-pump-state (~(gut by snd.peer-state) bone *message-pump-state) =? peer-core ?=(^ next-wake.packet-pump-state.message-pump-state) =* next-wake u.next-wake.packet-pump-state.message-pump-state =/ =wire (make-pump-timer-wire her bone) :: resetting timer for boons :: (pe-emit [/ames]~ %pass wire %b %rest next-wake) =/ nax-bone=^bone (mix 0b10 bone) =? peer-core (~(has by snd.peer-state) nax-bone) %. peer-core %+ pe-trace odd.veb |.("remove naxplanation flow {<[her bone=nax-bone]>}") =. 
peer-state =, peer-state %_ peer-state :: preemptively delete nax flows (e.g. nacks for %watches) :: snd (~(del by (~(del by snd) bone)) nax-bone) rcv (~(del by rcv) bone) corked (~(put in corked) bone) closing (~(del in closing) bone) == peer-core :: +| %internals :: +mu: constructor for |pump message sender core :: ++ mu |_ [=bone state=message-pump-state] :: +| %helpers :: ++ pump . ++ abed |= b=^bone pump(bone b, state (~(gut by snd.peer-state) b *message-pump-state)) ++ abet :: if the bone was corked, it's been removed from the state, :: so we avoid adding it again. :: =? snd.peer-state !corked (~(put by snd.peer-state) bone state) peer-core :: ++ packet-pump (pu packet-pump-state.state) ++ closing (~(has in closing.peer-state) bone) ++ corked (~(has in corked.peer-state) bone) :: +is-message-num-in-range: %.y unless duplicate or future ack :: ++ is-message-num-in-range |= =message-num ^- ? :: ?: (gte message-num next.state) %.n ?: (lth message-num current.state) %.n !(~(has by queued-message-acks.state) message-num) :: +| %entry-points :: +call: handle a $message-pump-task :: ++ call |= task=message-pump-task ^+ pump :: =. pump =~((dispatch-task task) feed-packets) =+ top=top-live:packet-pump :: sanity check to isolate error cases :: ?. |(?=(~ top) (lte current.state message-num.key.u.top)) ~|([%strange-current current=current.state key.u.top] !!) :: maybe trigger a timer based on congestion control calculations :: abet:(call:packet-pump %halt ~) :: +| %tasks :: +dispatch-task: perform task-specific processing :: ++ dispatch-task |= task=message-pump-task ^+ pump :: ?- -.task %memo (on-memo message-blob.task) %prod abet:(call:packet-pump %prod ~) %wake abet:(call:packet-pump %wake current.state) %near %- on-done [[message-num %naxplanation error]:naxplanation.task %&] %hear ?- -.ack-meat.task %& (on-hear [message-num fragment-num=p.ack-meat]:task) :: %| =/ cork=? 
=+ top=top-live:packet-pump :: If we send a %cork and get an ack, we can know by :: sequence number that the ack is for the %cork message :: ?& closing ?=(^ top) =(0 ~(wyt in unsent-messages.state)) =(0 (lent unsent-fragments.state)) =(1 ~(wyt by live.packet-pump-state.state)) =(message-num:task message-num.key.u.top) == =+ [ack msg]=[p.ack-meat message-num]:task =. pump %- on-done [[msg ?:(ok.ack [%ok ~] [%nack ~])] cork] ?. &(!ok.ack cork) pump %. pump %+ pe-trace odd.veb |.("got nack for %cork {}") == == :: +on-memo: handle request to send a message :: ++ on-memo |= blob=message-blob pump(unsent-messages.state (~(put to unsent-messages.state) blob)) :: +on-hear: handle packet acknowledgment :: ++ on-hear |= [=message-num =fragment-num] ^+ pump :: pass to |packet-pump unless duplicate or future ack :: ?. (is-message-num-in-range message-num) %. pump (pe-trace snd.veb |.("hear pump out of range")) abet:(call:packet-pump %hear message-num fragment-num) :: +on-done: handle message acknowledgment :: :: A nack-trace message counts as a valid message nack on the :: original failed message. :: :: This prevents us from having to wait for a message nack packet, :: which would mean we couldn't immediately ack the nack-trace :: message, which would in turn violate the semantics of backward :: flows. :: ++ on-done |= [[=message-num =ack] cork=?] ^+ pump :: unsent messages from the future should never get acked :: ~| :* bone=bone mnum=message-num next=next.state unsent-messages=~(wyt in unsent-messages.state) unsent-fragments=(lent unsent-fragments.state) any-live=!=(~ live.packet-pump-state.state) == ?> (lth message-num next.state) :: ignore duplicate message acks :: ?: (lth message-num current.state) %. pump %+ pe-trace snd.veb |. "duplicate done {}" :: ignore duplicate and future acks :: ?. (is-message-num-in-range message-num) pump :: clear and print .unsent-fragments if nonempty :: =? 
unsent-fragments.state &(=(current next) ?=(^ unsent-fragments)):state :: ~> %slog.0^leaf/"ames: early message ack {}" ~ :: clear all packets from this message from the packet pump :: =. pump abet:(call:packet-pump %done message-num lag=*@dr) :: enqueue this ack to be sent back to local client vane :: :: Don't clobber a naxplanation with just a nack packet. :: =? queued-message-acks.state =/ old (~(get by queued-message-acks.state) message-num) !?=([~ %naxplanation *] old) (~(put by queued-message-acks.state) message-num ack) :: emit local acks from .queued-message-acks until incomplete :: |- ^+ pump :: if .current hasn't been fully acked, we're done :: ?~ cur=(~(get by queued-message-acks.state) current.state) pump :: .current is complete; pop, emit local ack, and try next message :: =. queued-message-acks.state (~(del by queued-message-acks.state) current.state) :: clear all packets from this message from the packet pump :: :: Note we did this when the original packet came in, a few lines :: above. It's not clear why, but it doesn't always clear the :: packets when it's not the current message. As a workaround, :: we clear the packets again when we catch up to this packet. :: :: This is slightly inefficient because we run this twice for :: each packet and it may emit a few unnecessary packets, but :: it's not incorrect. pump-metrics are updated only once, :: at the time when we actually delete the packet. :: =. pump abet:(call:packet-pump %done current.state lag=*@dr) :: give %done to vane if we're ready :: ?- -.u.cur %ok =. peer-core :: don't give %done for corks :: ?: cork (pump-cork current.state) (pump-done current.state ~) $(current.state +(current.state)) :: %nack pump :: %naxplanation =. 
peer-core (pump-done current.state `error.u.cur) $(current.state +(current.state)) == :: +| %implementation :: +feed-packets: give packets to |packet-pump until full :: ++ feed-packets :: if nothing to send, no-op :: ?: &(=(~ unsent-messages) =(~ unsent-fragments)):state pump :: we have unsent fragments of the current message; feed them :: ?. =(~ unsent-fragments.state) :: we have unsent fragments of the current message; feed them :: =^ unsent pump abut:(feed:packet-pump unsent-fragments.state) =. unsent-fragments.state unsent :: if it sent all of them, feed it more; otherwise, we're done :: ?~(unsent feed-packets pump) :: .unsent-messages is nonempty; pop a message off and feed it :: =^ =message-blob unsent-messages.state ~(get to unsent-messages.state) :: break .message into .chunks and set as .unsent-fragments :: =. unsent-fragments.state (split-message next.state message-blob) :: try to feed packets from the next message :: =. next.state +(next.state) feed-packets :: +pump-done: handle |message-pump's report of message (n)ack :: ++ pump-done |= [=message-num error=(unit error)] ^+ peer-core ?: ?& =(1 (end 0 bone)) =(1 (end 0 (rsh 0 bone))) (~(has in corked.peer-state) (mix 0b10 bone)) == %- %+ pe-trace msg.veb =/ dat [her bone=bone message-num=message-num -.task] |.("remove naxplanation flow {}") :: XX we avoid re-adding the bone in abet:mu; test that it works :: =. snd.peer-state (~(del by snd.peer-state) bone) peer-core :: if odd bone, ack is on "subscription update" message; no-op :: ?: =(1 (end 0 bone)) peer-core :: even bone; is this bone a nack-trace bone? 
:: ?: =(1 (end 0 (rsh 0 bone))) :: nack-trace bone; assume .ok, clear nack from |message-sink :: abet:(call:(abed:mi (mix 0b10 bone)) %drop message-num) ?: &(closing ?=(%near -.task)) :: if the bone belongs to a closing flow and we got a :: naxplanation, don't relay ack to the client vane :: peer-core :: not a nack-trace bone; relay ack to client vane :: (pe-emit (got-duct bone) %give %done error) :: XX impure +abet pattern :: +pump-cork: kill flow on cork sender side :: ++ pump-cork |= =message-num ^+ peer-core :: clear all packets from this message from the packet pump :: =. pump abet:(call:packet-pump %done message-num lag=*@dr) ?: corked %- %+ pe-trace odd.veb |.("trying to delete a corked bone={}") peer-core =/ nack-bone=^bone (mix 0b10 bone) =? rcv.peer-state (~(has by rcv.peer-state) nack-bone) :: if the publisher was behind we remove nacks on that bone :: (~(del by rcv.peer-state) nack-bone) =. peer-state =, peer-state %_ peer-state snd (~(del by snd) bone) rcv (~(del by rcv) bone) corked (~(put in corked) bone) closing (~(del in closing) bone) by-duct.ossuary (~(del by by-duct.ossuary) (got-duct bone)) by-bone.ossuary (~(del by by-bone.ossuary) bone) == :: since we got one cork ack, try the next one :: recork-one :: +pu: construct |packet-pump core :: ++ pu |= state=packet-pump-state :: =| unsent=(list static-fragment) |% +| %helpers ++ pack . :: +abut: abet with gifts :: ++ abut [unsent abet] ++ abet pump(packet-pump-state.state state) ++ pu-trace |= [verb=? 
print=(trap tape)] ^+ same (trace %ames verb her ships.bug.channel print) :: ++ pu-wire (make-pump-timer-wire her bone) ++ pu-emit |=(=note (pe-emit pump-duct %pass pu-wire note)) :: +packet-queue: type for all sent fragments (order: seq number) :: ++ packet-queue %- (ordered-map live-packet-key live-packet-val) lte-packets :: +gauge: inflate a |pump-gauge to track congestion control :: ++ gauge (ga metrics.state ~(wyt by live.state)) :: +to-static-fragment: convenience function for |packet-pump :: ++ to-static-fragment |= [live-packet-key live-packet-val] ^- static-fragment [message-num num-fragments fragment-num fragment] :: ++ pump-duct ~[/ames] ++ top-live (pry:packet-queue live.state) :: +| %entry-points :: ++ call |= task=packet-pump-task ^+ pack ?- -.task %hear (on-hear [message-num fragment-num]:task) %done (on-done message-num.task) %wake (on-wake current.task) %prod on-prod %halt set-wake == :: +feed: try to send a list of packets, returning unsent ones :: ++ feed |= fragments=(list static-fragment) ^+ pack :: bite off as many fragments as we can send :: =/ num-slots num-slots:gauge =/ sent (scag num-slots fragments) =. unsent (slag num-slots fragments) :: if nothing to send, we're done :: ?~ sent pack :: convert $static-fragment's into +ordered-set [key val] pairs :: =/ send-list %+ turn sent |= static-fragment ^- [key=live-packet-key val=live-packet-val] :: :- [message-num fragment-num] :- [sent-date=now.channel tries=1 skips=0] [num-fragments fragment] :: update .live and .metrics :: =. live.state (gas:packet-queue live.state send-list) :: TMI :: => .(sent `(list static-fragment)`sent) :: emit a $shut-packet for each packet to send :: =. peer-core %+ roll sent |= [packet=static-fragment core=_peer-core] (send-shut-packet bone [message-num %& +]:packet) pack :: +| %tasks :: +on-prod: reset congestion control, re-send packets :: ++ on-prod ^+ pack ?: =(~ next-wake.state) pack :: =. metrics.state %*(. *pump-metrics counter counter.metrics.state) =. 
live.state %+ run:packet-queue live.state |=(p=live-packet-val p(- *packet-state)) :: =/ sot (max 1 num-slots:gauge) =/ liv live.state |- ^+ pack ?: =(0 sot) pack ?: =(~ liv) pack =^ hed liv (pop:packet-queue liv) =. peer-core %+ send-shut-packet bone [message-num %& +]:(to-static-fragment hed) $(sot (dec sot)) :: +on-wake: handle packet timeout :: ++ on-wake |= current=message-num ^+ pack :: assert temporal coherence :: ?< =(~ next-wake.state) =. next-wake.state ~ :: tell congestion control a packet timed out :: =. metrics.state on-timeout:gauge =| acc=(unit static-fragment) :: re-send first packet and update its state in-place :: =; [static-fragment=_acc live=_live.state] =. live.state live =? peer-core ?=(^ static-fragment) %- %+ pu-trace snd.veb =/ nums [message-num fragment-num]:u.static-fragment |.("dead {}") (send-shut-packet bone [message-num %& +]:u.static-fragment) pack :: %^ (dip:packet-queue _acc) live.state acc |= $: acc=_acc key=live-packet-key val=live-packet-val == ^- [new-val=(unit live-packet-val) stop=? _acc] :: if already acked later message, don't resend :: ?: (lth message-num.key current) %. [~ stop=%.n ~] %- slog :_ ~ :- %leaf "ames: strange wake queue, expected {}, got {}" :: packet has expired; update it in-place, stop, and produce it :: =. last-sent.val now.channel =. tries.val +(tries.val) :: [`val stop=%.y `(to-static-fragment key val)] :: +fast-resend-after-ack: resend timed out packets :: :: After we finally receive an ack, we want to resend all the :: live packets that have been building up. :: ++ fast-resend-after-ack |= [=message-num =fragment-num] ^+ pack =; res=[resends=(list static-fragment) live=_live.state] =. live.state live.res =. 
peer-core %+ reel resends.res |= [packet=static-fragment core=_peer-core] (send-shut-packet bone [message-num %& +]:packet) pack :: =/ acc resends=*(list static-fragment) :: %^ (dip:packet-queue _acc) live.state acc |= $: acc=_acc key=live-packet-key val=live-packet-val == ^- [new-val=(unit live-packet-val) stop=? _acc] ?: (lte-packets key [message-num fragment-num]) [new-val=`val stop=%.n acc] :: ?: (gth (next-expiry:gauge -.val) now.channel) [new-val=`val stop=%.y acc] :: =. last-sent.val now.channel =. resends.acc [(to-static-fragment key val) resends.acc] [new-val=`val stop=%.n acc] :: +on-hear: handle ack on a live packet :: :: If the packet was in our queue, delete it and update our :: metrics, possibly re-sending skipped packets. Otherwise, no-op :: ++ on-hear |= [=message-num =fragment-num] ^+ pack :: =- :: if no sent packet matches the ack, :: don't apply mutations or effects :: ?. found.- %- (pu-trace snd.veb |.("miss {}")) pack :: =. metrics.state metrics.- =. live.state live.- %- ?. ?| =(0 fragment-num) =(0 (mod counter.metrics.state 20)) == same %+ pu-trace snd.veb |.("send: {}") :: .resends is backward, so fold backward and emit :: =. peer-core %+ reel resends.- |= [packet=static-fragment core=_peer-core] (send-shut-packet bone [message-num %& +]:packet) (fast-resend-after-ack message-num fragment-num) :: =/ acc :* found=`?`%.n resends=*(list static-fragment) metrics=metrics.state == :: ^+ [acc live=live.state] :: %^ (dip:packet-queue _acc) live.state acc |= $: acc=_acc key=live-packet-key val=live-packet-val == ^- [new-val=(unit live-packet-val) stop=? _acc] :: =/ gauge (ga metrics.acc ~(wyt by live.state)) :: is this the acked packet? :: ?: =(key [message-num fragment-num]) :: delete acked packet, update metrics, and stop traversal :: =. found.acc %.y =. metrics.acc (on-ack:gauge -.val) [new-val=~ stop=%.y acc] :: is this a duplicate ack? :: ?. 
(lte-packets key [message-num fragment-num]) :: stop, nothing more to do :: [new-val=`val stop=%.y acc] :: ack was on later packet; mark skipped, tell gauge, & continue :: =. skips.val +(skips.val) =^ resend metrics.acc (on-skipped-packet:gauge -.val) ?. resend [new-val=`val stop=%.n acc] :: =. last-sent.val now.channel =. tries.val +(tries.val) =. resends.acc [(to-static-fragment key val) resends.acc] [new-val=`val stop=%.n acc] :: +on-done: apply ack to all packets from .message-num :: ++ on-done |= =message-num ^+ pack :: =- =. metrics.state metrics.- =. live.state live.- :: %. (fast-resend-after-ack message-num `fragment-num`0) (pu-trace snd.veb |.("done {}")) :: ^+ [metrics=metrics.state live=live.state] :: %^ (dip:packet-queue pump-metrics) live.state acc=metrics.state |= $: metrics=pump-metrics key=live-packet-key val=live-packet-val == ^- [new-val=(unit live-packet-val) stop=? pump-metrics] :: =/ gauge (ga metrics ~(wyt by live.state)) :: if we get an out-of-order ack for a message, skip until it :: ?: (lth message-num.key message-num) [new-val=`val stop=%.n metrics] :: if packet was from acked message, delete it and continue :: ?: =(message-num.key message-num) [new-val=~ stop=%.n metrics=(on-ack:gauge -.val)] :: we've gone past the acked message; we're done :: [new-val=`val stop=%.y metrics] :: +set-wake: set, unset, or reset timer, emitting moves :: ++ set-wake ^+ pack :: if nonempty .live, pry at head to get next wake time :: =/ new-wake=(unit @da) ?~ head=(pry:packet-queue live.state) ~ `(next-expiry:gauge -.val.u.head) :: no-op if no change :: ?: =(new-wake next-wake.state) pack :: unset old timer if non-null :: =? peer-core !=(~ next-wake.state) (pu-emit %b %rest (need next-wake.state)) :: set new timer if non-null :: =? peer-core ?=(^ new-wake) (pu-emit %b %wait u.new-wake) :: =? next-wake.state !=(~ next-wake.state) ~ :: unset =? 
next-wake.state ?=(^ new-wake) new-wake :: reset :: pack -- -- :: +mi: constructor for |sink message receiver core :: ++ mi |_ [=bone state=message-sink-state] :: +| %helpers :: ++ sink . ++ abed |= b=^bone sink(bone b, state (~(gut by rcv.peer-state) b *message-sink-state)) ++ abet :: if the bone was corked, it's been removed from the state, :: so we avoid adding it again. :: =? rcv.peer-state !corked (~(put by rcv.peer-state) bone state) peer-core :: ++ closing (~(has in closing.peer-state) bone) ++ corked (~(has in corked.peer-state) bone) ++ received |= =^bone :: odd bone: %plea request message :: even bone, 0 second bit: %boon response message :: even bone, 1 second bit: nack-trace %boon message :: ?: =(1 (end 0 bone)) %plea ?: =(0 (end 0 (rsh 0 bone))) %boon %nack :: +| %entry-points :: +call: handle a $message-sink-task :: ++ call |= task=message-sink-task ^+ sink ?- -.task %drop sink(nax.state (~(del in nax.state) message-num.task)) %done (done ok.task) :: %hear ?. ?| corked ?& %*(corked sink bone (mix 0b10 bone)) =(%nack (received bone)) == == (hear [lane shut-packet ok]:task) :: if we %hear a task on a corked bone, always ack :: =. peer-core %+ send-shut-packet bone [message-num.shut-packet.task %| %| ok=& lag=*@dr] %. sink %+ pe-trace odd.veb |.("hear {<(received bone)>} on corked bone={}") == :: +| %tasks :: +hear: receive message fragment, possibly completing message :: ++ hear |= [=lane =shut-packet ok=?] ^+ sink :: we know this is a fragment, not an ack; expose into namespace :: ?> ?=(%& -.meat.shut-packet) =+ [num-fragments fragment-num fragment]=+.meat.shut-packet :: seq: message sequence number, for convenience :: =/ seq message-num.shut-packet :: ignore messages from far future; limit to 10 in progress :: ?: (gte seq (add 10 last-acked.state)) %- %+ pe-trace odd.veb |.("future %hear {}") sink :: =/ is-last-fragment=? =(+(fragment-num) num-fragments) :: always ack a dupe! :: ?: (lte seq last-acked.state) ?. 
is-last-fragment :: single packet ack :: =. peer-core (send-shut-packet bone seq %| %& fragment-num) %. sink %+ pe-trace rcv.veb |.("send dupe ack {}") :: whole message (n)ack :: =/ ok=? !(~(has in nax.state) seq) =. peer-core (send-shut-packet bone seq %| %| ok lag=`@dr`0) %. sink %+ pe-trace rcv.veb |.("send dupe message ack {} ok={}") :: last-acked (gth num-fragments.u.existing fragment-num) ?> =(num-fragments.u.existing num-fragments) :: u.existing :: =/ already-heard-fragment=? (~(has by fragments.partial-rcv-message) fragment-num) :: ack dupes except for the last fragment, in which case drop :: ?: already-heard-fragment ?: is-last-fragment %- %+ pe-trace rcv.veb |. =/ data [her seq=seq lh=last-heard.state la=last-acked.state] "hear last dupe {}" sink =. peer-core (send-shut-packet bone seq %| %& fragment-num) %. sink %+ pe-trace rcv.veb |.("send dupe ack {}") :: new fragment; store in state and check if message is done :: =. num-received.partial-rcv-message +(num-received.partial-rcv-message) :: =. fragments.partial-rcv-message (~(put by fragments.partial-rcv-message) fragment-num fragment) :: =. live-messages.state (~(put by live-messages.state) seq partial-rcv-message) :: ack any packet other than the last one, and continue either way :: =? peer-core !is-last-fragment %- %+ pe-trace rcv.veb |. =/ data [seq=seq fragment-num=fragment-num fragments=num-fragments] "send ack-2 {}" (send-shut-packet bone seq %| %& fragment-num) :: enqueue all completed messages starting at +(last-heard.state) :: |- ^+ sink :: if this is not the next message to ack, we're done :: ?. =(seq +(last-heard.state)) sink :: if we haven't heard anything from this message, we're done :: ?~ live=(~(get by live-messages.state) seq) sink :: if the message isn't done yet, we're done :: ?. =(num-received num-fragments):u.live sink :: we have whole message; update state, assemble, and send to vane :: =. last-heard.state +(last-heard.state) =. 
live-messages.state (~(del by live-messages.state) seq) :: %- %+ pe-trace msg.veb |.("hear {} {} {}kb") =/ message=* (assemble-fragments [num-fragments fragments]:u.live) =/ empty=? =(~ pending-vane-ack.state) :: enqueue message to be sent to local vane :: =. pending-vane-ack.state (~(put to pending-vane-ack.state) seq message) :: =? sink empty (handle-sink seq message ok) :: $(seq +(seq)) :: +done: handle confirmation of message processing from vane :: ++ done |= ok=? ^+ sink :: =^ pending pending-vane-ack.state ~(get to pending-vane-ack.state) =/ =message-num message-num.p.pending :: =. last-acked.state +(last-acked.state) =? nax.state !ok (~(put in nax.state) message-num) :: =. peer-core (send-shut-packet bone message-num %| %| ok lag=`@dr`0) ?~ next=~(top to pending-vane-ack.state) sink (handle-sink message-num.u.next message.u.next ok) :: +| %implementation :: +handle-sink: dispatch message :: ++ handle-sink |= [=message-num message=* ok=?] ^+ sink |^ ?-((received bone) %plea ha-plea, %boon ha-boon, %nack ha-nack) :: ++ ha-plea ^+ sink ?: |(closing corked) sink %- %+ pe-trace msg.veb =/ dat [her bone=bone message-num=message-num] |.("sink plea {}") =; pe=_peer-core =. peer-core pe =? sink !ok (call %done ok=%.n) sink ?. ok :: send nack-trace with blank .error for security :: =/ nack-bone=^bone (mix 0b10 bone) =/ =message-blob (jam [message-num *error]) abet:(call:(abed:mu nack-bone) %memo message-blob) =+ ;; =plea message =/ =wire (make-bone-wire her her-rift.channel bone) :: ?. =(vane.plea %$) ?+ vane.plea ~| %ames-evil-vane^our^her^vane.plea !! 
%c (pe-emit duct %pass wire %c %plea her plea) %g (pe-emit duct %pass wire %g %plea her plea) %j (pe-emit duct %pass wire %j %plea her plea) == ?: ?=(%pine -.path.plea) (pe-emit duct %pass wire %a %plea our plea(vane %a, path /cone)) :: a %cork plea is handled using %$ as the recipient vane to :: account for publishers that still handle ames-to-ames %pleas :: ?> &(?=([%cork *] payload.plea) ?=(%flow -.path.plea)) :: XX FIXME impure +abet pattern... :: =. closing.peer-state (~(put in closing.peer-state) bone) (pe-emit duct %pass wire %a %plea her [%a /close ~]) :: :: +ha-boon: handle response message, acking unconditionally :: :: .bone must be mapped in .ossuary.peer-state, or we crash. :: This means a malformed message will kill a flow. We :: could change this to a no-op if we had some sort of security :: reporting. :: :: Note that if we had several consecutive packets in the queue :: and crashed while processing any of them, the %hole card :: will turn *all* of them into losts/nacks. :: :: TODO: This handles a previous crash in the client vane, but :: not in %ames itself. :: ++ ha-boon ^+ sink ?: |(closing corked) sink %- %+ pe-trace msg.veb |. :: XX -.task not visible, FIXME :: =/ dat [her bone=bone message-num=message-num] ?:(ok "sink boon {}" "crashed on sink boon {}") =? moves !ok :: we previously crashed on this message; notify client vane :: %+ turn moves |= =move ?. ?=([* %give %boon *] move) move [duct.move %give %lost ~] :: =. peer-core (pe-emit (got-duct bone) %give %boon message) :: send ack unconditionally :: (call %done ok=%.y) :: ++ ha-nack ^+ sink :: if we get a naxplanation for a %cork, the publisher hasn't :: received the OTA. The /recork timer will retry eventually. :: %- %+ pe-trace msg.veb =/ dat [her bone=bone message-num=message-num] |.("sink naxplanation {}") :: flip .bone's second bit to find referenced flow :: =/ target=^bone (mix 0b10 bone) =. 
peer-core :: notify |message-pump that this message got naxplained :: abet:(call:(abed:mu target) %near ;;(naxplanation message)) :: ack nack-trace message (only applied if we don't later crash) :: (call %done ok=%.y) -- -- :: +fi: constructor for |fine remote scry core :: ++ fi => |% :: +gum: glue together a list of $byts into one :: :: TODO: move to hoon.hoon (see +cad in lib/tiny) :: ++ gum ::~/ %gum |= biz=(list byts) ^- byts :- (roll biz |=([[wid=@ *] acc=@] (add wid acc))) (can 3 biz) :: ++ etch-peep |= [=path num=@ud] ^- byts ?> (lth num (bex 32)) =+ (spit path) %- gum :~ 4^num :: fragment number 2^wid :: path size wid^`@`pat :: namespace path == :: ++ etch-keen |= [=path num=@ud] ^- hoot ^- @ =/ sic (mod life.ames-state 16) =/ ric (mod life.peer-state 16) =/ syn =/ bod (etch-peep path num) =/ sig 64^(sign:keys dat.bod) (can 3 sig bod ~) (etch-shot [our her] req=& sam=| sic ric ~ syn) :: ++ keys |% ++ mess |= [=ship life=@ud =path dat=$@(~ (cask))] (jam +<) :: ++ sign sigh:as:crypto-core.ames-state :: ++ veri-fra |= [=path fra=@ud dat=@ux sig=@] (veri sig (jam path fra dat)) :: ++ veri |= [sig=@ dat=@] ^- ? (safe:as:(com:nu:crub:crypto public-key.peer-state) sig dat) :: ++ meri |= [pax=path sig=@ dat=$@(~ (cask))] (veri sig (mess her life.peer-state pax dat)) -- -- :: |_ [=path keen-id=@ud keen=keen-state] :: +| %helpers :: ++ fine . ++ abed |= p=^path ~| no-keen-for-path/p =. keen-id (~(got by order.scry) p) fine(path p, keen (got:orm keens.scry keen-id)) :: ++ abed-id :: XX not used |= id=@ud %- abed ~| no-path-for-id/id %- need ^- (unit ^path) %- ~(rep by order.scry) |= [[p=^path i=@ud] out=(unit ^path)] ^- (unit ^path) ?^ out out ?:(=(id i) `p ~) :: ++ fi-abet ^+ peer-core ?: =, keen :: num-fragments is 0 when unknown (i.e. no response yet) :: if no-one is listening, kill request :: ?| =(~ listeners.keen) &(!=(0 num-fragments) =(num-fragments num-received)) == abet-gone =. fine fi-set-wake =. 
keens.scry (put:orm keens.scry keen-id keen) peer-core :: ++ abet-gone =? fine ?=(^ next-wake.keen) (fi-rest u.next-wake.keen) =. keens.scry +:(del:orm keens.scry keen-id) =. order.scry (~(del by order.scry) path) peer-core :: ++ fi-full-path :^ (scot %p her) (scot %ud rift.peer-state) (scot %ud life.peer-state) path :: ++ fi-show =, keen :* nex=(lent nex) hav=(lent hav) num-fragments=num-fragments num-received=num-received next-wake=next-wake metrics=metrics == :: ++ fi-trace |= [verb=? print=(trap tape)] ^+ same (trace %fine verb her ships.bug.ames-state print) :: ++ fi-deq (deq want) ++ fi-gauge (ga metrics.keen (wyt:fi-deq wan.keen)) ++ fi-wait |=(tim=@da (fi-pass-timer %b %wait tim)) ++ fi-rest |=(tim=@da (fi-pass-timer %b %rest tim)) ++ fi-etch-keen |=(frag=@ud (etch-keen fi-full-path frag)) ++ fi-send |=(=hoot fine(event-core (send-blob for=| her `@ux`hoot))) :: ++ fi-clean-up |= [=^duct core=_event-core] :: at the moment only %spider keeps state about remote scries :: ?+ duct core [[%gall %use %spider @ ship=@ %thread tid=@ *] *] =/ =cage spider-stop+!>([&7.-.duct |]) =/ poke=* [%0 %m [p q.q]:cage] =/ =plea [%g /ge/spider poke] (emit:core duct %pass /fine/unsub %g %plea our plea) == :: +| %entry-points :: ++ fi-start |= =^duct %- (fi-trace fin.veb |.("keen {(spud fi-full-path)}")) =. fine (fi-sub duct) ?> =(num-fragments.keen 0) =/ fra=@ 1 =/ req=hoot (fi-etch-keen fra) =/ =want [fra req last=now tries=1 skips=0] =. wan.keen (cons:fi-deq *(pha ^want) want) (fi-send req) :: ++ fi-rcv |= [[=full=^path num=@ud] =meow =lane:ames] ^+ fine =/ og fine =. peer-core (update-qos %fine %live last-contact=now) :: handle empty ?: =(0 num.meow) ?> =(~ dat.meow) (fi-done sig.meow ~) :: update congestion, or fill details :: =? fine =(0 num-fragments.keen) ?> =(num 1) (fi-first-rcv meow) :: ?. ?=([@ @ @ *] full-path) ~| fine-path-too-short+full-path !! ?. =(`her (slaw %p i.full-path)) ~| fine-path-bunk-ship+[full-path her] !! ?. 
              =(`life.peer-state (slaw %ud i.t.t.full-path))
            ~|  fine-path-bunk-life+[full-path life.peer-state]
            !!
          ?.  =(`rift.peer-state (slaw %ud i.t.full-path))
            ~|  fine-path-bunk-rift+[full-path rift.peer-state]
            !!
          ?.  (veri-fra:keys [full-path num [dat sig]:meow])
            ~|  fine-purr-fail-signature/num^`@ux`sig.meow
            ~|  life.peer-state
            !!
          ::
          =^  found=?  fine  (fi-on-ack num)
          ?.  found
            (fi-fast-retransmit:og num)
          =:  hav.keen           [[num meow] hav.keen]
              num-received.keen  +(num-received.keen)
          ==
          ?.  =(num-fragments num-received):keen
            fi-continue
          (fi-done [sig dat]:fi-sift-full)
        ::
        ++  fi-sub
          |=(=^duct fine(listeners.keen (~(put in listeners.keen) duct)))
        ::  scry is autocancelled in +abet if no more listeners
        ::
        ++  fi-unsub
          |=  [=^duct all=?]
          ?.  |(all (~(has in listeners.keen) duct))
            %.  fine
            (fi-trace fin.veb |.("{} not a listener for {}"))
          =?  event-core  all
            ::  notify all listeners by inspecting their
            ::  ducts and sending appropriate clean up moves
            ::
            (~(rep in listeners.keen) fi-clean-up)
          %-  (fi-trace fin.veb |.("deleting {}"))
          fine(listeners.keen ?:(all ~ (~(del in listeners.keen) duct)))
        ::
        +|  %implementation
        ::
        ++  fi-on-ack
          =|  marked=(list want)
          |=  fra=@ud
          ^-  [? _fine]
          =;  [[found=? cor=_fine] wan=(pha want)]
            :-  found
            ?.(found fine cor(wan.keen wan))
          %^  (dip-left:fi-deq ,[found=? cor=_fine])  wan.keen
            [| fine]
          |=  [[found=? cor=_fine] =want]
          ^-  [(unit _want) stop=? [found=? cor=_fine]]
          =.  fine  cor
          ?:  =(fra fra.want)
            =.  metrics.keen  (on-ack:fi-gauge +>.want)
            [~ %.y %.y fine]
          =.  skips.want  +(skips.want)
          =^  resend=?  metrics.keen
            (on-skipped-packet:fi-gauge +>.want)
          ?.  resend
            [`want %.n found fine]
          =.  tries.want      +(tries.want)
          =.  last-sent.want  now
          =.  fine            (fi-send hoot.want)
          [`want %.n found fine]
        ::
        ++  fi-done
          |=  [sig=@ data=$@(~ (cask))]
          ?>  (meri:keys fi-full-path sig data)
          %-  (fi-trace fin.veb |.("done {(spud fi-full-path)}"))
          =/  listeners=(list ^duct)
            ~(tap in listeners.keen)
          =/  dat=(unit (cask))
            ?~(data ~ `data)
          |-  ^+  fine
          ?~  listeners
            fine
          =.
              event-core
            (emit i.listeners %give %tune fi-full-path sig dat)
          $(listeners t.listeners)
        ::
        ++  fi-first-rcv
          |=  =meow
          ^+  fine
          ::
          =;  paz=(list want)
            fine(keen keen(num-fragments num.meow, nex (tail paz)))
          %+  turn  (gulf 1 num.meow)
          |=  fra=@ud
          ^-  want
          [fra (fi-etch-keen fra) now 0 0]
        ::  +fi-continue: send packets based on normal congestion flow
        ::
        ++  fi-continue
          =|  inx=@ud
          =|  sent=(list @ud)
          =/  max  num-slots:fi-gauge
          |-  ^+  fine
          ?:  |(=(~ nex.keen) =(inx max))
            fine
          =^  =want  nex.keen  nex.keen
          =.  last-sent.want   now
          =.  tries.want       +(tries.want)
          =.  wan.keen         (snoc:fi-deq wan.keen want)
          =.  fine             (fi-send hoot.want)
          $(inx +(inx))
        ::
        ++  fi-sift-full
          =,  keen
          ~|  %frag-mismatch
          ~|  have/num-received
          ~|  need/num-fragments
          ~|  path/path
          ?>  =(num-fragments num-received)
          ?>  =((lent hav) num-received)
          (sift-roar num-fragments hav)
        ::
        ++  fi-fast-retransmit
          |=  fra=@ud
          =;  [cor=_fine wants=(pha want)]
            cor(wan.keen wants)
          %^  (dip-left:fi-deq ,cor=_fine)  wan.keen
            fine
          |=  [cor=_fine =want]
          ^-  [(unit ^want) stop=? cor=_fine]
          ?.  (lte fra.want fra)
            [`want & cor]
          ?:  (gth (next-expiry:fi-gauge:cor +>.want) now)
            [`want & cor]
          =.  last-sent.want  now
          =.  cor  (fi-send:cor hoot.want)
          [`want | cor]
        ::
        ++  fi-pass-timer
          |=  =note
          =/  =wire  (welp /fine/behn/wake/(scot %p her) path)
          fine(event-core (emit unix-duct.ames-state %pass wire note))
        ::
        ++  fi-set-wake
          ^+  fine
          =/  next-wake=(unit @da)
            ?~  want=(peek-left:fi-deq wan.keen)
              ~
            `(next-expiry:fi-gauge +>:u.want)
          ?:  =(next-wake next-wake.keen)
            fine
          =?  fine  !=(~ next-wake.keen)
            =/  old  (need next-wake.keen)
            =.  next-wake.keen  ~
            (fi-rest old)
          =?  fine  ?=(^ next-wake)
            =.  next-wake.keen  next-wake
            (fi-wait u.next-wake)
          fine
        ::  +fi-take-wake: handle request packet timeout
        ::
        ++  fi-take-wake
          ^+  fine
          =.  next-wake.keen  ~
          =.  peer-core  (update-qos %fine qos:(is-peer-dead now peer-state))
          ::  has the direct route expired?
          ::
          =.  peer-state  (update-peer-route her peer-state)
          =.
              metrics.keen  on-timeout:fi-gauge
          =^  want=(unit want)  wan.keen
            (pop-left:fi-deq wan.keen)
          ~|  %took-wake-for-empty-want
          ?>  ?=(^ want)
          =:  tries.u.want      +(tries.u.want)
              last-sent.u.want  now
          ==
          =.  wan.keen  (cons:fi-deq wan.keen u.want)
          (fi-send hoot.u.want)
        --
      ::  +ga: constructor for |pump-gauge congestion control core
      ::
      ++  ga
        |=  [pump-metrics live-packets=@ud]
        =*  ship     her
        =*  now      now.channel
        =*  metrics  +<-
        |%
        +|  %helpers
        ::
        ++  ga-trace
          |=  [verb=? print=(trap tape)]
          ^+  same
          (trace %ames verb ship ships.bug.channel print)
        ::  +next-expiry: when should a newly sent fresh packet time out?
        ::
        ::    Use rtt + 4*sigma, where sigma is the mean deviation of rtt.
        ::    This should make it unlikely that a packet would time out
        ::    from a delay, as opposed to an actual packet loss.
        ::
        ++  next-expiry
          |=  packet-state
          ^-  @da
          (add last-sent rto)
        ::  +num-slots: how many packets can we send right now?
        ::
        ++  num-slots
          ^-  @ud
          (sub-safe cwnd live-packets)
        ::
        ::  +clamp-rto: apply min and max to an .rto value
        ::
        ++  clamp-rto
          |=  rto=@dr
          ^+  rto
          (min max-backoff (max ^~((div ~s1 5)) rto))
        ::  +max-backoff: calculate highest re-send interval
        ::
        ::    Keeps pinhole to sponsors open by inspecting the duct (hack).
        ::
        ++  max-backoff
          ^-  @dr
          ?:(?=([[%gall %use %ping *] *] duct) ~s25 ~m2)
        ::  +in-slow-start: %.y iff we're in "slow-start" mode
        ::
        ++  in-slow-start
          ^-  ?
          (lth cwnd ssthresh)
        ::  +in-recovery: %.y iff we're recovering from a skipped packet
        ::
        ::    We finish recovering when .live-packets finally dips back
        ::    down to .cwnd.
        ::
        ++  in-recovery
          ^-  ?
          (gth live-packets cwnd)
        ::  +sub-safe: subtract with underflow protection
        ::
        ++  sub-safe
          |=  [a=@ b=@]
          ^-  @
          ?:((lte a b) 0 (sub a b))
        ::  +show: produce a printable version of .metrics
        ::
        ++  show
          =/  ms  (div ~s1 1.000)
          ::
          :*  rto=(div rto ms)
              rtt=(div rtt ms)
              rttvar=(div rttvar ms)
              ssthresh=ssthresh
              cwnd=cwnd
              num-live=live-packets
              counter=counter
          ==
        ::
        +|  %entry-points
        ::  +on-ack: adjust metrics based on a packet getting acknowledged
        ::
        ++  on-ack
          |=  =packet-state
          ^-  pump-metrics
          ::
          =.  counter  +(counter)
          ::  if below congestion threshold, add 1; else, add avg 1 / cwnd
          ::
          =.  cwnd
            ?:  in-slow-start
              +(cwnd)
            (add cwnd !=(0 (mod (mug now) cwnd)))
          ::  if this was a re-send, don't adjust rtt or downstream state
          ::
          ?:  (gth tries.packet-state 1)
            metrics
          ::  rtt-datum: new rtt measurement based on packet roundtrip
          ::
          =/  rtt-datum=@dr  (sub-safe now last-sent.packet-state)
          ::  rtt-error: difference between this measurement and expected
          ::
          =/  rtt-error=@dr
            ?:  (gte rtt-datum rtt)
              (sub rtt-datum rtt)
            (sub rtt rtt-datum)
          ::  exponential weighting ratio for .rtt and .rttvar
          ::
          =.  rtt     (div (add rtt-datum (mul rtt 7)) 8)
          =.  rttvar  (div (add rtt-error (mul rttvar 7)) 8)
          =.  rto     (clamp-rto (add rtt (mul 4 rttvar)))
          ::
          %.  metrics
          %+  ga-trace  ges.veb  |.
          "ack update {}"
        ::  +on-skipped-packet: handle misordered ack
        ::
        ++  on-skipped-packet
          |=  packet-state
          ^-  [resend=? pump-metrics]
          ::
          =/  resend=?  &((lte tries 1) |(in-recovery (gte skips 3)))
          :-  resend
          ::
          =?  cwnd  !in-recovery  (max 2 (div cwnd 2))
          %-  %+  ga-trace  snd.veb
              |.("skip {}")
          metrics
        ::  +on-timeout: (re)enter slow-start mode on packet loss
        ::
        ++  on-timeout
          ^-  pump-metrics
          ::
          %-  (ga-trace ges.veb |.("timeout update {}"))
          =:  ssthresh  (max 1 (div cwnd 2))
              cwnd      1
              rto       (clamp-rto (mul rto 2))
          ==
          metrics
        --
      --
    --
  --
::  adult ames, after metamorphosis from larva
::
=|  =ames-state
|=  [now=@da eny=@ rof=roof]
=*  ames-gate  .
=*  veb  veb.bug.ames-state
|%
::  +call: handle request $task
::
++  call
  |=  [=duct dud=(unit goof) wrapped-task=(hobo task)]
  ^-  [(list move) _ames-gate]
  ::
  =/  =task  ((harden task) wrapped-task)
  =/  event-core  (ev [now eny rof] duct ames-state)
  ::
  =^  moves  ames-state
    =<  abet
    ::  handle error notifications
    ::
    ?^  dud
      ?+  -.task
          (on-crud:event-core -.task tang.u.dud)
        %hear  (on-hear:event-core lane.task blob.task dud)
      ==
    ::
    ?-  -.task
      %born  on-born:event-core
      %hear  (on-hear:event-core [lane blob ~]:task)
      %heed  (on-heed:event-core ship.task)
      %init  on-init:event-core
      %jilt  (on-jilt:event-core ship.task)
      %prod  (on-prod:event-core ships.task)
      %sift  (on-sift:event-core ships.task)
      %snub  (on-snub:event-core [form ships]:task)
      %spew  (on-spew:event-core veb.task)
      %cong  (on-cong:event-core [msg mem]:task)
      %stir  (on-stir:event-core arg.task)
      %trim  on-trim:event-core
      %vega  on-vega:event-core
      %plea  (on-plea:event-core [ship plea]:task)
      %cork  (on-cork:event-core ship.task)
      %kroc  (on-kroc:event-core dry.task)
    ::
      %pine  (on-pine:event-core +.task)
      %keen  (on-keen:event-core +.task)
      %yawn  (on-cancel-scry:event-core | +.task)
      %wham  (on-cancel-scry:event-core & +.task)
    ==
  ::
  [moves ames-gate]
::  +take: handle response $sign
::
++  take
  |=  [=wire =duct dud=(unit goof) =sign]
  ^-  [(list move) _ames-gate]
  ?^  dud
    ~|(%ames-take-dud (mean tang.u.dud))
  ::
  =/  event-core  (ev [now eny rof] duct ames-state)
  ::
  =^  moves  ames-state
    =<  abet
    ?-  sign
      [@ %done *]  (on-take-done:event-core wire error.sign)
      [@ %boon *]  (on-take-boon:event-core wire payload.sign)
    ::
      [%behn %wake *]  (on-take-wake:event-core wire error.sign)
    ::
      [%jael %turf *]          (on-take-turf:event-core turfs.sign)
      [%jael %private-keys *]  (on-priv:event-core [life vein]:sign)
      [%jael %public-keys *]   (on-publ:event-core wire public-keys-result.sign)
    ==
  ::
  [moves ames-gate]
::  +stay: extract state before reload
::
++  stay  [%13 %adult ames-state]
::  +load: load in old state after reload
::
++  load
  =<  |=  $=  old-state
          $%  [%13 ^ames-state]
          ==
      ^+  ames-gate
      ?>
          ?=(%13 -.old-state)
      ames-gate(ames-state +.old-state)
  ::  all state transitions are called from larval ames
  ::
  |%
  ::
  ++  state-4-to-5
    |=  ames-state=ames-state-4
    ^-  ames-state-5
    =.  peers.ames-state
      %-  ~(run by peers.ames-state)
      |=  ship-state=ship-state-4
      ?.  ?=(%known -.ship-state)
        ship-state
      =.  snd.ship-state
        %-  ~(run by snd.ship-state)
        |=  =message-pump-state
        =.  num-live.metrics.packet-pump-state.message-pump-state
          ~(wyt in live.packet-pump-state.message-pump-state)
        message-pump-state
      ship-state
    ames-state
  ::
  ++  state-5-to-6
    |=  ames-state=ames-state-5
    ^-  ames-state-6
    :_  +.ames-state
    %-  ~(rut by peers.ames-state)
    |=  [=ship ship-state=ship-state-5]
    ^-  ship-state-6
    ?.  ?=(%known -.ship-state)
      ship-state
    =/  peer-state=peer-state-5  +.ship-state
    =/  =rift
      ::  hardcoded because %jael doesn't have data about comets
      ::
      ?:  ?=(%pawn (clan:title ship))  0
      ;;  @ud
      =<  q.q  %-  need  %-  need
      (rof ~ %j `beam`[[our %rift %da now] /(scot %p ship)])
    :-  -.ship-state
    :_  +.peer-state
    =,  -.peer-state
    [symmetric-key life rift public-key sponsor]
  ::
  ++  state-6-to-7
    |=  ames-state=ames-state-6
    ^-  ames-state-7
    :_  +.ames-state
    %-  ~(run by peers.ames-state)
    |=  ship-state=ship-state-6
    ^-  ship-state-7
    ?.
        ?=(%known -.ship-state)
      ship-state
    :-  %known
    ^-  peer-state-7
    :-  +<.ship-state
    [route qos ossuary snd rcv nax heeds ~ ~ ~]:ship-state
  ::
  ++  state-7-to-8
    |=  ames-state=ames-state-7
    ^-  ames-state-8
    =,  ames-state
    :*  peers
        unix-duct
        life
        crypto-core
        bug
        *(set wire)
    ==
  ::
  ++  state-8-to-9
    |=  ames-state=ames-state-8
    ^-  ames-state-9
    =,  ames-state
    :*  peers
        unix-duct
        life
        crypto-core
        bug
        corks
        *(set ship)
    ==
  ::
  ++  state-9-to-10
    |=  ames-state=ames-state-9
    ^-  ames-state-10
    =,  ames-state
    :*  peers
        unix-duct
        life
        crypto-core
        %=  bug.ames-state
          veb  [&1 &2 &3 &4 &5 &6 |6 %.n]:veb.bug
        ==
        corks
        snub
    ==
  ::
  ++  state-10-to-11
    |=  ames-state=ames-state-10
    ^-  ames-state-11
    =,  ames-state
    :*  peers
        unix-duct
        life
        crypto-core
        bug
        corks
        snub
        ::  5 messages and 100Kb of data outstanding
        ::
        [msg=5 mem=100.000]
    ==
  ::
  ++  state-11-to-12
    |=  ames-state=ames-state-11
    ^-  ames-state-12
    :_  =,  ames-state
        :*  unix-duct
            life
            crypto-core
            bug
            [%deny snub]
            cong
        ==
    ^-  (map ship ship-state-12)
    %-  ~(run by peers.ames-state)
    |=  ship-state=ship-state-7
    ^-  ship-state-12
    ?.
        ?=(%known -.ship-state)
      ship-state
    %=  ship-state
      +>  [route qos ossuary snd rcv nax heeds closing corked]:+>.ship-state
    ==
  ::
  ++  state-12-to-13
    |=  old=ames-state-12
    ^-  ^ames-state
    =+  !<  =rift
        q:(need (need (rof ~ %j `beam`[[our %rift %da now] /(scot %p our)])))
    :*  peers=(~(run by peers.old) ship-state-12-to-13)
        unix-duct.old
        life.old
        rift
        crypto-core=(nol:nu:crub:crypto sec:ex:crypto-core.old)
        %=  bug.old
          veb  [&1 &2 &3 &4 &5 &6 &7 |7 %.n]:veb.bug.old
        ==
        snub.old
        cong.old
    ==
  ::
  ++  ship-state-12-to-13
    |=  old=ship-state-12
    ^-  ship-state
    ?:  ?=(%alien -.old)
      old(heeds [heeds.old ~ ~])
    old(corked [corked.old *scry-state])
  ::
  --
::  +scry: dereference namespace
::
++  scry
  ^-  roon
  |=  [lyc=gang car=term bem=beam]
  ^-  (unit (unit cage))
  =*  ren  car
  =*  why=shop  &/p.bem
  =*  syd  q.bem
  =*  lot=coin  $/r.bem
  =*  tyl  s.bem
  ::
  ::TODO  don't special-case whey scry
  ::
  ?:  &(=(%$ ren) =(tyl /whey))
    =/  maz=(list mass)
      =+  [known alien]=(skid ~(val by peers.ames-state) |=(^ =(%known +<-)))
      :~  peers-known+&+known
          peers-alien+&+alien
      ==
    ``mass+!>(maz)
  ::  only respond for the local identity, %$ desk, current timestamp
  ::
  ?.  ?&  =(&+our why)
          =([%$ %da now] lot)
          =(%$ syd)
      ==
    ?.  for.veb.bug.ames-state  ~
    ~>  %slog.0^leaf/"ames: scry-fail {}"
    ~
  ::  /ax/protocol/version           @
  ::  /ax/peers                      (map ship ?(%alien %known))
  ::  /ax/peers/[ship]               ship-state
  ::  /ax/peers/[ship]/last-contact  (unit @da)
  ::  /ax/peers/[ship]/forward-lane  (list lane)
  ::  /ax/bones/[ship]               [snd=(set bone) rcv=(set bone)]
  ::  /ax/snd-bones/[ship]/[bone]    vase
  ::  /ax/snubbed                    (?(%allow %deny) (list ship))
  ::  /ax/fine/hunk/[path/...]       (list @ux) scry response fragments
  ::
  ?.
      ?=(%x ren)  ~
  =>  .(tyl `(pole knot)`tyl)
  ?+    tyl  ~
      [%protocol %version ~]
    ``noun+!>(protocol-version)
  ::
      [%peers ~]
    :^  ~  ~  %noun
    !>  ^-  (map ship ?(%alien %known))
    (~(run by peers.ames-state) head)
  ::
      [%peers her=@ req=*]
    =/  who  (slaw %p her.tyl)
    ?~  who  [~ ~]
    =/  peer  (~(get by peers.ames-state) u.who)
    ?+    req.tyl  [~ ~]
        ~
      ?~  peer
        [~ ~]
      ``noun+!>(u.peer)
    ::
        [%last-contact ~]
      :^  ~  ~  %noun
      !>  ^-  (unit @da)
      ?.  ?=([~ %known *] peer)
        ~
      `last-contact.qos.u.peer
    ::
        [%forward-lane ~]
      ::
      ::  this duplicates the routing hack from +send-blob:event-core
      ::  so long as neither the peer nor the peer's sponsoring galaxy is us,
      ::  and the peer has been reached recently:
      ::
      ::    - no route to the peer, or peer has not been contacted recently:
      ::      send to the peer's sponsoring galaxy
      ::    - direct route to the peer: use that
      ::    - indirect route to the peer: send to both that route and the
      ::      peer's sponsoring galaxy
      ::
      :^  ~  ~  %noun
      !>  ^-  (list lane)
      ?:  =(our u.who)
        ~
      ?.  ?=([~ %known *] peer)
        =/  sax  (rof ~ %j `beam`[[our %saxo %da now] /(scot %p u.who)])
        ?.  ?=([~ ~ *] sax)
          ~
        =/  gal  (rear ;;((list ship) q.q.u.u.sax))
        ?:  =(our gal)
          ~
        [%& gal]~
      =;  zar=(trap (list lane))
        ?:  (lth last-contact.qos.u.peer (sub now ~h1))
          $:zar
        ?~  route.u.peer
          $:zar
        =*  rot  u.route.u.peer
        ?:(direct.rot [lane.rot ~] [lane.rot $:zar])
      ::
      |.  ^-  (list lane)
      ?:  ?=(%czar (clan:title sponsor.u.peer))
        ?:  =(our sponsor.u.peer)
          ~
        [%& sponsor.u.peer]~
      =/  next  (~(get by peers.ames-state) sponsor.u.peer)
      ?.  ?=([~ %known *] next)
        ~
      $(peer next)
    ==
  ::
      [%bones her=@ ~]
    =/  who  (slaw %p her.tyl)
    ?~  who  [~ ~]
    =/  per  (~(get by peers.ames-state) u.who)
    ?.  ?=([~ %known *] per)  [~ ~]
    =/  res
      =,  u.per
      [snd=~(key by snd) rcv=~(key by rcv)]
    ``noun+!>(res)
  ::
      [%snd-bones her=@ bon=@ ~]
    =/  who  (slaw %p her.tyl)
    ?~  who  [~ ~]
    =/  ost  (slaw %ud bon.tyl)
    ?~  ost  [~ ~]
    =/  per  (~(get by peers.ames-state) u.who)
    ?.
        ?=([~ %known *] per)
      [~ ~]
    =/  mps  (~(get by snd.u.per) u.ost)
    ?~  mps
      [~ ~]
    =/  res
      u.mps
    ``noun+!>(!>(res))
  ::
      [%snubbed ~]
    ``noun+!>([form.snub.ames-state ~(tap in ships.snub.ames-state)])
  ::
      [%fine %hunk lop=@t len=@t pax=^]
    ::TODO  separate endpoint for the full message (instead of packet list)
    ::  .pax is expected to be a scry path of the shape /vc/desk/rev/etc,
    ::  so we need to give it the right shape
    ::
    ?~  blk=(de-path-soft:balk pax.tyl)  ~
    =+  nom=(en-roof:balk u.blk)
    ~|  nom
    |^
    =/  van  ?@(vis.nom (end 3 vis.nom) way.vis.nom)
    ?+    van  ~
        %c
      =+  pem=(rof lyc nom(vis %cp))
      ?.  ?=(^ pem)    ~
      ?.  ?=(^ u.pem)  ~
      ~|  u.u.pem
      =+  per=!<([r=dict:clay w=dict:clay] q.u.u.pem)
      ?.  =([%black ~ ~] rul.r.per)  ~
      (en-hunk (rof lyc nom))
    ::
        %g
      =/  kyr  ?@(vis.nom (rsh 3 vis.nom) car.vis.nom)
      %-  en-hunk
      ?+  kyr  ~
        %x  (rof lyc nom)
      ==
    ==
    ::
    ++  en-hunk
      |=  res=(unit (unit cage))
      ^+  res
      ?~  res  ~
      =/  =hunk  [(slav %ud lop.tyl) (slav %ud len.tyl)]
      ::
      =/  hu-co  (etch-hunk our [life crypto-core]:ames-state)
      ?-  res
        [~ ~]    ``noun+!>((etch:hu-co pax.tyl hunk ~))
        [~ ~ *]  ``noun+!>((etch:hu-co pax.tyl hunk [p q.q]:u.u.res))
      ==
    --
  ==
--