added save/restoreReceive, needed for implementation of Remote.receive-async

This commit is contained in:
Paul Chiusano 2016-08-25 01:46:29 -04:00
parent 58dc81e2c2
commit 221079d7d0
3 changed files with 55 additions and 21 deletions

View File

@ -53,7 +53,12 @@ type IsSubscription = Bool
data Callbacks = data Callbacks =
Callbacks (M.Map B.ByteString (B.ByteString -> IO ())) (TVar Word64) Callbacks (M.Map B.ByteString (B.ByteString -> IO ())) (TVar Word64)
type Env = (STM Packet -> STM (), Callbacks, IO B.ByteString, L.Logger) type Env =
( STM Packet -> STM ()
, Callbacks
, IO B.ByteString
, M.Map B.ByteString (Multiplex B.ByteString)
, L.Logger)
newtype Multiplex a = Multiplex (ReaderT Env IO a) newtype Multiplex a = Multiplex (ReaderT Env IO a)
deriving (Applicative, Alternative, Functor, Monad, MonadIO, MonadPlus, MonadReader Env) deriving (Applicative, Alternative, Functor, Monad, MonadIO, MonadPlus, MonadReader Env)
@ -78,7 +83,8 @@ runStandardIO logger sleepAfter rem interrupt m = do
output <- atomically Q.empty :: IO (Q.Queue (Maybe Packet)) output <- atomically Q.empty :: IO (Q.Queue (Maybe Packet))
input <- atomically newTQueue :: IO (TQueue (Maybe Packet)) input <- atomically newTQueue :: IO (TQueue (Maybe Packet))
cb0@(Callbacks cbm cba) <- Callbacks <$> atomically M.new <*> atomically (newTVar 0) cb0@(Callbacks cbm cba) <- Callbacks <$> atomically M.new <*> atomically (newTVar 0)
let env = (Q.enqueue output . (Just <$>), cb0, fresh, logger) recvs0 <- atomically M.new
let env = (Q.enqueue output . (Just <$>), cb0, fresh, recvs0, logger)
activity <- atomically $ newTVar 0 activity <- atomically $ newTVar 0
let bump = atomically $ modifyTVar' activity (1+) let bump = atomically $ modifyTVar' activity (1+)
_ <- Async.async $ do _ <- Async.async $ do
@ -159,7 +165,7 @@ ask = Multiplex Reader.ask
bumpActivity :: Multiplex () bumpActivity :: Multiplex ()
bumpActivity = do bumpActivity = do
(_, Callbacks _ cba, _, _) <- ask (_, Callbacks _ cba, _, _, _) <- ask
liftIO $ bumpActivity' cba liftIO $ bumpActivity' cba
bumpActivity' :: TVar Word64 -> IO () bumpActivity' :: TVar Word64 -> IO ()
@ -167,12 +173,12 @@ bumpActivity' cba = atomically $ modifyTVar' cba (1+)
logger :: Multiplex L.Logger logger :: Multiplex L.Logger
logger = do logger = do
~(_, _, _, logger) <- ask ~(_, _, _, _, logger) <- ask
pure logger pure logger
scope :: String -> Multiplex a -> Multiplex a scope :: String -> Multiplex a -> Multiplex a
scope msg = local tweak where scope msg = local tweak where
tweak (a,b,c,logger) = (a,b,c,L.scope msg logger) tweak (a,b,c,d,logger) = (a,b,c,d,L.scope msg logger)
-- | Crash with a message. Include the current logging scope. -- | Crash with a message. Include the current logging scope.
crash :: String -> Multiplex a crash :: String -> Multiplex a
@ -187,7 +193,7 @@ debug msg = logger >>= \logger -> liftIO $ L.debug logger msg
process :: IO (Maybe Packet) -> Multiplex () process :: IO (Maybe Packet) -> Multiplex ()
process recv = scope "Mux.process" $ do process recv = scope "Mux.process" $ do
(_, Callbacks cbs cba, _, logger) <- ask (_, Callbacks cbs cba, _, _, logger) <- ask
liftIO . repeatWhile $ do liftIO . repeatWhile $ do
packet <- recv packet <- recv
case packet of case packet of
@ -199,7 +205,7 @@ process recv = scope "Mux.process" $ do
L.warn logger $ "dropped packet @ " ++ show (Base64.encode destination) L.warn logger $ "dropped packet @ " ++ show (Base64.encode destination)
pure True pure True
Just callback -> do Just callback -> do
L.debug logger $ "packet delivered @ " ++ show (Base64.encode destination) L.warn logger $ "packet delivered @ " ++ show (Base64.encode destination)
bumpActivity' cba bumpActivity' cba
callback content callback content
pure True pure True
@ -337,13 +343,13 @@ fork m = do
nest :: Serial k => k -> Multiplex a -> Multiplex a nest :: Serial k => k -> Multiplex a -> Multiplex a
nest outer m = Reader.local tweak m where nest outer m = Reader.local tweak m where
tweak (send,cbs,fresh,log) = (send' send,cbs,fresh,log) tweak (send,cbs,fresh,recvs,log) = (send' send,cbs,fresh,recvs,log)
kbytes = Put.runPutS (serialize outer) kbytes = Put.runPutS (serialize outer)
send' send p = send $ (\p -> Packet kbytes (Put.runPutS (serialize p))) <$> p send' send p = send $ (\p -> Packet kbytes (Put.runPutS (serialize p))) <$> p
channel :: Multiplex (Channel a) channel :: Multiplex (Channel a)
channel = do channel = do
~(_,_,fresh,_) <- ask ~(_,_,fresh,_,_) <- ask
Channel Type <$> liftIO fresh Channel Type <$> liftIO fresh
send :: Serial a => Channel a -> a -> Multiplex () send :: Serial a => Channel a -> a -> Multiplex ()
@ -351,12 +357,12 @@ send chan a = send' chan (pure a)
send' :: Serial a => Channel a -> STM a -> Multiplex () send' :: Serial a => Channel a -> STM a -> Multiplex ()
send' (Channel _ key) a = do send' (Channel _ key) a = do
~(send,_,_,_) <- ask ~(send,_,_,_,_) <- ask
liftIO . atomically $ send (Packet key . Put.runPutS . serialize <$> a) liftIO . atomically $ send (Packet key . Put.runPutS . serialize <$> a)
receiveCancellable :: Serial a => Channel a -> Multiplex (Multiplex a, String -> Multiplex ()) receiveCancellable :: Serial a => Channel a -> Multiplex (Multiplex a, String -> Multiplex ())
receiveCancellable (Channel _ key) = do receiveCancellable (Channel _ key) = do
(_,Callbacks cbs cba,_,_) <- ask (_,Callbacks cbs cba,_,_,_) <- ask
result <- liftIO newEmptyMVar result <- liftIO newEmptyMVar
liftIO . atomically $ M.insert (putMVar result . Right) key cbs liftIO . atomically $ M.insert (putMVar result . Right) key cbs
liftIO $ bumpActivity' cba liftIO $ bumpActivity' cba
@ -378,6 +384,27 @@ receiveTimed msg micros chan = do
run env (cancel $ "receiveTimed timeout during " ++ msg) run env (cancel $ "receiveTimed timeout during " ++ msg)
pure $ scope "receiveTimed" (force <* liftIO (C.killThread watchdog) <* cancel ("receiveTimed completed" ++ msg)) pure $ scope "receiveTimed" (force <* liftIO (C.killThread watchdog) <* cancel ("receiveTimed completed" ++ msg))
-- Save a receive future as part of
saveReceive :: Microseconds
-> B.ByteString -> Multiplex B.ByteString -> Multiplex ()
saveReceive micros chan force = do
(_,_,_,recvs,_) <- ask
tid <- liftIO . C.forkIO $ do
C.threadDelay micros
atomically $ M.delete chan recvs
let force' = do
liftIO $ C.killThread tid
liftIO $ atomically (M.delete chan recvs)
force
liftIO . atomically $ M.insert force' chan recvs
restoreReceive :: B.ByteString -> Multiplex B.ByteString
restoreReceive chan = do
(_,_,_,recvs,_) <- ask
o <- liftIO . atomically $ M.lookup chan recvs
fromMaybe (crash $ "chan could not be restored: " ++ show (Base64.encode chan))
o
timeout' :: Microseconds -> a -> Multiplex a -> Multiplex a timeout' :: Microseconds -> a -> Multiplex a -> Multiplex a
timeout' micros onTimeout m = fromMaybe onTimeout <$> timeout micros m timeout' micros onTimeout m = fromMaybe onTimeout <$> timeout micros m
@ -422,7 +449,7 @@ subscribeTimed micros chan = do
subscribe :: Serial a => Channel a -> Multiplex (Multiplex a, Multiplex ()) subscribe :: Serial a => Channel a -> Multiplex (Multiplex a, Multiplex ())
subscribe (Channel _ key) = scope "subscribe" $ do subscribe (Channel _ key) = scope "subscribe" $ do
(_, Callbacks cbs cba, _, _) <- ask (_, Callbacks cbs cba, _, _, _) <- ask
q <- liftIO . atomically $ newTQueue q <- liftIO . atomically $ newTQueue
liftIO . atomically $ M.insert (atomically . writeTQueue q) key cbs liftIO . atomically $ M.insert (atomically . writeTQueue q) key cbs
liftIO $ bumpActivity' cba liftIO $ bumpActivity' cba

View File

@ -193,19 +193,23 @@ handle crypto allow env lang p r = Mux.debug (show r) >> case r of
runLocal (Pure t) = do runLocal (Pure t) = do
Mux.debug $ "runLocal Pure" Mux.debug $ "runLocal Pure"
liftIO $ eval lang t liftIO $ eval lang t
runLocal (Send (Channel cid) a) = do runLocal (Send c@(Channel cid) a) = do
Mux.debug $ "runLocal Send " ++ show cid Mux.warn $ "runLocal Send " ++ show c
Mux.process1 (Mux.Packet cid (Put.runPutS (serialize a))) Mux.process1 (Mux.Packet cid (Put.runPutS (serialize a)))
pure (unit lang) pure (unit lang)
runLocal (ReceiveAsync chan@(Channel cid) (Seconds seconds)) = do runLocal (ReceiveAsync chan@(Channel cid) (Seconds seconds)) = do
Mux.debug $ "runLocal ReceiveAsync " ++ show (seconds, cid) Mux.debug $ "runLocal ReceiveAsync " ++ show (seconds, cid)
_ <- Mux.receiveTimed ("receiveAsync on " ++ show chan) forceChan <- Mux.channel
(floor $ seconds * 1000 * 1000) ((Mux.Channel Mux.Type cid) :: Mux.Channel (Maybe B.ByteString)) Mux.warn $ "ReceiveAsync force channel " ++ show forceChan
pure (remote lang (Step (Local (Receive chan)))) let micros = floor $ seconds * 1000 * 1000
force <- Mux.receiveTimed ("receiveAsync on " ++ show chan)
micros ((Mux.Channel Mux.Type cid) :: Mux.Channel B.ByteString)
Mux.saveReceive micros (Mux.channelId forceChan) force
pure (remote lang (Step (Local (Receive (Channel $ Mux.channelId forceChan)))))
runLocal (Receive (Channel cid)) = do runLocal (Receive (Channel cid)) = do
Mux.debug $ "runLocal Receive " ++ show cid Mux.warn $ "runLocal Receive " ++ show cid
(recv,_) <- Mux.receiveCancellable (Mux.Channel Mux.Type cid) bytes <- Mux.restoreReceive cid
bytes <- recv Mux.warn $ "runLocal Receive got bytes " ++ show cid
case Get.runGetS deserialize bytes of case Get.runGetS deserialize bytes of
Left err -> fail err Left err -> fail err
Right r -> pure r Right r -> pure r

View File

@ -168,7 +168,10 @@ instance Hashable Node where
instance Show Node where instance Show Node where
show (Node host key) = "http://" ++ Text.unpack host ++ "/" ++ Text.unpack (decodeUtf8 (Base64.encode key)) show (Node host key) = "http://" ++ Text.unpack host ++ "/" ++ Text.unpack (decodeUtf8 (Base64.encode key))
newtype Channel = Channel ByteString deriving (Eq,Ord,Generic,Show) newtype Channel = Channel ByteString deriving (Eq,Ord,Generic)
instance Show Channel where
show (Channel id) = Text.unpack (decodeUtf8 (Base64.encode id))
instance ToJSON Channel where toJSON (Channel c) = toJSON (decodeUtf8 (Base64.encode c)) instance ToJSON Channel where toJSON (Channel c) = toJSON (decodeUtf8 (Base64.encode c))
instance FromJSON Channel where instance FromJSON Channel where