Skeleton implementation of Ames with galaxy lookup.

Needs to be moved to RIO so that I can get logging data about this.
This commit is contained in:
Elliot Glaysher 2019-10-08 13:04:21 -07:00
parent fcdc5904e8
commit 4a0c2f0393

View File

@ -7,25 +7,28 @@ import Network.Socket hiding (recvFrom, sendTo)
import Network.Socket.ByteString
import Vere.Pier.Types
import qualified Data.Map as M
import qualified Urbit.Ob as Ob
import qualified Urbit.Time as Time
-- Types -----------------------------------------------------------------------
data AmesDrv = AmesDrv
{ aIsLive :: IORef Bool
, aSocket :: Socket
, aWakeTimer :: Async ()
, aListener :: Async ()
{ aIsLive :: IORef Bool
, aTurfs :: TVar [Turf]
, aGalaxies :: IORef (M.Map Galaxy (Async (), TQueue ByteString))
, aSocket :: Socket
, aWakeTimer :: Async ()
, aListener :: Async ()
, aSendingQueue :: TQueue (SockAddr, ByteString)
, aSendingThread :: Async ()
}
data NetworkMode = Fake | Real
deriving (Eq, Ord, Show)
{-
data GalaxyInfo = GalaxyInfo { ip :: Ipv4, age :: Time.Unix }
deriving (Eq, Ord, Show)
-}
-- data GalaxyInfo = GalaxyInfo { ip :: Ipv4, age :: Time.Unix }
-- deriving (Eq, Ord, Show)
-- Utils -----------------------------------------------------------------------
@ -36,7 +39,7 @@ galaxyPort Real (Galaxy g) = fromIntegral g + 13337
listenPort :: NetworkMode -> Ship -> PortNumber
listenPort m s | s < 256 = galaxyPort m (fromIntegral s)
listenPort m _ = 0
listenPort m _ = 0
localhost :: HostAddress
localhost = tupleToHostAddress (127,0,0,1)
@ -104,17 +107,25 @@ ames inst who mPort enqueueEv =
start :: IO AmesDrv
start = do
vLiv <- newIORef False
time <- async runTimer
sock <- bindSock
hear <- async (waitPacket sock)
pure $ AmesDrv vLiv sock time hear
vLiv <- newIORef False
vTurf <- newTVarIO []
vGalaxies <- newIORef mempty
time <- async runTimer
sock <- bindSock
hear <- async (waitPacket sock)
sendQueue <- newTQueueIO
sending <- async (sendingThread sendQueue sock)
pure $ AmesDrv vLiv vTurf vGalaxies sock time hear sendQueue sending
netMode :: NetworkMode
netMode = Fake
stop :: AmesDrv -> IO ()
stop (AmesDrv{..}) = do
galaxies <- readIORef aGalaxies
mapM_ (cancel . fst) galaxies
cancel aSendingThread
cancel aWakeTimer
cancel aListener
close' aSocket
@ -140,12 +151,101 @@ ames inst who mPort enqueueEv =
_ -> pure ()
handleEffect :: AmesDrv -> NewtEf -> IO ()
handleEffect AmesDrv{..} = \case
handleEffect drv@AmesDrv{..} = \case
NewtEfTurf (_id, ()) turfs -> do
writeIORef aIsLive True
atomically $ writeTVar aTurfs turfs
NewtEfSend (_id, ()) dest (MkBytes bs) -> do
live <- readIORef aIsLive
when live $ do
when (netMode == Real || okayFakeAddr dest) $ do
void $ sendTo aSocket bs $ destSockAddr netMode dest
whenM (readIORef aIsLive) (sendPacket drv netMode dest bs)
sendPacket :: AmesDrv -> NetworkMode -> AmesDest -> ByteString -> IO ()
sendPacket AmesDrv{..} Fake dest bs =
when (okayFakeAddr dest) $ do
atomically $ writeTQueue aSendingQueue ((destSockAddr Fake dest), bs)
sendPacket AmesDrv{..} Real (ADGala wen galaxy) bs = do
galaxies <- readIORef aGalaxies
queue <- case M.lookup galaxy galaxies of
Just (_, queue) -> pure queue
Nothing -> do
inQueue <- newTQueueIO
thread <- galaxyResolver galaxy aTurfs inQueue aSendingQueue
modifyIORef (aGalaxies) (M.insert galaxy (thread, inQueue))
pure inQueue
atomically $ writeTQueue queue bs
sendPacket AmesDrv{..} Real ip@(ADIpv4 _ _ _) bs =
atomically $ writeTQueue aSendingQueue ((destSockAddr Real ip), bs)
-- An outbound queue of messages. We can only write to a socket from one
-- thread, so coalesce those writes here.
sendingThread :: TQueue (SockAddr, ByteString) -> Socket -> IO ()
sendingThread queue socket = forever $ do
(dest, bs) <- atomically $ readTQueue queue
void $ sendTo socket bs dest
-- Asynchronous thread per galaxy which handles domain resolution, and can
-- block its own queue of ByteStrings to send.
--
-- Maybe perform the resolution asynchronously, injecting into the resolver
-- queue as a message.
--
-- TODO: Figure out how the real haskell time library works.
galaxyResolver :: Galaxy -> TVar [Turf] -> TQueue ByteString
-> TQueue (SockAddr, ByteString)
-> IO (Async ())
galaxyResolver galaxy turfVar incoming outgoing =
async $ loop Nothing Time.unixEpoch
where
loop :: Maybe SockAddr -> Time.Wen -> IO ()
loop lastGalaxyIP lastLookupTime = do
packet <- atomically $ readTQueue incoming
i <- checkIP lastGalaxyIP lastLookupTime
case i of
(Nothing, t) -> do
-- We've failed to lookup the IP. Drop the outbound packet
-- because we have no IP for our galaxy, including possible
-- previous IPs.
loop Nothing t
(Just ip, t) -> do
queueSendToGalaxy ip packet
loop (Just ip) t
checkIP :: Maybe SockAddr -> Time.Wen -> IO (Maybe SockAddr, Time.Wen)
checkIP lastIP lastLookupTime = do
current <- Time.now
if (Time.gap current lastLookupTime ^. Time.secs) < 300
then pure (lastIP, lastLookupTime)
else do
toCheck <- atomically $ readTVar turfVar
ip <- resolveFirstIP lastIP toCheck
timeAfterResolution <- Time.now
pure (ip, timeAfterResolution)
resolveFirstIP :: Maybe SockAddr -> [Turf] -> IO (Maybe SockAddr)
resolveFirstIP prevIP [] = do
-- print ("ames: czar at %s: not found (b)\n")
pure prevIP
resolveFirstIP prevIP (x:xs) = do
let hostname = buildDNS galaxy x
listIPs <- getAddrInfo Nothing (Just hostname) Nothing
case listIPs of
[] -> resolveFirstIP prevIP xs
(y:ys) -> pure $ Just $ addrAddress y
buildDNS :: Galaxy -> Turf -> String
buildDNS (Galaxy g) turf = name ++ "." ++ (unpack $ _turfText turf)
where
nameWithSig = Ob.renderPatp $ Ob.patp $ fromIntegral g
name = case stripPrefix "~" nameWithSig of
Nothing -> error "Urbit.ob didn't produce string with ~"
Just x -> (unpack x)
queueSendToGalaxy :: SockAddr -> ByteString -> IO ()
queueSendToGalaxy inet packet =
atomically $ writeTQueue outgoing (inet, packet)