mirror of
https://github.com/urbit/shrub.git
synced 2024-12-23 19:05:48 +03:00
First Ames test passes.
Wrote a simple test for Ames and fixed some bugs: - Properly handle to-galaxy lanes. - Use the same socket for send and recv. Otherwise, sending happens from a different port. - Properly close the socket on shutdown. Otherwise only the first test works.
This commit is contained in:
parent
06dd05d727
commit
f08b8ae8e5
@ -2,9 +2,10 @@ module Arvo.Common
|
||||
( NounTree(..), NounMap, NounSet
|
||||
, Json, JsonNode(..)
|
||||
, Desk(..), Mime(..)
|
||||
, AtomIf, AtomIs, Lane(..), Port(..), Turf(..)
|
||||
, Lane(..), Port(..), Turf(..)
|
||||
, HttpServerConf(..), HttpEvent(..), PEM, Method, Header
|
||||
, ReOrg(..), reorgThroughNoun
|
||||
, AmesDest(..), Ipv4(..), Ipv6(..), Galaxy(..)
|
||||
) where
|
||||
|
||||
import Urbit.Time
|
||||
@ -15,10 +16,11 @@ import qualified Network.HTTP.Types.Method as H
|
||||
|
||||
-- Misc Types ------------------------------------------------------------------
|
||||
|
||||
type AtomIf = Word32 -- Ipv4 Address (@if)
|
||||
type AtomIs = Word128 -- Ipv6 Address (@is)
|
||||
{-
|
||||
Domain Name in TLD order:
|
||||
|
||||
-- Domain Name
|
||||
["org", "urbit", "dns"] -> dns.urbit.org
|
||||
-}
|
||||
newtype Turf = Turf { unTurf :: [Cord] }
|
||||
deriving newtype (Eq, Ord, Show, ToNoun, FromNoun)
|
||||
|
||||
@ -126,24 +128,57 @@ data JsonNode
|
||||
deriveNoun ''JsonNode
|
||||
|
||||
|
||||
-- Lanes -----------------------------------------------------------------------
|
||||
-- Lanes and Ames Destinations -------------------------------------------------
|
||||
|
||||
-- Network Port
|
||||
newtype Port = Port { unPort :: Word16 }
|
||||
deriving newtype (Eq, Ord, Show, Enum, Real, Integral, Num, ToNoun, FromNoun)
|
||||
|
||||
-- @if
|
||||
newtype Ipv4 = Ipv4 { unIpv4 :: Word32 }
|
||||
deriving newtype (Eq, Ord, Show, Enum, Real, Integral, Num, ToNoun, FromNoun)
|
||||
|
||||
-- @is
|
||||
newtype Ipv6 = Ipv6 { unIpv6 :: Word128 }
|
||||
deriving newtype (Eq, Ord, Show, Enum, Real, Integral, Num, ToNoun, FromNoun)
|
||||
|
||||
newtype Galaxy = Galaxy { unGalaxy :: Word8 }
|
||||
deriving newtype (Eq, Ord, Show, Enum, Real, Integral, Num, ToNoun, FromNoun)
|
||||
|
||||
{-
|
||||
The `Wen` field is (probably) the last time that we were sure that
|
||||
this DNS lookup worked. This is set when we receive a %hear event.
|
||||
The `Wen` field is (IIUC) the last time that we were sure that this
|
||||
DNS lookup worked. This is set when we receive a %hear event.
|
||||
-}
|
||||
data Lane
|
||||
= If Wen Port AtomIf -- Ipv4
|
||||
| Is Atom (Maybe Lane) AtomIs -- Ipv6 with fallback
|
||||
| Ix Wen Port AtomIf -- Not used (Same behavior as `If`)
|
||||
= If Wen Port Ipv4 -- Ipv4
|
||||
| Is Port (Maybe Lane) Ipv6 -- Ipv6 with fallback
|
||||
| Ix Wen Port Ipv4 -- Not used (Same behavior as `If`)
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
deriveNoun ''Lane
|
||||
|
||||
data AmesDest
|
||||
= ADGala Wen Galaxy
|
||||
| ADIpv4 Wen Port Ipv4
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
instance ToNoun AmesDest where
|
||||
toNoun = toNoun . \case
|
||||
ADGala w g -> If w 0 (256 + fromIntegral g)
|
||||
ADIpv4 w p a -> If w p a
|
||||
|
||||
instance FromNoun AmesDest where
|
||||
parseNoun = named "AmesDest" . (parseNoun >=> parseLane)
|
||||
where
|
||||
parseLane :: Lane -> Parser AmesDest
|
||||
parseLane = \case
|
||||
If w _ 0 -> fail "Sending to 0.0.0.0 is not supported"
|
||||
If w _ a | a>255 && a<512 -> pure $ ADGala w $ fromIntegral $ a `mod` 256
|
||||
If w p a -> pure $ ADIpv4 w p a
|
||||
Ix w p a -> parseLane (If w p a)
|
||||
Is _ (Just fb) _ -> parseLane fb
|
||||
Is _ Nothing _ -> fail "ipv6 is not supported"
|
||||
|
||||
|
||||
-- Path+Tagged Restructuring ---------------------------------------------------
|
||||
|
||||
|
@ -3,7 +3,8 @@ module Arvo.Effect where
|
||||
import Urbit.Time
|
||||
import UrbitPrelude
|
||||
|
||||
import Arvo.Common (Header, HttpEvent, HttpServerConf, Lane, Method, Mime, Turf)
|
||||
import Arvo.Common (Header, HttpEvent, HttpServerConf, Method, Mime)
|
||||
import Arvo.Common (AmesDest, Turf)
|
||||
import Arvo.Common (ReOrg(..), reorgThroughNoun)
|
||||
|
||||
|
||||
@ -15,7 +16,7 @@ import Arvo.Common (ReOrg(..), reorgThroughNoun)
|
||||
-}
|
||||
data NewtEf
|
||||
= NewtEfTurf (Atom, ()) [Turf]
|
||||
| NewtEfSend (Atom, ()) Lane Bytes
|
||||
| NewtEfSend (Atom, ()) AmesDest Bytes
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
deriveNoun ''NewtEf
|
||||
@ -143,7 +144,7 @@ data TermEf
|
||||
| TermEfInit (Decimal, ()) ()
|
||||
| TermEfLogo Path ()
|
||||
| TermEfMass Path Noun -- Irrelevant
|
||||
| TermEfSend Path Lane Bytes
|
||||
| TermEfSend Path AmesDest Bytes
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
deriveNoun ''Blit
|
||||
|
@ -4,7 +4,8 @@ import UrbitPrelude hiding (Term)
|
||||
import Urbit.Time
|
||||
|
||||
import Arvo.Common (NounMap, NounSet)
|
||||
import Arvo.Common (AtomIf, AtomIs, Desk, Lane, Mime, Turf)
|
||||
import Arvo.Common (Desk, Mime)
|
||||
import Arvo.Common (Ipv4, Ipv6, AmesDest, Turf)
|
||||
import Arvo.Common (HttpEvent, HttpServerConf)
|
||||
import Arvo.Common (ReOrg(..), reorgThroughNoun)
|
||||
|
||||
@ -20,7 +21,7 @@ type Oath = Atom -- Signature
|
||||
|
||||
-- Parsed URLs -----------------------------------------------------------------
|
||||
|
||||
type Host = Either Turf AtomIf
|
||||
type Host = Either Turf Ipv4
|
||||
type Hart = (Bool, Maybe Atom, Host)
|
||||
type Pork = (Maybe Knot, [Cord])
|
||||
type Quay = [(Cord, Cord)]
|
||||
@ -93,8 +94,8 @@ deriveNoun ''Dawn
|
||||
type ServerId = Atom
|
||||
|
||||
data Address
|
||||
= AIpv4 AtomIf
|
||||
| AIpv6 AtomIs
|
||||
= AIpv4 Ipv4
|
||||
| AIpv6 Ipv6
|
||||
| AAmes Ship
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
@ -137,7 +138,7 @@ deriveNoun ''HttpServerReq
|
||||
-- Ames ------------------------------------------------------------------------
|
||||
|
||||
data AmesEv
|
||||
= AmesEvHear () Lane Bytes
|
||||
= AmesEvHear () AmesDest Bytes
|
||||
| AmesEvWake () ()
|
||||
| AmesEvWant Path Ship Path Noun
|
||||
| AmesEvCrud Path Cord Tang
|
||||
|
@ -13,36 +13,9 @@ import Control.Lens ((&))
|
||||
|
||||
import qualified Urbit.Time as Time
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
{-
|
||||
On startup (u3_ames_ef_bake):
|
||||
*_ef_bake means "send any initial events"
|
||||
Send event: [//newt/u3A->sen [%barn ~]]
|
||||
-- Lane Destinations -----------------------------------------------------------
|
||||
|
||||
On driver init (u3_ames_io_init):
|
||||
Basically just allocation.
|
||||
Set %wake timer.
|
||||
Record that the UDP listener is not running.
|
||||
|
||||
u3_ames_ef_turf: Called on turf effect.
|
||||
If we're not live then start the listener.
|
||||
For now, just use the first turf in the list.
|
||||
Turf is TLD-first domain name
|
||||
/org/urbit/dns -> dns.urbit.org
|
||||
|
||||
TODO If we're not live, we should always drop packet sends.
|
||||
|
||||
On u3_ames_io_talk?
|
||||
*_io_talk is called after everything is up.
|
||||
Does nothing.
|
||||
(Normally, this would be where you bring up the UDP listener)
|
||||
TODO If we're not live, we should always drop packet sends.
|
||||
|
||||
On driver shutdown:
|
||||
Kill the timer (TODO what is the timer for?)
|
||||
uv_close(&sam_u->had_u, 0);
|
||||
-}
|
||||
|
||||
-- TODO Move these to a common module ------------------------------------------
|
||||
|
||||
@ -53,14 +26,41 @@ type EffCb a = a -> IO ()
|
||||
newtype KingInstance = KingInst Atom
|
||||
deriving newtype (Eq, Ord, Num, Real, Enum, Integral, FromNoun, ToNoun)
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
-- Utils -----------------------------------------------------------------------
|
||||
|
||||
data AmesDrv = AmesDrv
|
||||
{ aIsLive :: IORef Bool
|
||||
, aSocket :: Socket
|
||||
, aWakeTimer :: Async ()
|
||||
, aListener :: Async ()
|
||||
}
|
||||
|
||||
galaxyPort :: Galaxy -> PortNumber
|
||||
galaxyPort (Galaxy g) = fromIntegral g + 31337
|
||||
|
||||
listenPort :: Ship -> PortNumber
|
||||
listenPort s | s < 256 = galaxyPort (fromIntegral s)
|
||||
listenPort _ = 0
|
||||
|
||||
localhost :: HostAddress
|
||||
localhost = tupleToHostAddress (127,0,0,1)
|
||||
|
||||
okayFakeAddr :: AmesDest -> Bool
|
||||
okayFakeAddr = \case
|
||||
ADGala _ _ -> True
|
||||
ADIpv4 _ p (Ipv4 a) -> a == localhost
|
||||
|
||||
destSockAddr :: AmesDest -> SockAddr
|
||||
destSockAddr = \case
|
||||
ADGala _ g -> SockAddrInet (galaxyPort g) localhost
|
||||
ADIpv4 _ p a -> SockAddrInet (fromIntegral p) (unIpv4 a)
|
||||
|
||||
ipv4Addr :: SockAddr -> Maybe (PortNumber, HostAddress)
|
||||
ipv4Addr = \case
|
||||
SockAddrInet p a -> Just (p, a)
|
||||
_ -> Nothing
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
{-
|
||||
@ -84,27 +84,39 @@ data AmesDrv = AmesDrv
|
||||
ames :: KingInstance -> Ship -> Maybe Port -> QueueEv
|
||||
-> ([Ev], Acquire (EffCb NewtEf))
|
||||
ames inst who mPort enqueueEv =
|
||||
([barnEv], callback . aIsLive <$> mkAcquire start stop)
|
||||
(initialEvents, runAmes)
|
||||
where
|
||||
initialEvents :: [Ev]
|
||||
initialEvents = [barnEv]
|
||||
|
||||
runAmes :: Acquire (EffCb NewtEf)
|
||||
runAmes = do
|
||||
drv <- mkAcquire start stop
|
||||
pure (handleEffect drv)
|
||||
|
||||
start :: IO AmesDrv
|
||||
start = do
|
||||
vLiv <- newIORef False
|
||||
time <- async runTimer
|
||||
hear <- async waitPacket
|
||||
pure $ AmesDrv vLiv time hear
|
||||
sock <- bindSock
|
||||
hear <- async (waitPacket sock)
|
||||
pure $ AmesDrv vLiv sock time hear
|
||||
|
||||
stop :: AmesDrv -> IO ()
|
||||
stop (AmesDrv{..}) = do
|
||||
cancel aWakeTimer
|
||||
cancel aListener
|
||||
close' aSocket
|
||||
|
||||
barnEv, wakeEv :: Ev
|
||||
barnEv :: Ev
|
||||
barnEv = EvBlip $ BlipEvNewt $ NewtEvBarn (fromIntegral inst, ()) ()
|
||||
|
||||
wakeEv :: Ev
|
||||
wakeEv = EvBlip $ BlipEvAmes $ AmesEvWake () ()
|
||||
|
||||
hearEv :: Time.Wen -> PortNumber -> HostAddress -> ByteString -> Ev
|
||||
hearEv w p a bs = EvBlip $ BlipEvAmes $ AmesEvHear () lane (MkBytes bs)
|
||||
where lane = If w (fromIntegral p) a
|
||||
hearEv w p a bs = EvBlip $ BlipEvAmes $ AmesEvHear () dest (MkBytes bs)
|
||||
where dest = ADIpv4 w (fromIntegral p) (Ipv4 a)
|
||||
|
||||
runTimer :: IO ()
|
||||
runTimer = forever $ do
|
||||
@ -112,50 +124,33 @@ ames inst who mPort enqueueEv =
|
||||
atomically (enqueueEv wakeEv)
|
||||
|
||||
ourPort :: PortNumber
|
||||
ourPort = mPort & \case Nothing -> shipPort who
|
||||
ourPort = mPort & \case Nothing -> listenPort who
|
||||
Just p -> fromIntegral p
|
||||
|
||||
waitPacket :: IO ()
|
||||
waitPacket = do
|
||||
bindSock :: IO Socket
|
||||
bindSock = do
|
||||
s <- socket AF_INET Datagram defaultProtocol
|
||||
() <- bind s (SockAddrInet ourPort localhost)
|
||||
forever $ do
|
||||
(bs, addr) <- recvFrom s 4096
|
||||
wen <- Time.now
|
||||
case addr of
|
||||
SockAddrInet p a -> atomically $ enqueueEv $ hearEv wen p a bs
|
||||
_ -> pure ()
|
||||
pure s
|
||||
|
||||
callback :: IORef Bool -> NewtEf -> IO ()
|
||||
callback vLive = \case
|
||||
NewtEfTurf (_id, ()) turfs ->
|
||||
writeIORef vLive True
|
||||
waitPacket :: Socket -> IO ()
|
||||
waitPacket s = forever $ do
|
||||
(bs, addr) <- recvFrom s 4096
|
||||
wen <- Time.now
|
||||
case addr of
|
||||
SockAddrInet p a -> atomically (enqueueEv $ hearEv wen p a bs)
|
||||
_ -> pure ()
|
||||
|
||||
NewtEfSend (_id, ()) lane (MkBytes bs) -> do
|
||||
live <- readIORef vLive
|
||||
handleEffect :: AmesDrv -> NewtEf -> IO ()
|
||||
handleEffect AmesDrv{..} = \case
|
||||
NewtEfTurf (_id, ()) turfs -> do
|
||||
writeIORef aIsLive True
|
||||
|
||||
NewtEfSend (_id, ()) dest (MkBytes bs) -> do
|
||||
live <- readIORef aIsLive
|
||||
when live $ do
|
||||
s <- socket AF_INET Datagram defaultProtocol
|
||||
laneSockAddr lane & \case
|
||||
Nothing -> pure ()
|
||||
Just sa -> void (sendTo s bs sa)
|
||||
|
||||
localhost :: HostAddress
|
||||
localhost = tupleToHostAddress (127,0,0,1)
|
||||
|
||||
laneSockAddr :: Lane -> Maybe SockAddr
|
||||
laneSockAddr = \case
|
||||
If _ p a -> pure (SockAddrInet (fromIntegral p) a)
|
||||
Ix _ p a -> pure (SockAddrInet (fromIntegral p) a)
|
||||
Is _ mLane _ -> mLane >>= laneSockAddr
|
||||
|
||||
ipv4Addr :: SockAddr -> Maybe (PortNumber, HostAddress)
|
||||
ipv4Addr = \case
|
||||
SockAddrInet p a -> Just (p, a)
|
||||
_ -> Nothing
|
||||
|
||||
shipPort :: Ship -> PortNumber
|
||||
shipPort s | s < 256 = fromIntegral (31337 + s)
|
||||
shipPort _ = 0
|
||||
when (okayFakeAddr dest) $ do
|
||||
void $ sendTo aSocket bs $ destSockAddr dest
|
||||
|
||||
{-
|
||||
data GalaxyInfo = GalaxyInfo { ip :: IPv4, age :: Time.Unix }
|
||||
|
106
pkg/hs-urbit/test/AmesTests.hs
Normal file
106
pkg/hs-urbit/test/AmesTests.hs
Normal file
@ -0,0 +1,106 @@
|
||||
module AmesTests (tests) where
|
||||
|
||||
import Data.Acquire
|
||||
import Test.QuickCheck hiding ((.&.))
|
||||
import Test.Tasty
|
||||
import Test.Tasty.QuickCheck
|
||||
import Test.Tasty.TH
|
||||
import UrbitPrelude
|
||||
import Vere.Log
|
||||
import Vere.Pier.Types
|
||||
import Data.Conduit
|
||||
import Data.Conduit.List
|
||||
import Urbit.Ames
|
||||
import Arvo
|
||||
import Noun
|
||||
|
||||
import Control.Concurrent (threadDelay, runInBoundThread)
|
||||
import Data.LargeWord (LargeKey(..))
|
||||
import GHC.Natural (Natural)
|
||||
|
||||
import Urbit.Time
|
||||
import qualified Vere.Log as Log
|
||||
|
||||
|
||||
-- Utils -----------------------------------------------------------------------
|
||||
|
||||
proc :: KingInstance
|
||||
proc = KingInst 0
|
||||
|
||||
turfEf :: NewtEf
|
||||
turfEf = NewtEfTurf (0, ()) []
|
||||
|
||||
sendEf :: Wen -> Bytes -> NewtEf
|
||||
sendEf w bs = NewtEfSend (0, ()) (ADGala w 0) bs
|
||||
|
||||
zodSelfMsg :: Property
|
||||
zodSelfMsg = forAll arbitrary (ioProperty . runTest)
|
||||
where
|
||||
runTest :: Natural -> IO Bool
|
||||
runTest val = do
|
||||
q <- newTQueueIO
|
||||
|
||||
let (amesBorn, driver) =
|
||||
ames proc (Ship 0) Nothing (writeTQueue q)
|
||||
|
||||
with driver $ \cb -> do
|
||||
|
||||
cb turfEf
|
||||
|
||||
let asdf = MkBytes "asdf"
|
||||
|
||||
tSend <- async $ forever $ do
|
||||
threadDelay 1_000
|
||||
wen <- now
|
||||
cb (sendEf wen asdf)
|
||||
threadDelay 10_000
|
||||
|
||||
let loop = do
|
||||
atomically (readTQueue q) >>= \case
|
||||
EvBlip (BlipEvAmes (AmesEvWake () ())) -> loop
|
||||
EvBlip (BlipEvAmes (AmesEvHear () _ bs)) -> pure (bs == asdf)
|
||||
_ -> pure False
|
||||
res <- loop
|
||||
cancel tSend
|
||||
pure res
|
||||
|
||||
|
||||
tests :: TestTree
|
||||
tests =
|
||||
testGroup "Ames"
|
||||
[ localOption (QuickCheckTests 10) $
|
||||
testProperty "Zod can send a message to itself" $
|
||||
zodSelfMsg
|
||||
]
|
||||
|
||||
|
||||
-- Generate Arbitrary Values ---------------------------------------------------
|
||||
|
||||
arb :: Arbitrary a => Gen a
|
||||
arb = arbitrary
|
||||
|
||||
instance Arbitrary Ipv4 where arbitrary = Ipv4 <$> arb
|
||||
instance Arbitrary Port where arbitrary = Port <$> arb
|
||||
instance Arbitrary Wen where arbitrary = Wen <$> arb
|
||||
instance Arbitrary Gap where arbitrary = Gap . abs <$> arb
|
||||
instance Arbitrary Galaxy where arbitrary = Galaxy <$> arb
|
||||
|
||||
instance Arbitrary ByteString where
|
||||
arbitrary = pack <$> arbitrary
|
||||
|
||||
instance Arbitrary Natural where
|
||||
arbitrary = fromIntegral . abs <$> (arbitrary :: Gen Integer)
|
||||
|
||||
instance (Arbitrary a, Arbitrary b) => Arbitrary (LargeKey a b) where
|
||||
arbitrary = LargeKey <$> arb <*> arb
|
||||
|
||||
instance Arbitrary AmesDest where
|
||||
arbitrary = oneof [ ADGala <$> arb <*> arb
|
||||
, ADIpv4 <$> arb <*> arb <*> arb
|
||||
]
|
||||
|
||||
instance Arbitrary Ship where
|
||||
arbitrary = Ship <$> arb
|
||||
|
||||
instance Arbitrary LogIdentity where
|
||||
arbitrary = LogIdentity <$> arb <*> arb <*> arb
|
@ -83,9 +83,16 @@ instance Arbitrary File where arbitrary = File <$> arb
|
||||
instance Arbitrary Cord where arbitrary = Cord <$> arb
|
||||
instance Arbitrary Wen where arbitrary = Wen <$> arb
|
||||
instance Arbitrary Gap where arbitrary = Gap . abs <$> arb
|
||||
instance Arbitrary Galaxy where arbitrary = Galaxy <$> arb
|
||||
instance Arbitrary Port where arbitrary = Port <$> arb
|
||||
instance Arbitrary Ship where arbitrary = Ship <$> arb
|
||||
instance Arbitrary Address where arbitrary = AAmes <$> arb
|
||||
instance Arbitrary Ipv4 where arbitrary = Ipv4 <$> arb
|
||||
|
||||
instance Arbitrary AmesDest where
|
||||
arbitrary = oneof [ ADGala <$> arb <*> arb
|
||||
, ADIpv4 <$> arb <*> arb <*> arb
|
||||
]
|
||||
|
||||
instance Arbitrary Lane where
|
||||
arbitrary = If <$> arb <*> arb <*> arb
|
||||
|
@ -12,11 +12,13 @@ import Control.Concurrent
|
||||
import qualified LogTests
|
||||
import qualified DeriveNounTests
|
||||
import qualified ArvoTests
|
||||
import qualified AmesTests
|
||||
|
||||
main :: IO ()
|
||||
main =
|
||||
defaultMain $ testGroup "Urbit"
|
||||
[ LogTests.tests
|
||||
, DeriveNounTests.tests
|
||||
, ArvoTests.tests
|
||||
[ -- LogTests.tests
|
||||
-- , DeriveNounTests.tests
|
||||
-- , ArvoTests.tests
|
||||
AmesTests.tests
|
||||
]
|
||||
|
Loading…
Reference in New Issue
Block a user