ames: dedup new messages and fragments

This commit is contained in:
Philip Monk 2020-05-01 22:55:14 -07:00
parent b383037a8f
commit c50c34d8be
No known key found for this signature in database
GPG Key ID: B66E1F02604E44EC

View File

@ -2117,12 +2117,39 @@
==
now
::
=/ =message-blob (jam payload)
=/ =message-blob (dedup-message (jam payload))
=. peer-core (run-message-pump bone %memo message-blob)
::
?: &(=(%boon valence) ?=(?(%dead %unborn) -.qos.peer-state))
check-clog
peer-core
:: +dedup-message: replace with any existing copy of this message
::
++ dedup-message
|= =message-blob
^+ message-blob
=/ peers-l=(list [=ship =ship-state]) ~(tap by peers.ames-state)
|- ^+ message-blob
=* peer-loop $
?~ peers-l
message-blob
?. ?=(%known -.ship-state.i.peers-l)
peer-loop(peers-l t.peers-l)
=/ snd-l=(list [=bone =message-pump-state])
~(tap by snd.ship-state.i.peers-l)
|- ^+ message-blob
=* bone-loop $
?~ snd-l
peer-loop(peers-l t.peers-l)
=/ blob-l=(list ^message-blob)
~(tap to unsent-messages.message-pump-state.i.snd-l)
|- ^+ message-blob
=* blob-loop $
?~ blob-l
bone-loop(snd-l t.snd-l)
?: =(i.blob-l message-blob)
i.blob-l
blob-loop(blob-l t.blob-l)
:: +on-wake: handle timer expiration
::
++ on-wake
@ -2222,7 +2249,8 @@
=/ =message-pump-state
(~(gut by snd.peer-state) bone *message-pump-state)
::
=/ message-pump (make-message-pump message-pump-state channel)
=/ message-pump
(make-message-pump message-pump-state channel peers.ames-state)
=^ pump-gifts message-pump-state (work:message-pump task)
=. snd.peer-state (~(put by snd.peer-state) bone message-pump-state)
:: process effects from |message-pump
@ -2436,7 +2464,7 @@
:: +make-message-pump: constructor for |message-pump
::
++ make-message-pump
|= [state=message-pump-state =channel]
|= [state=message-pump-state =channel peers=(map ship ship-state)]
=* veb veb.bug.channel
=| gifts=(list message-pump-gift)
::
@ -2601,11 +2629,38 @@
=^ =message-blob unsent-messages.state ~(get to unsent-messages.state)
:: break .message into .chunks and set as .unsent-fragments
::
=. unsent-fragments.state (split-message next.state message-blob)
=. unsent-fragments.state
(turn (split-message next.state message-blob) dedup-fragment)
:: try to feed packets from the next message
::
=. next.state +(next.state)
feed-packets
:: +dedup-fragment: replace with any existing copy of this packet
::
++ dedup-fragment
|= =static-fragment
^+ static-fragment
=/ peers-l=(list [=ship =ship-state]) ~(tap by peers)
|- ^+ static-fragment
=* peer-loop $
?~ peers-l
static-fragment
?. ?=(%known -.ship-state.i.peers-l)
peer-loop(peers-l t.peers-l)
=/ snd-l=(list [=bone =message-pump-state])
~(tap by snd.ship-state.i.peers-l)
|- ^+ static-fragment
=* bone-loop $
?~ snd-l
peer-loop(peers-l t.peers-l)
=* frag-l unsent-fragments.message-pump-state.i.snd-l
|- ^+ static-fragment
=* frag-loop $
?~ frag-l
bone-loop(snd-l t.snd-l)
?: =(fragment.i.frag-l fragment.static-fragment)
static-fragment(fragment fragment.i.frag-l)
frag-loop(frag-l t.frag-l)
:: +run-packet-pump: call +work:packet-pump and process results
::
++ run-packet-pump