diff --git a/pkg/hs/urbit-king/lib/Urbit/Vere/Ports.hs b/pkg/hs/urbit-king/lib/Urbit/Vere/Ports.hs index 588cf74fe3..873d7e1d30 100644 --- a/pkg/hs/urbit-king/lib/Urbit/Vere/Ports.hs +++ b/pkg/hs/urbit-king/lib/Urbit/Vere/Ports.hs @@ -41,7 +41,7 @@ buildNATPorts stderr = do let addRequest port = do resp <- newEmptyTMVarIO atomically $ - writeTQueue q (PTMInitialRequestOpen port (putTMVar resp True)) + writeTQueue q (PTMRequestOpen port (putTMVar resp True)) atomically $ takeTMVar resp pure () @@ -58,19 +58,21 @@ portRenewalTime = portLeaseLifetime - 60 -- Messages sent from the main thread to the port mapping communication thread. data PortThreadMsg - = PTMInitialRequestOpen Word16 (STM ()) + = PTMRequestOpen Word16 (STM ()) -- ^ Does the open request, and then calls the passed in stm action to -- singal completion to the main thread. We want to block on the initial -- setting opening because we want the forwarding set up before we actually -- start using the port. - | PTMRequestOpen Word16 - -- ^ Repeating open command we send to ourselves. - | PTMRequestClose Word16 -- ^ Close command. No synchronization because there's nothing we can do if -- it fails. +-- We get requests to acquire a port as an RAII condition, but the actual APIs +-- are timeout based, so we have to maintain a heap of the next timer to +-- rerequest port access. +data RenewAction = RenewAction Word16 + -- The port thread is an async which reads commands from an STM queue and then -- executes them. This thread is here to bind the semantics that we want to how -- NAT-PMP sees the world. We want for an RAcquire to be able to start a @@ -109,7 +111,7 @@ portThread q stderr = do "." ++ (tshow b) ++ "." ++ (tshow c) ++ "." ++ (tshow d) loop pmp mempty - loop :: NatPmpHandle -> MinPrioHeap POSIXTime PortThreadMsg -> RIO e () + loop :: NatPmpHandle -> MinPrioHeap POSIXTime RenewAction -> RIO e () loop pmp nextRenew = do now <- io $ getPOSIXTime delay <- case viewHead nextRenew of @@ -121,20 +123,15 @@ portThread q stderr = do command <- atomically $ (Left <$> fini delay) <|> (Right <$> readTQueue q) case command of - Left () -> do - -- the timeout has fired, meaning the top of the heap should be - -- popped and rerun. - case (Data.Heap.view nextRenew) of - Nothing -> error "Internal heap managing error." - Just ((_, msg), rest) -> handlePTM pmp msg rest + Left () -> handleRenew pmp nextRenew Right msg -> handlePTM pmp msg nextRenew handlePTM :: NatPmpHandle -> PortThreadMsg - -> MinPrioHeap POSIXTime PortThreadMsg + -> MinPrioHeap POSIXTime RenewAction -> RIO e () handlePTM pmp msg nextRenew = case msg of - PTMInitialRequestOpen p notifyComplete -> do + PTMRequestOpen p notifyComplete -> do logInfo $ displayShow ("port: sending initial request to NAT-PMP for port ", p) setPortMapping pmp PTUdp p p portLeaseLifetime >>= \case @@ -144,33 +141,16 @@ portThread q stderr = do ":", err, ", disabling NAT-PMP") loopErr q Right _ -> do + -- Filter any existing references to this port on the heap to ensure + -- we don't double up on tasks. let filteredHeap = filterPort p nextRenew now <- io $ getPOSIXTime - let repeatMsg = PTMRequestOpen p let withRenew = - insert (now + fromIntegral portRenewalTime, repeatMsg) + insert (now + fromIntegral portRenewalTime, RenewAction p) filteredHeap atomically notifyComplete loop pmp withRenew - PTMRequestOpen p -> do - logInfo $ - displayShow ("port: sending renewing request to NAT-PMP for port ", - p) - ret <- setPortMapping pmp PTUdp p p portLeaseLifetime - case ret of - Left err -> do - logError $ - displayShow ("port: failed to request NAT-PMP for port ", p, - ":", err, ", disabling NAT-PMP") - loopErr q - Right _ -> do - let filteredHeap = filterPort p nextRenew - now <- io $ getPOSIXTime - let withRenew = - insert (now + (fromIntegral portRenewalTime), msg) filteredHeap - loop pmp withRenew - PTMRequestClose p -> do logInfo $ displayShow ("port: releasing lease for ", p) @@ -178,15 +158,36 @@ portThread q stderr = do let removed = filterPort p nextRenew loop pmp removed + handleRenew :: NatPmpHandle + -> MinPrioHeap POSIXTime RenewAction + -> RIO e () + handleRenew pmp nextRenew = do + case (Data.Heap.view nextRenew) of + Nothing -> error "Internal heap managing error." + Just ((_, RenewAction p), rest) -> do + logInfo $ + displayShow ("port: sending renewing request to NAT-PMP for port ", + p) + setPortMapping pmp PTUdp p p portLeaseLifetime >>= \case + Left err -> do + logError $ + displayShow ("port: failed to request NAT-PMP for port ", p, + ":", err, ", disabling NAT-PMP") + loopErr q + Right _ -> do + -- We don't need to filter the port because we just did. + now <- io $ getPOSIXTime + let withRenew = + insert (now + (fromIntegral portRenewalTime), RenewAction p) + rest + loop pmp withRenew + filterPort :: Word16 - -> MinPrioHeap POSIXTime PortThreadMsg - -> MinPrioHeap POSIXTime PortThreadMsg + -> MinPrioHeap POSIXTime RenewAction + -> MinPrioHeap POSIXTime RenewAction filterPort p = Data.Heap.filter okPort where - -- initial requests should never be in the heap - okPort (_, PTMInitialRequestOpen _ _) = False - okPort (_, PTMRequestOpen x) = p /= x - okPort (_, PTMRequestClose x) = p /= x + okPort (_, RenewAction x) = p /= x -- block (retry) until the delay TVar is set to True fini :: TVar Bool -> STM () @@ -195,9 +196,8 @@ portThread q stderr = do -- The NAT system is considered "off" but we still need to signal back to -- the main thread that blocking actions are complete. loopErr q = forever $ do - readTQueueIO q >>= \case - PTMInitialRequestOpen _ onComplete -> atomically onComplete - PTMRequestOpen _ -> pure () + (atomically $ readTQueue q) >>= \case + PTMRequestOpen _ onComplete -> atomically onComplete PTMRequestClose _ -> pure () -- When we were unable to connect to a router, get the ip address on the