diff --git a/node/src/Unison/Runtime/Multiplex.hs b/node/src/Unison/Runtime/Multiplex.hs index aa1ff3f42..73e96c86b 100644 --- a/node/src/Unison/Runtime/Multiplex.hs +++ b/node/src/Unison/Runtime/Multiplex.hs @@ -53,7 +53,12 @@ type IsSubscription = Bool data Callbacks = Callbacks (M.Map B.ByteString (B.ByteString -> IO ())) (TVar Word64) -type Env = (STM Packet -> STM (), Callbacks, IO B.ByteString, L.Logger) +type Env = + ( STM Packet -> STM () + , Callbacks + , IO B.ByteString + , M.Map B.ByteString (Multiplex B.ByteString) + , L.Logger) newtype Multiplex a = Multiplex (ReaderT Env IO a) deriving (Applicative, Alternative, Functor, Monad, MonadIO, MonadPlus, MonadReader Env) @@ -78,7 +83,8 @@ runStandardIO logger sleepAfter rem interrupt m = do output <- atomically Q.empty :: IO (Q.Queue (Maybe Packet)) input <- atomically newTQueue :: IO (TQueue (Maybe Packet)) cb0@(Callbacks cbm cba) <- Callbacks <$> atomically M.new <*> atomically (newTVar 0) - let env = (Q.enqueue output . (Just <$>), cb0, fresh, logger) + recvs0 <- atomically M.new + let env = (Q.enqueue output . (Just <$>), cb0, fresh, recvs0, logger) activity <- atomically $ newTVar 0 let bump = atomically $ modifyTVar' activity (1+) _ <- Async.async $ do @@ -159,7 +165,7 @@ ask = Multiplex Reader.ask bumpActivity :: Multiplex () bumpActivity = do - (_, Callbacks _ cba, _, _) <- ask + (_, Callbacks _ cba, _, _, _) <- ask liftIO $ bumpActivity' cba bumpActivity' :: TVar Word64 -> IO () @@ -167,12 +173,12 @@ bumpActivity' cba = atomically $ modifyTVar' cba (1+) logger :: Multiplex L.Logger logger = do - ~(_, _, _, logger) <- ask + ~(_, _, _, _, logger) <- ask pure logger scope :: String -> Multiplex a -> Multiplex a scope msg = local tweak where - tweak (a,b,c,logger) = (a,b,c,L.scope msg logger) + tweak (a,b,c,d,logger) = (a,b,c,d,L.scope msg logger) -- | Crash with a message. Include the current logging scope. crash :: String -> Multiplex a @@ -187,7 +193,7 @@ debug msg = logger >>= \logger -> liftIO $ L.debug logger msg process :: IO (Maybe Packet) -> Multiplex () process recv = scope "Mux.process" $ do - (_, Callbacks cbs cba, _, logger) <- ask + (_, Callbacks cbs cba, _, _, logger) <- ask liftIO . repeatWhile $ do packet <- recv case packet of @@ -199,7 +205,7 @@ process recv = scope "Mux.process" $ do L.warn logger $ "dropped packet @ " ++ show (Base64.encode destination) pure True Just callback -> do - L.debug logger $ "packet delivered @ " ++ show (Base64.encode destination) + L.warn logger $ "packet delivered @ " ++ show (Base64.encode destination) bumpActivity' cba callback content pure True @@ -337,13 +343,13 @@ fork m = do nest :: Serial k => k -> Multiplex a -> Multiplex a nest outer m = Reader.local tweak m where - tweak (send,cbs,fresh,log) = (send' send,cbs,fresh,log) + tweak (send,cbs,fresh,recvs,log) = (send' send,cbs,fresh,recvs,log) kbytes = Put.runPutS (serialize outer) send' send p = send $ (\p -> Packet kbytes (Put.runPutS (serialize p))) <$> p channel :: Multiplex (Channel a) channel = do - ~(_,_,fresh,_) <- ask + ~(_,_,fresh,_,_) <- ask Channel Type <$> liftIO fresh send :: Serial a => Channel a -> a -> Multiplex () @@ -351,12 +357,12 @@ send chan a = send' chan (pure a) send' :: Serial a => Channel a -> STM a -> Multiplex () send' (Channel _ key) a = do - ~(send,_,_,_) <- ask + ~(send,_,_,_,_) <- ask liftIO . atomically $ send (Packet key . Put.runPutS . serialize <$> a) receiveCancellable :: Serial a => Channel a -> Multiplex (Multiplex a, String -> Multiplex ()) receiveCancellable (Channel _ key) = do - (_,Callbacks cbs cba,_,_) <- ask + (_,Callbacks cbs cba,_,_,_) <- ask result <- liftIO newEmptyMVar liftIO . atomically $ M.insert (putMVar result . Right) key cbs liftIO $ bumpActivity' cba @@ -378,6 +384,27 @@ receiveTimed msg micros chan = do run env (cancel $ "receiveTimed timeout during " ++ msg) pure $ scope "receiveTimed" (force <* liftIO (C.killThread watchdog) <* cancel ("receiveTimed completed" ++ msg)) +-- Save a receive future as part of +saveReceive :: Microseconds + -> B.ByteString -> Multiplex B.ByteString -> Multiplex () +saveReceive micros chan force = do + (_,_,_,recvs,_) <- ask + tid <- liftIO . C.forkIO $ do + C.threadDelay micros + atomically $ M.delete chan recvs + let force' = do + liftIO $ C.killThread tid + liftIO $ atomically (M.delete chan recvs) + force + liftIO . atomically $ M.insert force' chan recvs + +restoreReceive :: B.ByteString -> Multiplex B.ByteString +restoreReceive chan = do + (_,_,_,recvs,_) <- ask + o <- liftIO . atomically $ M.lookup chan recvs + fromMaybe (crash $ "chan could not be restored: " ++ show (Base64.encode chan)) + o + timeout' :: Microseconds -> a -> Multiplex a -> Multiplex a timeout' micros onTimeout m = fromMaybe onTimeout <$> timeout micros m @@ -422,7 +449,7 @@ subscribeTimed micros chan = do subscribe :: Serial a => Channel a -> Multiplex (Multiplex a, Multiplex ()) subscribe (Channel _ key) = scope "subscribe" $ do - (_, Callbacks cbs cba, _, _) <- ask + (_, Callbacks cbs cba, _, _, _) <- ask q <- liftIO . atomically $ newTQueue liftIO . atomically $ M.insert (atomically . writeTQueue q) key cbs liftIO $ bumpActivity' cba diff --git a/node/src/Unison/Runtime/Remote.hs b/node/src/Unison/Runtime/Remote.hs index b79aeeb88..43457ca54 100644 --- a/node/src/Unison/Runtime/Remote.hs +++ b/node/src/Unison/Runtime/Remote.hs @@ -193,19 +193,23 @@ handle crypto allow env lang p r = Mux.debug (show r) >> case r of runLocal (Pure t) = do Mux.debug $ "runLocal Pure" liftIO $ eval lang t - runLocal (Send (Channel cid) a) = do - Mux.debug $ "runLocal Send " ++ show cid + runLocal (Send c@(Channel cid) a) = do + Mux.warn $ "runLocal Send " ++ show c Mux.process1 (Mux.Packet cid (Put.runPutS (serialize a))) pure (unit lang) runLocal (ReceiveAsync chan@(Channel cid) (Seconds seconds)) = do Mux.debug $ "runLocal ReceiveAsync " ++ show (seconds, cid) - _ <- Mux.receiveTimed ("receiveAsync on " ++ show chan) - (floor $ seconds * 1000 * 1000) ((Mux.Channel Mux.Type cid) :: Mux.Channel (Maybe B.ByteString)) - pure (remote lang (Step (Local (Receive chan)))) + forceChan <- Mux.channel + Mux.warn $ "ReceiveAsync force channel " ++ show forceChan + let micros = floor $ seconds * 1000 * 1000 + force <- Mux.receiveTimed ("receiveAsync on " ++ show chan) + micros ((Mux.Channel Mux.Type cid) :: Mux.Channel B.ByteString) + Mux.saveReceive micros (Mux.channelId forceChan) force + pure (remote lang (Step (Local (Receive (Channel $ Mux.channelId forceChan))))) runLocal (Receive (Channel cid)) = do - Mux.debug $ "runLocal Receive " ++ show cid - (recv,_) <- Mux.receiveCancellable (Mux.Channel Mux.Type cid) - bytes <- recv + Mux.warn $ "runLocal Receive " ++ show cid + bytes <- Mux.restoreReceive cid + Mux.warn $ "runLocal Receive got bytes " ++ show cid case Get.runGetS deserialize bytes of Left err -> fail err Right r -> pure r diff --git a/shared/src/Unison/Remote.hs b/shared/src/Unison/Remote.hs index af2817196..5bcfd6367 100644 --- a/shared/src/Unison/Remote.hs +++ b/shared/src/Unison/Remote.hs @@ -168,7 +168,10 @@ instance Hashable Node where instance Show Node where show (Node host key) = "http://" ++ Text.unpack host ++ "/" ++ Text.unpack (decodeUtf8 (Base64.encode key)) -newtype Channel = Channel ByteString deriving (Eq,Ord,Generic,Show) +newtype Channel = Channel ByteString deriving (Eq,Ord,Generic) +instance Show Channel where + show (Channel id) = Text.unpack (decodeUtf8 (Base64.encode id)) + instance ToJSON Channel where toJSON (Channel c) = toJSON (decodeUtf8 (Base64.encode c)) instance FromJSON Channel where