::  eth-watcher: ethereum event log collector
::
/-  *eth-watcher, spider
/+  ethereum, default-agent, verb, dbug
=,  ethereum-types
=,  jael
::
=>  |%
    +$  card  card:agent:gall
    +$  app-state
      $:  %4
          dogs=(map path watchdog)
      ==
    ::
    +$  context  [=path dog=watchdog]
    +$  watchdog
      $:  config
          running=(unit [since=@da =tid:spider])
          =number:block
          =pending-logs
          =history
          blocks=(list block)
      ==
    ::
    ::  history: newest block first, oldest event first
    +$  history       (list loglist)
    --
::
::  Helpers
::
=>  |%
    ++  wait
      |=  [=path now=@da time=@dr]
      ^-  card
      [%pass [%timer path] %arvo %b %wait (add now time)]
    ::
    ++  wait-shortcut
      |=  [=path now=@da]
      ^-  card
      [%pass [%timer path] %arvo %b %wait now]
    ::
    ++  poke-spider
      |=  [=path our=@p =cage]
      ^-  card
      [%pass [%running path] %agent [our %spider] %poke cage]
    ::
    ++  watch-spider
      |=  [=path our=@p =sub=path]
      ^-  card
      [%pass [%running path] %agent [our %spider] %watch sub-path]
    ::
    ++  leave-spider
      |=  [=path our=@p]
      ^-  card
      [%pass [%running path] %agent [our %spider] %leave ~]
    --
::
::  Main
::
%-  agent:dbug
^-  agent:gall
=|  state=app-state
%+  verb  |
|_  =bowl:gall
+*  this  .
    def   ~(. (default-agent this %|) bowl)
::
++  on-init
  ^-  (quip card _this)
  [~ this]
::
++  on-save   !>(state)
++  on-load
  |=  old=vase
  |^
  =+  !<(old-state=app-states old)
  =?  old-state  ?=(%0 -.old-state)
    %-  (slog leaf+"upgrading eth-watcher from %0" ~)
    ^-  app-state-1
    %=    old-state
        -  %1
        dogs
      %-  ~(run by dogs.old-state)
      |=  dog=watchdog-0
      %=  dog
        ->  [~m5 ->.dog]
      ==
    ==
  ::
  =^  cards-1=(list card)  old-state
    ?.  ?=(%1 -.old-state)
      `old-state
    %-  (slog leaf+"upgrading eth-watcher from %1" ~)
    :_  old-state(- %2)
    %+  turn  ~(tap by dogs.old-state)
    |=  [=path dog=watchdog-1]
    (wait-shortcut path now.bowl)
  ::
  =?  old-state  ?=(%2 -.old-state)
    %-  (slog leaf+"upgrading eth-watcher from %2" ~)
    ^-  app-state-3
    %=    old-state
        -  %3
        dogs
      %-  ~(run by dogs.old-state)
      |=  dog=watchdog-1
      %=  dog
        ->  [| ->.dog]
      ==
    ==
  ::
  =?  old-state  ?=(%3 -.old-state)
    %-  (slog leaf+"upgrading eth-watcher from %3" ~)
    ^-  app-state
    %=    old-state
        -  %4
        dogs
      %-  ~(run by dogs.old-state)
      |=  dog=watchdog-3
      %=  dog
          -
        =,  -.dog
        [url eager refresh-rate (mul refresh-rate 6) from contracts topics]
      ::
          running
        ?~  running.dog  ~
        `[now.bowl u.running.dog]
      ==
    ==
  ::
  [cards-1 this(state ?>(?=(%4 -.old-state) old-state))]
  ::
  +$  app-states
    $%(app-state-0 app-state-1 app-state-2 app-state-3 app-state)
  ::
  +$  app-state-3
    $:  %3
        dogs=(map path watchdog-3)
    ==
  ::
  +$  watchdog-3
    $:  config-3
        running=(unit =tid:spider)
        =number:block
        =pending-logs
        =history
        blocks=(list block)
    ==
  ::
  +$  config-3
    $:  url=@ta
        eager=?
        refresh-rate=@dr
        from=number:block
        contracts=(list address:ethereum)
        =topics
    ==
  ::
  +$  app-state-2
    $:  %2
        dogs=(map path watchdog-1)
    ==
  ::
  +$  app-state-1
    $:  %1
        dogs=(map path watchdog-1)
    ==
  ::
  +$  watchdog-1
    $:  config-1
        running=(unit =tid:spider)
        =number:block
        =pending-logs
        =history
        blocks=(list block)
    ==
  ::
  +$  config-1
    $:  url=@ta
        refresh-rate=@dr
        from=number:block
        contracts=(list address:ethereum)
        =topics
    ==
  ::
  +$  app-state-0
    $:  %0
        dogs=(map path watchdog-0)
    ==
  ::
  +$  watchdog-0
    $:  config-0
        running=(unit =tid:spider)
        =number:block
        =pending-logs
        =history
        blocks=(list block)
    ==
  ::
  +$  config-0
    $:  url=@ta
        from=number:block
        contracts=(list address:ethereum)
        =topics
    ==
  --
::
++  on-poke
  |=  [=mark =vase]
  ?:  ?=(%noun mark)
    ~&  state
    `this
  ?.  ?=(%eth-watcher-poke mark)
    (on-poke:def mark vase)
  ::
  =+  !<(=poke vase)
  ?-  -.poke
      %watch
    ::  fully restart the watchdog if it doesn't exist yet,
    ::  or if result-altering parts of the config changed.
    =/  restart=?
      ?|  !(~(has by dogs.state) path.poke)
          ?!  .=  ->+>+:(~(got by dogs.state) path.poke)
                   +>+>.config.poke
      ==
    ::
    =/  already  (~(has by dogs.state) path.poke)
    ~?  &(already restart)
      [dap.bowl 'overwriting existing watchdog on' path.poke]
    =/  wait-cards=(list card)
      ?:  already
        ~
      [(wait-shortcut path.poke now.bowl) ~]
    ::
    =/  restart-cards=(list card)
      =/  dog  (~(get by dogs.state) path.poke)
      ?.  ?&  restart
              ?=(^ dog)
              ?=(^ running.u.dog)
          ==
        ~
      =/  =cage  [%spider-stop !>([tid.u.running.u.dog &])]
      :_  ~
      `card`[%pass [%starting path.poke] %agent [our.bowl %spider] %poke cage]
    =/  new-dog
      =/  dog=watchdog
        ?:  restart  *watchdog
        (~(got by dogs.state) path.poke)
      %_  dog
        -       config.poke
        number  from.config.poke
      ==
    =.  dogs.state  (~(put by dogs.state) path.poke new-dog)
    [(weld wait-cards restart-cards) this]
  ::
      %clear
    =.  dogs.state  (~(del by dogs.state) path.poke)
    [~ this]
  ==
::
::  +on-watch: subscribe & get initial subscription data
::
::    /logs/some-path:
::
++  on-watch
  |=  =path
  ^-  (quip card agent:gall)
  ?.  ?=([%logs ^] path)
    ~|  [%invalid-subscription-path path]
    !!
  :_  this  :_  ~
  :*  %give  %fact  ~  %eth-watcher-diff  !>
      :-  %history
      ^-  loglist
      %-  zing
      %-  flop
      =<  history
      (~(gut by dogs.state) t.path *watchdog)
  ==
::
++  on-leave  on-leave:def
::
::  +on-peek: get diagnostics data
::
::    /block/some-path: get next block number to check for /some-path
::
++  on-peek
  |=  =path
  ^-  (unit (unit cage))
  ?+    path  ~
      [%x %block ^]
    ?.  (~(has by dogs.state) t.t.path)  ~
    :+  ~  ~
    :-  %atom
    !>(number:(~(got by dogs.state) t.t.path))
  ::
      [%x %dogs ~]
    ``noun+!>(~(key by dogs.state))
  ::
      [%x %dogs %configs ~]
    ``noun+!>((~(run by dogs.state) |=(=watchdog -.watchdog)))
  ==
::
++  on-agent
  |=  [=wire =sign:agent:gall]
  |^
  ^-  (quip card agent:gall)
  ?.  ?=([%running *] wire)
    (on-agent:def wire sign)
  ?-    -.sign
      %poke-ack
    ?~  p.sign
      [~ this]
    %-  (slog leaf+"eth-watcher couldn't start thread" u.p.sign)
    :_  (clear-running t.wire)  :_  ~
    (leave-spider t.wire our.bowl)
  ::
      %watch-ack
    ?~  p.sign
      [~ this]
    %-  (slog leaf+"eth-watcher couldn't start listening to thread" u.p.sign)
    ::  TODO: kill thread that may have started, although it may not
    ::  have started yet since we get this response before the
    ::  %start-spider poke is processed
    ::
    [~ (clear-running t.wire)]
  ::
      %kick  [~ (clear-running t.wire)]
      %fact
    =*  path  t.wire
    =/  dog  (~(get by dogs.state) path)
    ?~  dog
      [~ this]
    ?+    p.cage.sign  (on-agent:def wire sign)
        %thread-fail
      =+  !<([=term =tang] q.cage.sign)
      %-  (slog leaf+"eth-watcher failed; will retry" leaf+<term> tang)
      [~ this(dogs.state (~(put by dogs.state) path u.dog(running ~)))]
    ::
        %thread-done
      =+  !<([vows=disavows pup=watchpup] q.cage.sign)
      =.  u.dog
        %_  u.dog
          -             -.pup
          number        number.pup
          blocks        blocks.pup
          pending-logs  pending-logs.pup
        ==
      =^  cards-1  u.dog  (disavow path u.dog vows)
      =^  cards-2  u.dog  (release-logs path u.dog)
      =.  dogs.state  (~(put by dogs.state) path u.dog(running ~))
      [(weld cards-1 cards-2) this]
    ==
  ==
  ::
  ++  clear-running
    |=  =path
    =/  dog  (~(get by dogs.state) path)
    ?~  dog
      this
    this(dogs.state (~(put by dogs.state) path u.dog(running ~)))
  ::
  ++  disavow
    |=  [=path dog=watchdog vows=disavows]
    ^-  (quip card watchdog)
    =/  history-ids=(list [id:block loglist])
      %+  murn  history.dog
      |=  logs=loglist
      ^-  (unit [id:block loglist])
      ?~  logs
         ~
      `[[block-hash block-number]:(need mined.i.logs) logs]
    =/  actual-vows=disavows
      %+  skim  vows
      |=  =id:block
      (lien history-ids |=([=history=id:block *] =(id history-id)))
    =/  actual-history=history
      %+  murn  history-ids
      |=  [=id:block logs=loglist]
      ^-  (unit loglist)
      ?:  (lien actual-vows |=(=vow=id:block =(id vow-id)))
        ~
      `logs
    :_  dog(history actual-history)
    %+  turn  actual-vows
    |=  =id:block
    [%give %fact [%logs path]~ %eth-watcher-diff !>([%disavow id])]
  ::
  ++  release-logs
    |=  [=path dog=watchdog]
    ^-  (quip card watchdog)
    ?:  (lth number.dog 30)
      `dog
    =/  rel-number  (sub number.dog 30)
    =/  numbers=(list number:block)  ~(tap in ~(key by pending-logs.dog))
    =.  numbers  (sort numbers lth)
    =^  logs=(list event-log:rpc:ethereum)  dog
      |-  ^-  (quip event-log:rpc:ethereum watchdog)
      ?~  numbers
        `dog
      ?:  (gth i.numbers rel-number)
        $(numbers t.numbers)
      =^  rel-logs-1  dog
        =/  =loglist  (~(get ja pending-logs.dog) i.numbers)
        =.  pending-logs.dog  (~(del by pending-logs.dog) i.numbers)
        ?~  loglist
          `dog
        =.  history.dog  [loglist history.dog]
        [loglist dog]
      =^  rel-logs-2  dog  $(numbers t.numbers)
      [(weld rel-logs-1 rel-logs-2) dog]
    :_  dog
    ?~  logs
      ~
    ^-  (list card:agent:gall)
    [%give %fact [%logs path]~ %eth-watcher-diff !>([%logs logs])]~
  --
::
++  on-arvo
  |=  [=wire =sign-arvo]
  ^-  (quip card agent:gall)
  ?+    +<.sign-arvo  ~|([%strange-sign-arvo -.sign-arvo] !!)
      %wake
    ?.  ?=([%timer *] wire)  ~&  weird-wire=wire  [~ this]
    =*  path  t.wire
    ?.  (~(has by dogs.state) path)
      [~ this]
    =/  dog=watchdog
      (~(got by dogs.state) path)
    ?^  error.sign-arvo
      ::  failed, try again.  maybe should tell user if fails more than
      ::  5 times.
      ::
      %-  (slog leaf+"eth-watcher failed; will retry" ~)
      [[(wait path now.bowl refresh-rate.dog)]~ this]
    ::  maybe kill a timed-out update thread, maybe start a new one
    ::
    =^  stop-cards=(list card)  dog
      ::  if still running beyond timeout time, kill it
      ::
      ?.  ?&  ?=(^ running.dog)
            ::
              %+  gth  now.bowl
              (add since.u.running.dog timeout-time.dog)
          ==
        `dog
      ::
      %-  (slog leaf+"eth-watcher {(spud path)} timed out; will restart" ~)
      =/  =cage  [%spider-stop !>([tid.u.running.dog |])]
      :_  dog(running ~)
      :~  (leave-spider path our.bowl)
          [%pass [%starting path] %agent [our.bowl %spider] %poke cage]
      ==
    ::
    =^  start-cards=(list card)  dog
      ::  if not (or no longer) running, start a new thread
      ::
      ?^  running.dog
        `dog
      ::
      =/  new-tid=@ta
        (cat 3 'eth-watcher--' (scot %uv eny.bowl))
      :_  dog(running `[now.bowl new-tid])
      =/  args
        :^  ~  `new-tid  %eth-watcher
        !>([~ `watchpup`[- number pending-logs blocks]:dog])
      :~  (watch-spider path our.bowl /thread-result/[new-tid])
          (poke-spider path our.bowl %spider-start !>(args))
      ==
    ::
    :-  [(wait path now.bowl refresh-rate.dog) (weld stop-cards start-cards)]
    this(dogs.state (~(put by dogs.state) path dog))
  ==
::
++  on-fail   on-fail:def
--