natpmp: Separate out the PortThreadMsgs from the heap actions.

This commit is contained in:
Elliot Glaysher 2020-08-13 13:36:20 -04:00
parent a3336fde41
commit b4878a8b03

View File

@ -41,7 +41,7 @@ buildNATPorts stderr = do
let addRequest port = do let addRequest port = do
resp <- newEmptyTMVarIO resp <- newEmptyTMVarIO
atomically $ atomically $
writeTQueue q (PTMInitialRequestOpen port (putTMVar resp True)) writeTQueue q (PTMRequestOpen port (putTMVar resp True))
atomically $ takeTMVar resp atomically $ takeTMVar resp
pure () pure ()
@ -58,19 +58,21 @@ portRenewalTime = portLeaseLifetime - 60
-- Messages sent from the main thread to the port mapping communication thread. -- Messages sent from the main thread to the port mapping communication thread.
data PortThreadMsg data PortThreadMsg
= PTMInitialRequestOpen Word16 (STM ()) = PTMRequestOpen Word16 (STM ())
-- ^ Does the open request, and then calls the passed in stm action to -- ^ Does the open request, and then calls the passed in stm action to
-- singal completion to the main thread. We want to block on the initial -- singal completion to the main thread. We want to block on the initial
-- setting opening because we want the forwarding set up before we actually -- setting opening because we want the forwarding set up before we actually
-- start using the port. -- start using the port.
| PTMRequestOpen Word16
-- ^ Repeating open command we send to ourselves.
| PTMRequestClose Word16 | PTMRequestClose Word16
-- ^ Close command. No synchronization because there's nothing we can do if -- ^ Close command. No synchronization because there's nothing we can do if
-- it fails. -- it fails.
-- We get requests to acquire a port as an RAII condition, but the actual APIs
-- are timeout based, so we have to maintain a heap of the next timer to
-- rerequest port access.
data RenewAction = RenewAction Word16
-- The port thread is an async which reads commands from an STM queue and then -- The port thread is an async which reads commands from an STM queue and then
-- executes them. This thread is here to bind the semantics that we want to how -- executes them. This thread is here to bind the semantics that we want to how
-- NAT-PMP sees the world. We want for an RAcquire to be able to start a -- NAT-PMP sees the world. We want for an RAcquire to be able to start a
@ -109,7 +111,7 @@ portThread q stderr = do
"." ++ (tshow b) ++ "." ++ (tshow c) ++ "." ++ (tshow d) "." ++ (tshow b) ++ "." ++ (tshow c) ++ "." ++ (tshow d)
loop pmp mempty loop pmp mempty
loop :: NatPmpHandle -> MinPrioHeap POSIXTime PortThreadMsg -> RIO e () loop :: NatPmpHandle -> MinPrioHeap POSIXTime RenewAction -> RIO e ()
loop pmp nextRenew = do loop pmp nextRenew = do
now <- io $ getPOSIXTime now <- io $ getPOSIXTime
delay <- case viewHead nextRenew of delay <- case viewHead nextRenew of
@ -121,20 +123,15 @@ portThread q stderr = do
command <- atomically $ command <- atomically $
(Left <$> fini delay) <|> (Right <$> readTQueue q) (Left <$> fini delay) <|> (Right <$> readTQueue q)
case command of case command of
Left () -> do Left () -> handleRenew pmp nextRenew
-- the timeout has fired, meaning the top of the heap should be
-- popped and rerun.
case (Data.Heap.view nextRenew) of
Nothing -> error "Internal heap managing error."
Just ((_, msg), rest) -> handlePTM pmp msg rest
Right msg -> handlePTM pmp msg nextRenew Right msg -> handlePTM pmp msg nextRenew
handlePTM :: NatPmpHandle handlePTM :: NatPmpHandle
-> PortThreadMsg -> PortThreadMsg
-> MinPrioHeap POSIXTime PortThreadMsg -> MinPrioHeap POSIXTime RenewAction
-> RIO e () -> RIO e ()
handlePTM pmp msg nextRenew = case msg of handlePTM pmp msg nextRenew = case msg of
PTMInitialRequestOpen p notifyComplete -> do PTMRequestOpen p notifyComplete -> do
logInfo $ logInfo $
displayShow ("port: sending initial request to NAT-PMP for port ", p) displayShow ("port: sending initial request to NAT-PMP for port ", p)
setPortMapping pmp PTUdp p p portLeaseLifetime >>= \case setPortMapping pmp PTUdp p p portLeaseLifetime >>= \case
@ -144,33 +141,16 @@ portThread q stderr = do
":", err, ", disabling NAT-PMP") ":", err, ", disabling NAT-PMP")
loopErr q loopErr q
Right _ -> do Right _ -> do
-- Filter any existing references to this port on the heap to ensure
-- we don't double up on tasks.
let filteredHeap = filterPort p nextRenew let filteredHeap = filterPort p nextRenew
now <- io $ getPOSIXTime now <- io $ getPOSIXTime
let repeatMsg = PTMRequestOpen p
let withRenew = let withRenew =
insert (now + fromIntegral portRenewalTime, repeatMsg) insert (now + fromIntegral portRenewalTime, RenewAction p)
filteredHeap filteredHeap
atomically notifyComplete atomically notifyComplete
loop pmp withRenew loop pmp withRenew
PTMRequestOpen p -> do
logInfo $
displayShow ("port: sending renewing request to NAT-PMP for port ",
p)
ret <- setPortMapping pmp PTUdp p p portLeaseLifetime
case ret of
Left err -> do
logError $
displayShow ("port: failed to request NAT-PMP for port ", p,
":", err, ", disabling NAT-PMP")
loopErr q
Right _ -> do
let filteredHeap = filterPort p nextRenew
now <- io $ getPOSIXTime
let withRenew =
insert (now + (fromIntegral portRenewalTime), msg) filteredHeap
loop pmp withRenew
PTMRequestClose p -> do PTMRequestClose p -> do
logInfo $ logInfo $
displayShow ("port: releasing lease for ", p) displayShow ("port: releasing lease for ", p)
@ -178,15 +158,36 @@ portThread q stderr = do
let removed = filterPort p nextRenew let removed = filterPort p nextRenew
loop pmp removed loop pmp removed
handleRenew :: NatPmpHandle
-> MinPrioHeap POSIXTime RenewAction
-> RIO e ()
handleRenew pmp nextRenew = do
case (Data.Heap.view nextRenew) of
Nothing -> error "Internal heap managing error."
Just ((_, RenewAction p), rest) -> do
logInfo $
displayShow ("port: sending renewing request to NAT-PMP for port ",
p)
setPortMapping pmp PTUdp p p portLeaseLifetime >>= \case
Left err -> do
logError $
displayShow ("port: failed to request NAT-PMP for port ", p,
":", err, ", disabling NAT-PMP")
loopErr q
Right _ -> do
-- We don't need to filter the port because we just did.
now <- io $ getPOSIXTime
let withRenew =
insert (now + (fromIntegral portRenewalTime), RenewAction p)
rest
loop pmp withRenew
filterPort :: Word16 filterPort :: Word16
-> MinPrioHeap POSIXTime PortThreadMsg -> MinPrioHeap POSIXTime RenewAction
-> MinPrioHeap POSIXTime PortThreadMsg -> MinPrioHeap POSIXTime RenewAction
filterPort p = Data.Heap.filter okPort filterPort p = Data.Heap.filter okPort
where where
-- initial requests should never be in the heap okPort (_, RenewAction x) = p /= x
okPort (_, PTMInitialRequestOpen _ _) = False
okPort (_, PTMRequestOpen x) = p /= x
okPort (_, PTMRequestClose x) = p /= x
-- block (retry) until the delay TVar is set to True -- block (retry) until the delay TVar is set to True
fini :: TVar Bool -> STM () fini :: TVar Bool -> STM ()
@ -195,9 +196,8 @@ portThread q stderr = do
-- The NAT system is considered "off" but we still need to signal back to -- The NAT system is considered "off" but we still need to signal back to
-- the main thread that blocking actions are complete. -- the main thread that blocking actions are complete.
loopErr q = forever $ do loopErr q = forever $ do
readTQueueIO q >>= \case (atomically $ readTQueue q) >>= \case
PTMInitialRequestOpen _ onComplete -> atomically onComplete PTMRequestOpen _ onComplete -> atomically onComplete
PTMRequestOpen _ -> pure ()
PTMRequestClose _ -> pure () PTMRequestClose _ -> pure ()
-- When we were unable to connect to a router, get the ip address on the -- When we were unable to connect to a router, get the ip address on the