From 06dd05d727af66a5489a445ae1179a868061ca9e Mon Sep 17 00:00:00 2001 From: Benjamin Summers Date: Wed, 24 Jul 2019 18:59:45 -0700 Subject: [PATCH] Implement the Ames IO driver (for fake networking only). --- pkg/hs-urbit/lib/Arvo/Common.hs | 20 ++- pkg/hs-urbit/lib/Arvo/Effect.hs | 14 +- pkg/hs-urbit/lib/Arvo/Event.hs | 2 +- pkg/hs-urbit/lib/Noun.hs | 2 + pkg/hs-urbit/lib/Noun/Conversions.hs | 2 +- pkg/hs-urbit/lib/Urbit/Ames.hs | 187 +++++++++++++++++++++------ pkg/hs-urbit/test/ArvoTests.hs | 1 + pkg/urbit/vere/ames.c | 54 +++++++- 8 files changed, 225 insertions(+), 57 deletions(-) diff --git a/pkg/hs-urbit/lib/Arvo/Common.hs b/pkg/hs-urbit/lib/Arvo/Common.hs index cb612a141..29ea9fa64 100644 --- a/pkg/hs-urbit/lib/Arvo/Common.hs +++ b/pkg/hs-urbit/lib/Arvo/Common.hs @@ -15,8 +15,8 @@ import qualified Network.HTTP.Types.Method as H -- Misc Types ------------------------------------------------------------------ -type AtomIf = Atom -- @if (TODO: What does this mean?) -type AtomIs = Atom -- @is (TODO: What does this mean?) +type AtomIf = Word32 -- Ipv4 Address (@if) +type AtomIs = Word128 -- Ipv6 Address (@is) -- Domain Name newtype Turf = Turf { unTurf :: [Cord] } @@ -129,13 +129,17 @@ deriveNoun ''JsonNode -- Lanes ----------------------------------------------------------------------- -- Network Port -newtype Port = Port { unPort :: Word } - deriving newtype (Eq, Ord, Show, ToNoun, FromNoun) +newtype Port = Port { unPort :: Word16 } + deriving newtype (Eq, Ord, Show, Enum, Real, Integral, Num, ToNoun, FromNoun) +{- + The `Wen` field is (probably) the last time that we were sure that + this DNS lookup worked. This is set when we receive a %hear event. +-} data Lane - = If Wen Port AtomIf - | Is Atom (Maybe Lane) AtomIs - | Ix Wen Port AtomIf + = If Wen Port AtomIf -- Ipv4 + | Is Atom (Maybe Lane) AtomIs -- Ipv6 with fallback + | Ix Wen Port AtomIf -- Not used (Same behavior as `If`) deriving (Eq, Ord, Show) deriveNoun ''Lane @@ -180,6 +184,8 @@ deriveNoun ''Lane [//term/1 [%init ~]] + TODO The reverse translation is not done yet. + -} data ReOrg = ReOrg Cord Cord Cord EvilPath Noun diff --git a/pkg/hs-urbit/lib/Arvo/Effect.hs b/pkg/hs-urbit/lib/Arvo/Effect.hs index 7c032f961..e92806b64 100644 --- a/pkg/hs-urbit/lib/Arvo/Effect.hs +++ b/pkg/hs-urbit/lib/Arvo/Effect.hs @@ -7,11 +7,11 @@ import Arvo.Common (Header, HttpEvent, HttpServerConf, Lane, Method, Mime, Turf) import Arvo.Common (ReOrg(..), reorgThroughNoun) --- Newt Effects -- Todo What are these? ---------------------------------------- +-- Newt Effects ---------------------------------------------------------------- {- - %turf -- TODO - %send -- TODO + %turf -- Set which domain names we've bound. + %send -- Send a UDP packet. -} data NewtEf = NewtEfTurf (Atom, ()) [Turf] @@ -79,9 +79,9 @@ deriveNoun ''SyncEf -- UDP Effects ----------------------------------------------------------------- {- - %init -- TODO - %west -- TODO - %woot -- TODO + %init -- "I don't think that's something that can happen" + %west -- "Those also shouldn't happen" + %woot -- "Those also shouldn't happen" -} data AmesEf = AmesEfInit Path () @@ -135,7 +135,7 @@ data Blit %init -- TODO %logo -- Shutdown %mass -- Measure memory usage (unused) - %send -- TODO + %send -- Send a UDP packet (duplicate of ames %send) -} data TermEf = TermEfBbye Path () diff --git a/pkg/hs-urbit/lib/Arvo/Event.hs b/pkg/hs-urbit/lib/Arvo/Event.hs index ba0d5d52c..9182e688e 100644 --- a/pkg/hs-urbit/lib/Arvo/Event.hs +++ b/pkg/hs-urbit/lib/Arvo/Event.hs @@ -137,7 +137,7 @@ deriveNoun ''HttpServerReq -- Ames ------------------------------------------------------------------------ data AmesEv - = AmesEvHear () Lane Atom + = AmesEvHear () Lane Bytes | AmesEvWake () () | AmesEvWant Path Ship Path Noun | AmesEvCrud Path Cord Tang diff --git a/pkg/hs-urbit/lib/Noun.hs b/pkg/hs-urbit/lib/Noun.hs index ba5a8ca97..3c5434ffb 100644 --- a/pkg/hs-urbit/lib/Noun.hs +++ b/pkg/hs-urbit/lib/Noun.hs @@ -6,6 +6,7 @@ module Noun , module Noun.Jam , module Noun.Cue , module Noun.TH + , module Data.Word , _Cue , loadFile ) where @@ -20,6 +21,7 @@ import Noun.Core import Noun.Cue import Noun.Jam import Noun.TH +import Data.Word -------------------------------------------------------------------------------- diff --git a/pkg/hs-urbit/lib/Noun/Conversions.hs b/pkg/hs-urbit/lib/Noun/Conversions.hs index d9c15457c..0bea3cfba 100644 --- a/pkg/hs-urbit/lib/Noun/Conversions.hs +++ b/pkg/hs-urbit/lib/Noun/Conversions.hs @@ -389,7 +389,7 @@ instance FromNoun Term where -- XX TODO -- Ship ------------------------------------------------------------------------ newtype Ship = Ship Word128 -- @p - deriving newtype (Eq, Ord, Show, Num, ToNoun, FromNoun) + deriving newtype (Eq, Ord, Show, Enum, Real, Integral, Num, ToNoun, FromNoun) -- Path ------------------------------------------------------------------------ diff --git a/pkg/hs-urbit/lib/Urbit/Ames.hs b/pkg/hs-urbit/lib/Urbit/Ames.hs index 99cde86ee..28995a00c 100644 --- a/pkg/hs-urbit/lib/Urbit/Ames.hs +++ b/pkg/hs-urbit/lib/Urbit/Ames.hs @@ -1,35 +1,164 @@ -{-# OPTIONS_GHC -Wwarn #-} - module Urbit.Ames where import ClassyPrelude -import Data.IP -import Network.Socket +import Arvo +import Data.Acquire +import Network.Socket hiding (recvFrom, sendTo) +import Network.Socket.ByteString import Noun -import qualified Data.Vector as V -import qualified Urbit.Time as Time -import qualified Vere.Ames as VA +import Control.Concurrent (threadDelay) +import Control.Lens ((&)) + +import qualified Urbit.Time as Time -------------------------------------------------------------------------------- -data GalaxyInfo = GalaxyInfo { ip :: IPv4, age :: Time.Unix } +{- + On startup (u3_ames_ef_bake): + *_ef_bake means "send any initial events" + Send event: [//newt/u3A->sen [%barn ~]] -data Ames = Ames - { live :: Bool -- ^ whether the listener is on - , ourPort :: Maybe Int --- , threadId :: Thread - , globalDomain :: Maybe Text -- ^ something like "urbit.org" - , imperial :: V.Vector (Maybe GalaxyInfo) + On driver init (u3_ames_io_init): + Basically just allocation. + Set %wake timer. + Record that the UDP listener is not running. + + u3_ames_ef_turf: Called on turf effect. + If we're not live then start the listener. + For now, just use the first turf in the list. + Turf is TLD-first domain name + /org/urbit/dns -> dns.urbit.org + + TODO If we're not live, we should always drop packet sends. + + On u3_ames_io_talk? + *_io_talk is called after everything is up. + Does nothing. + (Normally, this would be where you bring up the UDP listener) + TODO If we're not live, we should always drop packet sends. + + On driver shutdown: + Kill the timer (TODO what is the timer for?) + uv_close(&sam_u->had_u, 0); +-} + +-- TODO Move these to a common module ------------------------------------------ + +type QueueEv = Ev -> STM () + +type EffCb a = a -> IO () + +newtype KingInstance = KingInst Atom + deriving newtype (Eq, Ord, Num, Real, Enum, Integral, FromNoun, ToNoun) + +-------------------------------------------------------------------------------- + +data AmesDrv = AmesDrv + { aIsLive :: IORef Bool + , aWakeTimer :: Async () + , aListener :: Async () } -init :: Ames -init = Ames { live = False - , ourPort = Nothing - , globalDomain = Nothing - , imperial = V.replicate 256 Nothing - } +-------------------------------------------------------------------------------- + +{- + inst -- Process instance number. + who -- Which ship are we? + enqueueEv -- Queue-event action. + mPort -- Explicit port override from command line arguments. + + We ignore the %turf arguments for now. We only have fake ships, + so we don't implement the DNS stuff yet. + + TODO Handle socket exceptions in waitPacket + + 4096 is a reasonable number for recvFrom. Packets of that size are + not possible on the internet. + + TODO log when `sendTo` sent fewer bytes than requested. + + TODO verify that the KingInstances match on effects. +-} +ames :: KingInstance -> Ship -> Maybe Port -> QueueEv + -> ([Ev], Acquire (EffCb NewtEf)) +ames inst who mPort enqueueEv = + ([barnEv], callback . aIsLive <$> mkAcquire start stop) + where + start :: IO AmesDrv + start = do + vLiv <- newIORef False + time <- async runTimer + hear <- async waitPacket + pure $ AmesDrv vLiv time hear + + stop :: AmesDrv -> IO () + stop (AmesDrv{..}) = do + cancel aWakeTimer + cancel aListener + + barnEv, wakeEv :: Ev + barnEv = EvBlip $ BlipEvNewt $ NewtEvBarn (fromIntegral inst, ()) () + wakeEv = EvBlip $ BlipEvAmes $ AmesEvWake () () + + hearEv :: Time.Wen -> PortNumber -> HostAddress -> ByteString -> Ev + hearEv w p a bs = EvBlip $ BlipEvAmes $ AmesEvHear () lane (MkBytes bs) + where lane = If w (fromIntegral p) a + + runTimer :: IO () + runTimer = forever $ do + threadDelay (300 * 1000000) -- 300 seconds + atomically (enqueueEv wakeEv) + + ourPort :: PortNumber + ourPort = mPort & \case Nothing -> shipPort who + Just p -> fromIntegral p + + waitPacket :: IO () + waitPacket = do + s <- socket AF_INET Datagram defaultProtocol + () <- bind s (SockAddrInet ourPort localhost) + forever $ do + (bs, addr) <- recvFrom s 4096 + wen <- Time.now + case addr of + SockAddrInet p a -> atomically $ enqueueEv $ hearEv wen p a bs + _ -> pure () + + callback :: IORef Bool -> NewtEf -> IO () + callback vLive = \case + NewtEfTurf (_id, ()) turfs -> + writeIORef vLive True + + NewtEfSend (_id, ()) lane (MkBytes bs) -> do + live <- readIORef vLive + when live $ do + s <- socket AF_INET Datagram defaultProtocol + laneSockAddr lane & \case + Nothing -> pure () + Just sa -> void (sendTo s bs sa) + +localhost :: HostAddress +localhost = tupleToHostAddress (127,0,0,1) + +laneSockAddr :: Lane -> Maybe SockAddr +laneSockAddr = \case + If _ p a -> pure (SockAddrInet (fromIntegral p) a) + Ix _ p a -> pure (SockAddrInet (fromIntegral p) a) + Is _ mLane _ -> mLane >>= laneSockAddr + +ipv4Addr :: SockAddr -> Maybe (PortNumber, HostAddress) +ipv4Addr = \case + SockAddrInet p a -> Just (p, a) + _ -> Nothing + +shipPort :: Ship -> PortNumber +shipPort s | s < 256 = fromIntegral (31337 + s) +shipPort _ = 0 + +{- +data GalaxyInfo = GalaxyInfo { ip :: IPv4, age :: Time.Unix } turf :: Ames -> [VA.Turf] -> IO Ames turf ames [] = undefined @@ -37,27 +166,11 @@ turf ames (turf:_) = do let t = (mconcat . intersperse "." . fmap unCord . VA.unTurf) turf pure (ames {globalDomain = Just t}) - data NetworkMode = LocalOnlyNetworking | GlobalNetworking -ioStart :: Ames -> NetworkMode -> Int -> Noun -> IO Ames -ioStart ames isLocal defaultPort (Cell _ _) = undefined -ioStart ames isLocal defaultPort (Atom who) = do - let _port = if who < 256 - then computePort isLocal who - else defaultPort - - -- TODO: set up another thread to own the recv socket, which makes the Ovums - -- which get put into the computeQueue, like in _ames_recv_cb. - withSocketsDo $ do - s <- socket AF_INET Datagram 17 - -- bind s (SockAddrInet port ) - pure () - - pure ames - computePort :: NetworkMode -> Atom -> Int computePort LocalOnlyNetworking who = 31337 + (fromIntegral who) computePort GlobalNetworking who = 13337 + (fromIntegral who) +-} diff --git a/pkg/hs-urbit/test/ArvoTests.hs b/pkg/hs-urbit/test/ArvoTests.hs index bbf59be79..bdfd975b5 100644 --- a/pkg/hs-urbit/test/ArvoTests.hs +++ b/pkg/hs-urbit/test/ArvoTests.hs @@ -135,6 +135,7 @@ instance Arbitrary VaneEv where instance Arbitrary ZuseEv where arbitrary = ZEVeer () <$> arb <*> arb <*> arb + -- Generate Arbitrary Values --------------------------------------------------- arb :: Arbitrary a => Gen a diff --git a/pkg/urbit/vere/ames.c b/pkg/urbit/vere/ames.c index b79902250..a5cf29422 100644 --- a/pkg/urbit/vere/ames.c +++ b/pkg/urbit/vere/ames.c @@ -154,6 +154,10 @@ _ames_czar_cb(uv_getaddrinfo_t* adr_u, break; } + // If valid result. + // parse info from result and set address and last time. + // If failed, + // set address to 255.255.255.255 to indicate lookup failure. if ( (AF_INET == rai_u->ai_family) ) { struct sockaddr_in* add_u = (struct sockaddr_in *)rai_u->ai_addr; c3_w old_w = sam_u->imp_w[pac_u->imp_y]; @@ -162,6 +166,8 @@ _ames_czar_cb(uv_getaddrinfo_t* adr_u, sam_u->imp_t[pac_u->imp_y] = now; #if 1 + // If the address lookup gives a new result, or if the last lookup failed: + // Log the change (even if the new lookup failed too?) if ( sam_u->imp_w[pac_u->imp_y] != old_w && sam_u->imp_w[pac_u->imp_y] != 0xffffffff ) { u3_noun wad = u3i_words(1, &sam_u->imp_w[pac_u->imp_y]); @@ -186,7 +192,7 @@ _ames_czar_cb(uv_getaddrinfo_t* adr_u, } -/* _ames_czar(): galaxy address resolution. +/* _ames_czar(): Sent a packet to a galaxy (using DNS address resolution). */ static void _ames_czar(u3_pact* pac_u, c3_c* bos_c) @@ -195,8 +201,10 @@ _ames_czar(u3_pact* pac_u, c3_c* bos_c) u3_pier* pir_u = u3_pier_stub(); u3_ames* sam_u = pir_u->sam_u; + // Determine port based on galaxy port. pac_u->por_s = _ames_czar_port(pac_u->imp_y); + // If fake, send to localhost. if ( c3n == u3_Host.ops_u.net ) { pac_u->pip_w = 0x7f000001; _ames_send(pac_u); @@ -205,6 +213,8 @@ _ames_czar(u3_pact* pac_u, c3_c* bos_c) // if we don't have a galaxy domain, no-op // + // If DNS stuff not set, then we can't resolve galaxy addresses. + // if ( 0 == bos_c ) { u3_noun nam = u3dc("scot", 'p', pac_u->imp_y); c3_c* nam_c = u3r_string(nam); @@ -218,12 +228,16 @@ _ames_czar(u3_pact* pac_u, c3_c* bos_c) time_t now = time(0); // backoff + // if lookup failed recently, drop the packet. if ( (0xffffffff == sam_u->imp_w[pac_u->imp_y]) && (now - sam_u->imp_t[pac_u->imp_y]) < 300 ) { _ames_pact_free(pac_u); return; } + // If we don't already know the galaxy IP. + // Construct the string $galaxyname.urbit.org + // Do a dns lookup if ( (0 == sam_u->imp_w[pac_u->imp_y]) || (now - sam_u->imp_t[pac_u->imp_y]) > 300 ) { /* 5 minute TTL */ u3_noun nam = u3dc("scot", 'p', pac_u->imp_y); @@ -246,12 +260,16 @@ _ames_czar(u3_pact* pac_u, c3_c* bos_c) if ( 0 != (sas_i = uv_getaddrinfo(u3L, adr_u, _ames_czar_cb, pac_u->dns_c, 0, 0)) ) { + // TODO Not sure what this condition is: + // libuv was unable to attempt to do a DNS lookup. u3l_log("ames: %s\n", uv_strerror(sas_i)); _ames_czar_gone(pac_u, now); return; } } } + + // Otherwise send to known address. else { pac_u->pip_w = sam_u->imp_w[pac_u->imp_y]; _ames_send(pac_u); @@ -271,12 +289,16 @@ _ames_lane_ip(u3_noun lan, c3_s* por_s, c3_w* pip_w) return c3y; } break; + + // Never use IPv6, always use fallback and fail if no fallback. case c3__is: { u3_noun pq_lan = u3h(u3t(u3t(lan))); if ( u3_nul == pq_lan ) return c3n; else return _ames_lane_ip(u3t(pq_lan), por_s, pip_w); } break; + + // same behavior as c3__if. case c3__ix: { *por_s = (c3_s) u3h(u3t(u3t(lan))); *pip_w = u3r_word(0, u3t(u3t(u3t(lan)))); @@ -311,22 +333,31 @@ u3_ames_ef_send(u3_pier* pir_u, u3_noun lan, u3_noun pac) u3_pact* pac_u = c3_calloc(sizeof(*pac_u)); + // Parse ipv4 address and port from lane. + // We don't support ipv6, we always use the fallback + // We fail if it doesn't exist. if ( c3y == _ames_lane_ip(lan, &pac_u->por_s, &pac_u->pip_w) ) { pac_u->len_w = u3r_met(3, pac); pac_u->hun_y = c3_malloc(pac_u->len_w); + // Read bytestring from packet atom (length, and byte ptr) u3r_bytes(0, pac_u->len_w, pac_u->hun_y, pac); - if ( 0 == pac_u->pip_w ) { - pac_u->pip_w = 0x7f000001; - pac_u->por_s = pir_u->por_s; + if ( 0 == pac_u->pip_w ) { // if ip address is 0, this is to ourselves. + pac_u->pip_w = 0x7f000001; // set to 127.0.0.1 + pac_u->por_s = pir_u->por_s; // set port to our own port? } + // if ip address is 0.0.1.$x + // Then $x is the galaxy ship number. + // Call ames_czar instead of _ames_send. if ( (0 == (pac_u->pip_w >> 16)) && (1 == (pac_u->pip_w >> 8)) ) { pac_u->imp_y = (pac_u->pip_w & 0xff); _ames_czar(pac_u, sam_u->dns_c); } + + // Otherwise if real OR address is localhost, then send. else if ( (c3y == u3_Host.ops_u.net) || (0x7f000001 == pac_u->pip_w) ) { _ames_send(pac_u); } @@ -395,7 +426,20 @@ _ames_io_start(u3_pier* pir_u) { u3_ames* sam_u = pir_u->sam_u; c3_s por_s = pir_u->por_s; + + // Get the ship name. u3_noun who = u3i_chubs(2, pir_u->who_d); + + // To determinte the port that we will run in: + // + // If galaxy + // If fake: 31337 + ship (also, bind on localhost only) + // If real: 13337 + ship + // + // If not galaxy + // Then use por_s from pier structure. + // This will be zero if -p was not set. + u3_noun rac = u3do("clan:title", u3k(who)); if ( c3__czar == rac ) { @@ -535,6 +579,7 @@ u3_ames_ef_turf(u3_pier* pir_u, u3_noun tuf) u3z(tuf); } + else if ( (c3n == pir_u->fak_o) && (0 == sam_u->dns_c) ) { u3l_log("ames: turf: no domains\n"); } @@ -552,6 +597,7 @@ u3_ames_io_init(u3_pier* pir_u) u3_ames* sam_u = pir_u->sam_u; sam_u->liv = c3n; + // Set %wake timer. uv_timer_init(u3L, &sam_u->tim_u); }