Implement the Ames IO driver (for fake networking only).

This commit is contained in:
Benjamin Summers 2019-07-24 18:59:45 -07:00
parent ba3c06e4d3
commit 06dd05d727
8 changed files with 225 additions and 57 deletions

View File

@ -15,8 +15,8 @@ import qualified Network.HTTP.Types.Method as H
-- Misc Types ------------------------------------------------------------------
type AtomIf = Atom -- @if (TODO: What does this mean?)
type AtomIs = Atom -- @is (TODO: What does this mean?)
type AtomIf = Word32 -- Ipv4 Address (@if)
type AtomIs = Word128 -- Ipv6 Address (@is)
-- Domain Name
newtype Turf = Turf { unTurf :: [Cord] }
@ -129,13 +129,17 @@ deriveNoun ''JsonNode
-- Lanes -----------------------------------------------------------------------
-- Network Port
newtype Port = Port { unPort :: Word }
deriving newtype (Eq, Ord, Show, ToNoun, FromNoun)
newtype Port = Port { unPort :: Word16 }
deriving newtype (Eq, Ord, Show, Enum, Real, Integral, Num, ToNoun, FromNoun)
{-
The `Wen` field is (probably) the last time that we were sure that
this DNS lookup worked. This is set when we receive a %hear event.
-}
data Lane
= If Wen Port AtomIf
| Is Atom (Maybe Lane) AtomIs
| Ix Wen Port AtomIf
= If Wen Port AtomIf -- Ipv4
| Is Atom (Maybe Lane) AtomIs -- Ipv6 with fallback
| Ix Wen Port AtomIf -- Not used (Same behavior as `If`)
deriving (Eq, Ord, Show)
deriveNoun ''Lane
@ -180,6 +184,8 @@ deriveNoun ''Lane
[//term/1 [%init ~]]
TODO The reverse translation is not done yet.
-}
data ReOrg = ReOrg Cord Cord Cord EvilPath Noun

View File

@ -7,11 +7,11 @@ import Arvo.Common (Header, HttpEvent, HttpServerConf, Lane, Method, Mime, Turf)
import Arvo.Common (ReOrg(..), reorgThroughNoun)
-- Newt Effects -- Todo What are these? ----------------------------------------
-- Newt Effects ----------------------------------------------------------------
{-
%turf -- TODO
%send -- TODO
%turf -- Set which domain names we've bound.
%send -- Send a UDP packet.
-}
data NewtEf
= NewtEfTurf (Atom, ()) [Turf]
@ -79,9 +79,9 @@ deriveNoun ''SyncEf
-- UDP Effects -----------------------------------------------------------------
{-
%init -- TODO
%west -- TODO
%woot -- TODO
%init -- "I don't think that's something that can happen"
%west -- "Those also shouldn't happen"
%woot -- "Those also shouldn't happen"
-}
data AmesEf
= AmesEfInit Path ()
@ -135,7 +135,7 @@ data Blit
%init -- TODO
%logo -- Shutdown
%mass -- Measure memory usage (unused)
%send -- TODO
%send -- Send a UDP packet (duplicate of ames %send)
-}
data TermEf
= TermEfBbye Path ()

View File

@ -137,7 +137,7 @@ deriveNoun ''HttpServerReq
-- Ames ------------------------------------------------------------------------
data AmesEv
= AmesEvHear () Lane Atom
= AmesEvHear () Lane Bytes
| AmesEvWake () ()
| AmesEvWant Path Ship Path Noun
| AmesEvCrud Path Cord Tang

View File

@ -6,6 +6,7 @@ module Noun
, module Noun.Jam
, module Noun.Cue
, module Noun.TH
, module Data.Word
, _Cue
, loadFile
) where
@ -20,6 +21,7 @@ import Noun.Core
import Noun.Cue
import Noun.Jam
import Noun.TH
import Data.Word
--------------------------------------------------------------------------------

View File

@ -389,7 +389,7 @@ instance FromNoun Term where -- XX TODO
-- Ship ------------------------------------------------------------------------
newtype Ship = Ship Word128 -- @p
deriving newtype (Eq, Ord, Show, Num, ToNoun, FromNoun)
deriving newtype (Eq, Ord, Show, Enum, Real, Integral, Num, ToNoun, FromNoun)
-- Path ------------------------------------------------------------------------

View File

@ -1,35 +1,164 @@
{-# OPTIONS_GHC -Wwarn #-}
module Urbit.Ames where
import ClassyPrelude
import Data.IP
import Network.Socket
import Arvo
import Data.Acquire
import Network.Socket hiding (recvFrom, sendTo)
import Network.Socket.ByteString
import Noun
import qualified Data.Vector as V
import qualified Urbit.Time as Time
import qualified Vere.Ames as VA
import Control.Concurrent (threadDelay)
import Control.Lens ((&))
import qualified Urbit.Time as Time
--------------------------------------------------------------------------------
data GalaxyInfo = GalaxyInfo { ip :: IPv4, age :: Time.Unix }
{-
On startup (u3_ames_ef_bake):
*_ef_bake means "send any initial events"
Send event: [//newt/u3A->sen [%barn ~]]
data Ames = Ames
{ live :: Bool -- ^ whether the listener is on
, ourPort :: Maybe Int
-- , threadId :: Thread
, globalDomain :: Maybe Text -- ^ something like "urbit.org"
, imperial :: V.Vector (Maybe GalaxyInfo)
On driver init (u3_ames_io_init):
Basically just allocation.
Set %wake timer.
Record that the UDP listener is not running.
u3_ames_ef_turf: Called on turf effect.
If we're not live then start the listener.
For now, just use the first turf in the list.
Turf is TLD-first domain name
/org/urbit/dns -> dns.urbit.org
TODO If we're not live, we should always drop packet sends.
On u3_ames_io_talk?
*_io_talk is called after everything is up.
Does nothing.
(Normally, this would be where you bring up the UDP listener)
TODO If we're not live, we should always drop packet sends.
On driver shutdown:
Kill the timer (TODO what is the timer for?)
uv_close(&sam_u->had_u, 0);
-}
-- TODO Move these to a common module ------------------------------------------
type QueueEv = Ev -> STM ()
type EffCb a = a -> IO ()
newtype KingInstance = KingInst Atom
deriving newtype (Eq, Ord, Num, Real, Enum, Integral, FromNoun, ToNoun)
--------------------------------------------------------------------------------
data AmesDrv = AmesDrv
{ aIsLive :: IORef Bool
, aWakeTimer :: Async ()
, aListener :: Async ()
}
init :: Ames
init = Ames { live = False
, ourPort = Nothing
, globalDomain = Nothing
, imperial = V.replicate 256 Nothing
}
--------------------------------------------------------------------------------
{-
inst -- Process instance number.
who -- Which ship are we?
enqueueEv -- Queue-event action.
mPort -- Explicit port override from command line arguments.
We ignore the %turf arguments for now. We only have fake ships,
so we don't implement the DNS stuff yet.
TODO Handle socket exceptions in waitPacket
4096 is a reasonable number for recvFrom. Packets of that size are
not possible on the internet.
TODO log when `sendTo` sent fewer bytes than requested.
TODO verify that the KingInstances match on effects.
-}
ames :: KingInstance -> Ship -> Maybe Port -> QueueEv
-> ([Ev], Acquire (EffCb NewtEf))
ames inst who mPort enqueueEv =
([barnEv], callback . aIsLive <$> mkAcquire start stop)
where
start :: IO AmesDrv
start = do
vLiv <- newIORef False
time <- async runTimer
hear <- async waitPacket
pure $ AmesDrv vLiv time hear
stop :: AmesDrv -> IO ()
stop (AmesDrv{..}) = do
cancel aWakeTimer
cancel aListener
barnEv, wakeEv :: Ev
barnEv = EvBlip $ BlipEvNewt $ NewtEvBarn (fromIntegral inst, ()) ()
wakeEv = EvBlip $ BlipEvAmes $ AmesEvWake () ()
hearEv :: Time.Wen -> PortNumber -> HostAddress -> ByteString -> Ev
hearEv w p a bs = EvBlip $ BlipEvAmes $ AmesEvHear () lane (MkBytes bs)
where lane = If w (fromIntegral p) a
runTimer :: IO ()
runTimer = forever $ do
threadDelay (300 * 1000000) -- 300 seconds
atomically (enqueueEv wakeEv)
ourPort :: PortNumber
ourPort = mPort & \case Nothing -> shipPort who
Just p -> fromIntegral p
waitPacket :: IO ()
waitPacket = do
s <- socket AF_INET Datagram defaultProtocol
() <- bind s (SockAddrInet ourPort localhost)
forever $ do
(bs, addr) <- recvFrom s 4096
wen <- Time.now
case addr of
SockAddrInet p a -> atomically $ enqueueEv $ hearEv wen p a bs
_ -> pure ()
callback :: IORef Bool -> NewtEf -> IO ()
callback vLive = \case
NewtEfTurf (_id, ()) turfs ->
writeIORef vLive True
NewtEfSend (_id, ()) lane (MkBytes bs) -> do
live <- readIORef vLive
when live $ do
s <- socket AF_INET Datagram defaultProtocol
laneSockAddr lane & \case
Nothing -> pure ()
Just sa -> void (sendTo s bs sa)
localhost :: HostAddress
localhost = tupleToHostAddress (127,0,0,1)
laneSockAddr :: Lane -> Maybe SockAddr
laneSockAddr = \case
If _ p a -> pure (SockAddrInet (fromIntegral p) a)
Ix _ p a -> pure (SockAddrInet (fromIntegral p) a)
Is _ mLane _ -> mLane >>= laneSockAddr
ipv4Addr :: SockAddr -> Maybe (PortNumber, HostAddress)
ipv4Addr = \case
SockAddrInet p a -> Just (p, a)
_ -> Nothing
shipPort :: Ship -> PortNumber
shipPort s | s < 256 = fromIntegral (31337 + s)
shipPort _ = 0
{-
data GalaxyInfo = GalaxyInfo { ip :: IPv4, age :: Time.Unix }
turf :: Ames -> [VA.Turf] -> IO Ames
turf ames [] = undefined
@ -37,27 +166,11 @@ turf ames (turf:_) = do
let t = (mconcat . intersperse "." . fmap unCord . VA.unTurf) turf
pure (ames {globalDomain = Just t})
data NetworkMode
= LocalOnlyNetworking
| GlobalNetworking
ioStart :: Ames -> NetworkMode -> Int -> Noun -> IO Ames
ioStart ames isLocal defaultPort (Cell _ _) = undefined
ioStart ames isLocal defaultPort (Atom who) = do
let _port = if who < 256
then computePort isLocal who
else defaultPort
-- TODO: set up another thread to own the recv socket, which makes the Ovums
-- which get put into the computeQueue, like in _ames_recv_cb.
withSocketsDo $ do
s <- socket AF_INET Datagram 17
-- bind s (SockAddrInet port )
pure ()
pure ames
computePort :: NetworkMode -> Atom -> Int
computePort LocalOnlyNetworking who = 31337 + (fromIntegral who)
computePort GlobalNetworking who = 13337 + (fromIntegral who)
-}

View File

@ -135,6 +135,7 @@ instance Arbitrary VaneEv where
instance Arbitrary ZuseEv where
arbitrary = ZEVeer () <$> arb <*> arb <*> arb
-- Generate Arbitrary Values ---------------------------------------------------
arb :: Arbitrary a => Gen a

View File

@ -154,6 +154,10 @@ _ames_czar_cb(uv_getaddrinfo_t* adr_u,
break;
}
// If valid result.
// parse info from result and set address and last time.
// If failed,
// set address to 255.255.255.255 to indicate lookup failure.
if ( (AF_INET == rai_u->ai_family) ) {
struct sockaddr_in* add_u = (struct sockaddr_in *)rai_u->ai_addr;
c3_w old_w = sam_u->imp_w[pac_u->imp_y];
@ -162,6 +166,8 @@ _ames_czar_cb(uv_getaddrinfo_t* adr_u,
sam_u->imp_t[pac_u->imp_y] = now;
#if 1
// If the address lookup gives a new result, or if the last lookup failed:
// Log the change (even if the new lookup failed too?)
if ( sam_u->imp_w[pac_u->imp_y] != old_w
&& sam_u->imp_w[pac_u->imp_y] != 0xffffffff ) {
u3_noun wad = u3i_words(1, &sam_u->imp_w[pac_u->imp_y]);
@ -186,7 +192,7 @@ _ames_czar_cb(uv_getaddrinfo_t* adr_u,
}
/* _ames_czar(): galaxy address resolution.
/* _ames_czar(): Sent a packet to a galaxy (using DNS address resolution).
*/
static void
_ames_czar(u3_pact* pac_u, c3_c* bos_c)
@ -195,8 +201,10 @@ _ames_czar(u3_pact* pac_u, c3_c* bos_c)
u3_pier* pir_u = u3_pier_stub();
u3_ames* sam_u = pir_u->sam_u;
// Determine port based on galaxy port.
pac_u->por_s = _ames_czar_port(pac_u->imp_y);
// If fake, send to localhost.
if ( c3n == u3_Host.ops_u.net ) {
pac_u->pip_w = 0x7f000001;
_ames_send(pac_u);
@ -205,6 +213,8 @@ _ames_czar(u3_pact* pac_u, c3_c* bos_c)
// if we don't have a galaxy domain, no-op
//
// If DNS stuff not set, then we can't resolve galaxy addresses.
//
if ( 0 == bos_c ) {
u3_noun nam = u3dc("scot", 'p', pac_u->imp_y);
c3_c* nam_c = u3r_string(nam);
@ -218,12 +228,16 @@ _ames_czar(u3_pact* pac_u, c3_c* bos_c)
time_t now = time(0);
// backoff
// if lookup failed recently, drop the packet.
if ( (0xffffffff == sam_u->imp_w[pac_u->imp_y]) &&
(now - sam_u->imp_t[pac_u->imp_y]) < 300 ) {
_ames_pact_free(pac_u);
return;
}
// If we don't already know the galaxy IP.
// Construct the string $galaxyname.urbit.org
// Do a dns lookup
if ( (0 == sam_u->imp_w[pac_u->imp_y]) ||
(now - sam_u->imp_t[pac_u->imp_y]) > 300 ) { /* 5 minute TTL */
u3_noun nam = u3dc("scot", 'p', pac_u->imp_y);
@ -246,12 +260,16 @@ _ames_czar(u3_pact* pac_u, c3_c* bos_c)
if ( 0 != (sas_i = uv_getaddrinfo(u3L, adr_u,
_ames_czar_cb,
pac_u->dns_c, 0, 0)) ) {
// TODO Not sure what this condition is:
// libuv was unable to attempt to do a DNS lookup.
u3l_log("ames: %s\n", uv_strerror(sas_i));
_ames_czar_gone(pac_u, now);
return;
}
}
}
// Otherwise send to known address.
else {
pac_u->pip_w = sam_u->imp_w[pac_u->imp_y];
_ames_send(pac_u);
@ -271,12 +289,16 @@ _ames_lane_ip(u3_noun lan, c3_s* por_s, c3_w* pip_w)
return c3y;
} break;
// Never use IPv6, always use fallback and fail if no fallback.
case c3__is: {
u3_noun pq_lan = u3h(u3t(u3t(lan)));
if ( u3_nul == pq_lan ) return c3n;
else return _ames_lane_ip(u3t(pq_lan), por_s, pip_w);
} break;
// same behavior as c3__if.
case c3__ix: {
*por_s = (c3_s) u3h(u3t(u3t(lan)));
*pip_w = u3r_word(0, u3t(u3t(u3t(lan))));
@ -311,22 +333,31 @@ u3_ames_ef_send(u3_pier* pir_u, u3_noun lan, u3_noun pac)
u3_pact* pac_u = c3_calloc(sizeof(*pac_u));
// Parse ipv4 address and port from lane.
// We don't support ipv6, we always use the fallback
// We fail if it doesn't exist.
if ( c3y == _ames_lane_ip(lan, &pac_u->por_s, &pac_u->pip_w) ) {
pac_u->len_w = u3r_met(3, pac);
pac_u->hun_y = c3_malloc(pac_u->len_w);
// Read bytestring from packet atom (length, and byte ptr)
u3r_bytes(0, pac_u->len_w, pac_u->hun_y, pac);
if ( 0 == pac_u->pip_w ) {
pac_u->pip_w = 0x7f000001;
pac_u->por_s = pir_u->por_s;
if ( 0 == pac_u->pip_w ) { // if ip address is 0, this is to ourselves.
pac_u->pip_w = 0x7f000001; // set to 127.0.0.1
pac_u->por_s = pir_u->por_s; // set port to our own port?
}
// if ip address is 0.0.1.$x
// Then $x is the galaxy ship number.
// Call ames_czar instead of _ames_send.
if ( (0 == (pac_u->pip_w >> 16)) && (1 == (pac_u->pip_w >> 8)) ) {
pac_u->imp_y = (pac_u->pip_w & 0xff);
_ames_czar(pac_u, sam_u->dns_c);
}
// Otherwise if real OR address is localhost, then send.
else if ( (c3y == u3_Host.ops_u.net) || (0x7f000001 == pac_u->pip_w) ) {
_ames_send(pac_u);
}
@ -395,7 +426,20 @@ _ames_io_start(u3_pier* pir_u)
{
u3_ames* sam_u = pir_u->sam_u;
c3_s por_s = pir_u->por_s;
// Get the ship name.
u3_noun who = u3i_chubs(2, pir_u->who_d);
// To determinte the port that we will run in:
//
// If galaxy
// If fake: 31337 + ship (also, bind on localhost only)
// If real: 13337 + ship
//
// If not galaxy
// Then use por_s from pier structure.
// This will be zero if -p was not set.
u3_noun rac = u3do("clan:title", u3k(who));
if ( c3__czar == rac ) {
@ -535,6 +579,7 @@ u3_ames_ef_turf(u3_pier* pir_u, u3_noun tuf)
u3z(tuf);
}
else if ( (c3n == pir_u->fak_o) && (0 == sam_u->dns_c) ) {
u3l_log("ames: turf: no domains\n");
}
@ -552,6 +597,7 @@ u3_ames_io_init(u3_pier* pir_u)
u3_ames* sam_u = pir_u->sam_u;
sam_u->liv = c3n;
// Set %wake timer.
uv_timer_init(u3L, &sam_u->tim_u);
}