mirror of
https://github.com/ilyakooo0/urbit.git
synced 2024-12-16 02:22:12 +03:00
Merge pull request #4744 from urbit/pp/alley
king: fix lanes yet again, also reuse udp port
This commit is contained in:
commit
dd230875cd
@ -25,10 +25,10 @@ module Urbit.Arvo.Common
|
||||
import Urbit.Prelude
|
||||
|
||||
import Control.Monad.Fail (fail)
|
||||
import Data.Bits
|
||||
import Data.Serialize
|
||||
|
||||
import qualified Network.HTTP.Types.Method as H
|
||||
import qualified Network.Socket as N
|
||||
import qualified Urbit.Ob as Ob
|
||||
|
||||
|
||||
@ -159,6 +159,19 @@ deriveNoun ''JsonNode
|
||||
|
||||
-- Ames Destinations -------------------------------------------------
|
||||
|
||||
serializeToNoun :: Serialize a => a -> Noun
|
||||
serializeToNoun = A . bytesAtom . encode
|
||||
|
||||
serializeParseNoun :: Serialize a => String -> Int -> Noun -> Parser a
|
||||
serializeParseNoun desc len = named (pack desc) . \case
|
||||
A (atomBytes -> bs)
|
||||
-- Atoms lose leading 0s, but since lsb, these become trailing NULs
|
||||
| length bs <= len -> case decode $ bs <> replicate (len - length bs) 0 of
|
||||
Right aa -> pure aa
|
||||
Left msg -> fail msg
|
||||
| otherwise -> fail ("putative " <> desc <> " " <> show bs <> " too long")
|
||||
C{} -> fail ("unexpected cell in " <> desc)
|
||||
|
||||
newtype Patp a = Patp { unPatp :: a }
|
||||
deriving newtype (Eq, Ord, Enum, Real, Integral, Num, ToNoun, FromNoun)
|
||||
|
||||
@ -167,17 +180,29 @@ newtype Port = Port { unPort :: Word16 }
|
||||
deriving newtype (Eq, Ord, Show, Enum, Real, Integral, Num, ToNoun, FromNoun)
|
||||
|
||||
-- @if
|
||||
newtype Ipv4 = Ipv4 { unIpv4 :: Word32 }
|
||||
deriving newtype (Eq, Ord, Enum, Real, Integral, Num, ToNoun, FromNoun)
|
||||
newtype Ipv4 = Ipv4 { unIpv4 :: N.HostAddress }
|
||||
deriving newtype (Eq, Ord, Enum)
|
||||
|
||||
instance Serialize Ipv4 where
|
||||
get = (\a b c d -> Ipv4 $ N.tupleToHostAddress $ (d, c, b, a))
|
||||
<$> getWord8 <*> getWord8 <*> getWord8 <*> getWord8
|
||||
put (Ipv4 (N.hostAddressToTuple -> (a, b, c, d))) = for_ [d, c, b, a] putWord8
|
||||
|
||||
instance ToNoun Ipv4 where
|
||||
toNoun = serializeToNoun
|
||||
|
||||
instance FromNoun Ipv4 where
|
||||
parseNoun = serializeParseNoun "Ipv4" 4
|
||||
|
||||
instance Show Ipv4 where
|
||||
show (Ipv4 i) =
|
||||
show ((shiftR i 24) .&. 0xff) ++ "." ++
|
||||
show ((shiftR i 16) .&. 0xff) ++ "." ++
|
||||
show ((shiftR i 8) .&. 0xff) ++ "." ++
|
||||
show (i .&. 0xff)
|
||||
show (Ipv4 (N.hostAddressToTuple -> (a, b, c, d))) =
|
||||
show a ++ "." ++
|
||||
show b ++ "." ++
|
||||
show c ++ "." ++
|
||||
show d
|
||||
|
||||
-- @is
|
||||
-- should probably use hostAddress6ToTuple here, but no one uses it right now
|
||||
newtype Ipv6 = Ipv6 { unIpv6 :: Word128 }
|
||||
deriving newtype (Eq, Ord, Show, Enum, Real, Integral, Num, ToNoun, FromNoun)
|
||||
|
||||
@ -190,21 +215,14 @@ data AmesAddress = AAIpv4 Ipv4 Port
|
||||
deriving (Eq, Ord, Show)
|
||||
|
||||
instance Serialize AmesAddress where
|
||||
get = AAIpv4 <$> (Ipv4 <$> getWord32le) <*> (Port <$> getWord16le)
|
||||
put (AAIpv4 (Ipv4 ip) (Port port)) = putWord32le ip >> putWord16le port
|
||||
get = AAIpv4 <$> get <*> (Port <$> getWord16le)
|
||||
put (AAIpv4 ip (Port port)) = put ip >> putWord16le port
|
||||
|
||||
instance FromNoun AmesAddress where
|
||||
parseNoun = named "AmesAddress" . \case
|
||||
A (atomBytes -> bs)
|
||||
-- Atoms lose leading 0s, but since lsb, these become trailing NULs
|
||||
| length bs <= 6 -> case decode $ bs <> replicate (6 - length bs) 0 of
|
||||
Right aa -> pure aa
|
||||
Left msg -> fail msg
|
||||
| otherwise -> fail ("putative address " <> show bs <> " too long")
|
||||
C{} -> fail "unexpected cell in ames address"
|
||||
parseNoun = serializeParseNoun "AmesAddress" 6
|
||||
|
||||
instance ToNoun AmesAddress where
|
||||
toNoun = A . bytesAtom . encode
|
||||
toNoun = serializeToNoun
|
||||
|
||||
type AmesDest = Each Galaxy AmesAddress
|
||||
|
||||
|
@ -80,10 +80,6 @@ data ShipClass
|
||||
muk :: ByteString -> Word20
|
||||
muk bs = mugBS bs .&. (2 ^ 20 - 1)
|
||||
|
||||
-- XX check this
|
||||
getAmesAddress :: Get AmesAddress
|
||||
getAmesAddress = AAIpv4 <$> (Ipv4 <$> getWord32le) <*> (Port <$> getWord16le)
|
||||
|
||||
putAmesAddress :: Putter AmesAddress
|
||||
putAmesAddress = \case
|
||||
AAIpv4 (Ipv4 ip) (Port port) -> putWord32le ip >> putWord16le port
|
||||
@ -104,7 +100,7 @@ instance Serialize Packet where
|
||||
guard isAmes
|
||||
|
||||
pktOrigin <- if isRelayed
|
||||
then Just <$> getAmesAddress
|
||||
then Just <$> get
|
||||
else pure Nothing
|
||||
|
||||
-- body
|
||||
@ -157,9 +153,10 @@ instance Serialize Packet where
|
||||
|
||||
putWord32le head
|
||||
case pktOrigin of
|
||||
Just o -> putAmesAddress o
|
||||
Just o -> put o
|
||||
Nothing -> pure ()
|
||||
putByteString body
|
||||
|
||||
where
|
||||
putShipGetRank s@(Ship (LargeKey p q)) = case () of
|
||||
_ | s < 2 ^ 16 -> (0, putWord16le $ fromIntegral s) -- lord
|
||||
|
@ -4,8 +4,12 @@
|
||||
1. Opens a UDP socket and makes sure that it stays open.
|
||||
|
||||
- If can't open the port, wait and try again repeatedly.
|
||||
- If there is an error reading or writting from the open socket,
|
||||
close it and open another.
|
||||
- If there is an error reading to or writing from the open socket,
|
||||
close it and open another, making sure, however, to reuse the
|
||||
same port
|
||||
NOTE: It's not clear what, if anything, closing and reopening
|
||||
the socket does. We're keeping this behavior out of conservatism
|
||||
until we understand it better.
|
||||
|
||||
2. Receives packets from the socket.
|
||||
|
||||
@ -158,7 +162,7 @@ realUdpServ
|
||||
-> HostAddress
|
||||
-> AmesStat
|
||||
-> RIO e UdpServ
|
||||
realUdpServ por hos sat = do
|
||||
realUdpServ startPort hos sat = do
|
||||
logInfo $ displayShow ("AMES", "UDP", "Starting real UDP server.")
|
||||
|
||||
env <- ask
|
||||
@ -202,23 +206,30 @@ realUdpServ por hos sat = do
|
||||
did <- atomically (tryWriteTBQueue qSend (a, b))
|
||||
when (did == False) $ do
|
||||
logWarn "AMES: UDP: Dropping outbound packet because queue is full."
|
||||
let opener por = do
|
||||
logInfo $ displayShow $ ("AMES", "UDP", "Trying to open socket, port",)
|
||||
por
|
||||
sk <- forceBind por hos
|
||||
sn <- io $ getSocketName sk
|
||||
sp <- io $ socketPort sk
|
||||
logInfo $ displayShow $ ("AMES", "UDP", "Got socket", sn, sp)
|
||||
|
||||
tOpen <- async $ forever $ do
|
||||
sk <- forceBind por hos
|
||||
sn <- io $ getSocketName sk
|
||||
let waitForRelease = do
|
||||
atomically (writeTVar vSock (Just sk))
|
||||
broken <- atomically (takeTMVar vFail)
|
||||
logWarn "AMES: UDP: Closing broken socket."
|
||||
io (close broken)
|
||||
|
||||
let waitForRelease = do
|
||||
atomically (writeTVar vSock (Just sk))
|
||||
broken <- atomically (takeTMVar vFail)
|
||||
logWarn "AMES: UDP: Closing broken socket."
|
||||
io (close broken)
|
||||
case sn of
|
||||
(SockAddrInet boundPort _) ->
|
||||
-- When we're on IPv4, maybe port forward at the NAT.
|
||||
rwith (requestPortAccess $ fromIntegral boundPort) $
|
||||
\() -> waitForRelease
|
||||
_ -> waitForRelease
|
||||
|
||||
case sn of
|
||||
(SockAddrInet boundPort _) ->
|
||||
-- When we're on IPv4, maybe port forward at the NAT.
|
||||
rwith (requestPortAccess $ fromIntegral boundPort) $
|
||||
\() -> waitForRelease
|
||||
_ -> waitForRelease
|
||||
opener sp
|
||||
|
||||
tOpen <- async $ opener startPort
|
||||
|
||||
tSend <- async $ forever $ join $ atomically $ do
|
||||
(adr, byt) <- readTBQueue qSend
|
||||
|
Loading…
Reference in New Issue
Block a user