spider: fix tracking remote-scry requests

Prevously we were tracking remote scry requests using a map, assuming
that every thread would do just one remote scry request. This is not
right. A thread that did multiple +keen:strandio was treated as
if just the last call existed, overwritten previous entries in the map.

Now we track remote scries using a jug that accounts for multiple %keen
tasks per thread.

The logic for sending %yawns to %ames has been updated for the following
scenarios:

- +thread-fail will always send a %yawn task
- +thread-done doesn't send %yawn tasks
  - unless a running thread is stopped
- if %spider is reloaded:
  - %yawn tasks will be sent for any running or starting thread

/lib/strandio also removes +take-tune from +keen,  decoupling
sending %tasks and receiving %signs. This allows for clients
to request multiple paths at future cases, without blocking.
This commit is contained in:
yosoyubik 2023-04-27 13:55:06 +02:00
parent b98b71e368
commit cfc1a58559
3 changed files with 74 additions and 36 deletions

View File

@ -15,7 +15,7 @@
running=(axal thread-form)
tid=(map tid yarn)
serving=(map tid [(unit @ta) =mark =desk])
scries=(map tid [=ship =path])
scrying=(jug tid [=wire =ship =path])
==
::
+$ clean-slate-any
@ -25,16 +25,26 @@
clean-slate-2
clean-slate-3
clean-slate-4
clean-slate-5
clean-slate
==
::
+$ clean-slate
$: %6
starting=(map yarn [=trying =vase])
running=(list yarn)
tid=(map tid yarn)
serving=(map tid [(unit @ta) =mark =desk])
scrying=(jug tid [wire ship path])
==
::
+$ clean-slate-5
$: %5
starting=(map yarn [=trying =vase])
running=(list yarn)
tid=(map tid yarn)
serving=(map tid [(unit @ta) =mark =desk])
scries=(map tid [ship path])
scrying=(map tid [ship path])
==
::
+$ clean-slate-4
@ -110,17 +120,23 @@
=. any (old-to-3 any)
=. any (old-to-4 any)
=. any (old-to-5 any)
?> ?=(%5 -.any)
=. any (old-to-6 any)
?> ?=(%6 -.any)
::
=. tid.state tid.any
=/ yarns=(list yarn)
%+ welp running.any
~(tap in ~(key by starting.any))
~& yarns+yarns
|- ^- (quip card _this)
?~ yarns
?~ yarns
[~[bind-eyre:sc] this]
=^ cards-1 state
(handle-stop-thread:sc (yarn-to-tid i.yarns) |)
%. [(yarn-to-tid i.yarns) nice=%.n]
:: the |sc core needs to now about the previous
:: scrying state in order to send $yawns to %ames
::
%*(handle-stop-thread sc scrying.state scrying.any)
=^ cards-2 this
$(yarns t.yarns)
[:(weld upgrade-cards cards-1 cards-2) this]
@ -133,8 +149,8 @@
++ old-to-2
|= old=clean-slate-any
^- (quip card clean-slate-any)
?> ?=(?(%1 %2 %3 %4 %5) -.old)
?: ?=(?(%2 %3 %4 %5) -.old)
?> ?=(?(%1 %2 %3 %4 %5 %6) -.old)
?: ?=(?(%2 %3 %4 %5 %6) -.old)
`old
:- ~[bind-eyre:sc]
:* %2
@ -147,8 +163,8 @@
++ old-to-3
|= old=clean-slate-any
^- clean-slate-any
?> ?=(?(%2 %3 %4 %5) -.old)
?: ?=(?(%3 %4 %5) -.old)
?> ?=(?(%2 %3 %4 %5 %6) -.old)
?: ?=(?(%3 %4 %5 %6) -.old)
old
:* %3
starting.old
@ -156,11 +172,12 @@
tid.old
(~(run by serving.old) |=([id=@ta =mark] [id mark q.byk.bowl]))
==
::
++ old-to-4
|= old=clean-slate-any
^- clean-slate-any
?> ?=(?(%3 %4 %5) -.old)
?: ?=(?(%4 %5) -.old)
?> ?=(?(%3 %4 %5 %6) -.old)
?: ?=(?(%4 %5 %6) -.old)
old
:* %4
starting.old
@ -171,10 +188,27 @@
::
++ old-to-5
|= old=clean-slate-any
^- clean-slate
?> ?=(?(%4 %5) -.old)
?: ?=(%5 -.old) old
^- clean-slate-any
?> ?=(?(%4 %5 %6) -.old)
?: ?=(?(%5 %6) -.old) old
[%5 +.old(serving [serving.old ~])]
::
++ old-to-6
|= old=clean-slate-any
^- clean-slate
?> ?=(?(%5 %6) -.old)
?: ?=(%6 -.old) old
:- %6
%= +.old
scrying
%- ~(run by scrying.old)
|= [=ship =path]
%- ~(gas in *(set [wire ^ship ^path]))
:: XX +keen:strandio used /keen as the default wire
:: this assumes that any old thread used that as well
::
[/keen ship path]~
==
--
::
++ on-poke
@ -421,14 +455,13 @@
?: (~(has of running.state) u.yarn)
?. nice
(thread-fail u.yarn %cancelled ~)
=^ cancel-cards state (cancel-scry tid &)
=^ done-cards state (thread-done u.yarn *vase)
[(weld cancel-cards done-cards) state]
=^ done-cards state (thread-done u.yarn *vase silent=%.n)
[done-cards state]
?: (~(has by starting.state) u.yarn)
(thread-fail-not-running tid %stopped-before-started ~)
~& [%thread-not-started u.yarn]
?: nice
(thread-done u.yarn *vase)
(thread-done u.yarn *vase silent=%.y)
(thread-fail u.yarn %cancelled ~)
::
++ take-input
@ -457,8 +490,8 @@
^- [(list card) _state]
%+ roll cards.r
|= [=card cards=(list card) s=_state]
:_ =? scries.s ?=([%pass ^ %arvo %a %keen @ *] card)
(~(put by scries.s) tid &6.card +>+>+>.card)
:_ =? scrying.s ?=([%pass ^ %arvo %a %keen @ *] card)
(~(put ju scrying.s) tid [&2 &6 |6]:card)
s
:_ cards
^- ^card
@ -476,7 +509,7 @@
?- -.eval-result.r
%next `state
%fail (thread-fail yarn err.eval-result.r)
%done (thread-done yarn value.eval-result.r)
%done (thread-done yarn value.eval-result.r silent=%.y)
==
[(weld cards final-cards) state]
::
@ -500,12 +533,15 @@
++ cancel-scry
|= [=tid silent=?]
^- (quip card _state)
?~ scry=(~(get by scries.state) tid)
?~ scrying=(~(get ju scrying.state) tid)
`state
:_ state(scries (~(del by scries.state) tid))
:_ state(scrying (~(del by scrying.state) tid))
?: silent ~
%- (slog leaf+"cancelling {<tid>}: [{<[ship path]:u.scry>}]" ~)
[%pass /thread/[tid]/keen %arvo %a %yawn [ship path]:u.scry]~
%- ~(rep in `(set [wire ship path])`scrying)
|= [[=wire =ship =path] cards=(list card)]
%- (slog leaf+"cancelling {<tid>}: [{<[wire ship path]>}]" ~)
:_ cards
[%pass (welp /thread/[tid] wire) %arvo %a %yawn ship path]
::
++ thread-http-fail
|= [=tid =term =tang]
@ -535,9 +571,9 @@
::%- (slog leaf+"strand {<yarn>} failed" leaf+<term> tang)
=/ =tid (yarn-to-tid yarn)
=/ fail-cards (thread-say-fail tid term tang)
=^ cards state (thread-clean yarn)
=^ cards state (thread-clean yarn)
=^ http-cards state (thread-http-fail tid term tang)
=^ scry-card state (cancel-scry tid |)
=^ scry-card state (cancel-scry tid silent=%.n)
:_ state
:(weld fail-cards cards http-cards scry-card)
::
@ -556,7 +592,7 @@
(json-response:gen:server !<(json (tube vase)))
::
++ thread-done
|= [=yarn =vase]
|= [=yarn =vase silent=?]
^- (quip card ^state)
:: %- (slog leaf+"strand {<yarn>} finished" (sell vase) ~)
=/ =tid (yarn-to-tid yarn)
@ -566,8 +602,8 @@
==
=^ http-cards state
(thread-http-response tid vase)
=^ scry-card state (cancel-scry tid &)
=^ cards state (thread-clean yarn)
=^ scry-card state (cancel-scry tid silent)
=^ cards state (thread-clean yarn)
[:(weld done-cards cards http-cards scry-card) state]
::
++ thread-clean
@ -640,7 +676,7 @@
::
++ clean-state
!> ^- clean-slate
5+state(running (turn ~(tap of running.state) head))
6+state(running (turn ~(tap of running.state) head))
::
++ convert-tube
|= [from=mark to=mark =desk =bowl:gall]

View File

@ -6,7 +6,10 @@
=/ m (strand ,vase)
^- form:m
=+ !<([~ =spar:ames] arg)
;< [* roar=(unit roar:ames)] bind:m (keen:strandio spar)
;< ~ bind:m
(keen:strandio /keen spar)
;< [* roar=(unit roar:ames)] bind:m
(take-tune:strandio /keen)
?~ roar
(pure:m !>(~))
?~ data=q.dat.u.roar

View File

@ -332,11 +332,10 @@
(take-wake `until)
::
++ keen
|= =spar:ames
=/ m (strand ,[spar:ames (unit roar:ames)])
|= [=wire =spar:ames]
=/ m (strand ,~)
^- form:m
;< ~ bind:m (send-raw-card %pass /keen %arvo %a %keen spar)
(take-tune /keen)
(send-raw-card %pass wire %arvo %a %keen spar)
::
++ sleep
|= for=@dr