Vamshi Surabhi ce243f5899
multiplexed subscriptions (#1934)
* add types to represent unparsed http gql requests

This will help when we add caching of frequently used ASTs

* query plan caching

* move livequery to execute

* add multiplexed module

* session variable can be customised depending on the context

Previously the value was always "current_setting('hasura.user')"

* get rid of typemap requirement in reusable plan

* subscriptions are multiplexed when possible

* use lazytx for introspection to avoid acquiring a pg connection

* refactor to make execute a completely decoupled module

* don't issue a transaction for a query

* don't use current setting for explained sql

* move postgres related types to a different module

* validate variableValues on postgres before multiplexing subs

* don't use current_setting for queries over ws

* plan_cache is only visible when developer flag is enabled

* introduce 'batch size' when multiplexing subscriptions

* bump stackage to 13.16

* fix schema_stitching test case error code

* store hashes instead of actual responses for subscriptions

* internal api to dump subscriptions state

* remove PlanCache from SchemaCacheRef

* allow live query options to be configured on server startup

* capture metrics for multiplexed subscriptions

* more metrics captured for multiplexed subs

* switch to tvar based hashmap for faster snapshotting

* livequery modules do not expose internal details

* fix typo in live query env vars

* switch to hasura's pg-client-hs
2019-04-17 15:18:41 +05:30

250 lines
6.9 KiB

-- | Live queries served by polling: every distinct 'LiveQuery' gets its own
-- thread which re-runs the query's transaction at a fixed refetch interval
-- and runs the registered on-change actions with the response.
--
-- NOTE(review): presumably this is the "fallback" path for subscriptions
-- that cannot be multiplexed -- confirm against the caller.
module Hasura.GraphQL.Execute.LiveQuery.Fallback
( RefetchInterval
, refetchIntervalFromMilli
, FallbackOpts
, mkFallbackOpts
, LiveQueriesState
, initLiveQueriesState
, dumpLiveQueriesState
, FallbackOp
, mkFallbackOp
, LiveQueryId
, addLiveQuery
, removeLiveQuery
) where
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM as STM
import qualified Data.Aeson as J
import qualified ListT
import qualified StmContainers.Map as STMMap
import Control.Concurrent (threadDelay)
import Hasura.EncJSON
import Hasura.GraphQL.Execute.LiveQuery.Types
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Prelude
import Hasura.RQL.Types
-- | Identifies a live query: the user information together with the
-- unparsed GraphQL request. This is the key type of 'LiveQueryMap'.
data LiveQuery = LiveQuery
  { _lqUser    :: !UserInfo
    -- ^ user information associated with the request
  , _lqRequest :: !GQLReqUnparsed
    -- ^ the request, still unparsed
  } deriving (Show, Eq, Generic)
-- | Serialises a live query as an object with two keys: @user@ (the result
-- of 'userVars' on the user info) and @request@ (the unparsed request).
instance J.ToJSON LiveQuery where
  toJSON (LiveQuery user req) =
    J.object
      [ "user" J..= userVars user
      , "request" J..= req
      ]
-- No methods are given, so the class defaults apply (LiveQuery derives
-- Generic). LiveQuery is the key type of LiveQueryMap.
instance Hashable LiveQuery
-- | The polling state of a single live query. It is created by
-- 'addLiveQuery', read and updated by 'pollQuery' and cleaned up by
-- 'removeLiveQuery'.
data LQHandler
  = LQHandler
  { _lqhRespTx  :: !LazyRespTx
    -- ^ the tx to be executed
  , _lqhPrevRes :: !RespTV
    -- ^ hash of the previous result; 'Nothing' when there is none
    -- (initially, or after a poll that ended in an error)
  , _lqhCurOps  :: !Sinks
    -- ^ the actions that have been run previously;
    -- we run these if the response changes
  , _lqhNewOps  :: !Sinks
    -- ^ we run these operations regardless
    -- and then merge them with current operations
  }
-- | Options of the polling ("fallback") live query implementation.
data FallbackOpts = FallbackOpts
  { _foRefetchInterval :: !RefetchInterval
    -- ^ delay between two consecutive polls of a live query
  } deriving (Show, Eq)
-- | Serialises the options as an object with the single key
-- @refetch_delay@.
instance J.ToJSON FallbackOpts where
  toJSON (FallbackOpts refetchInterval) =
    J.object
      [ "refetch_delay" J..= refetchInterval
      ]
-- | The refetch interval used when none is configured: 1000 ms (1 second).
defaultRefetchInterval :: RefetchInterval
defaultRefetchInterval = refetchIntervalFromMilli 1000
-- | Builds the options, falling back to 'defaultRefetchInterval' when no
-- refetch interval is supplied.
mkFallbackOpts
  :: Maybe RefetchInterval
  -> FallbackOpts
mkFallbackOpts refetchIntervalM =
  FallbackOpts
  (fromMaybe defaultRefetchInterval refetchIntervalM)
-- | The complete state of the polling live query implementation: the
-- options it was created with and the map of active live queries.
data LiveQueriesState
  = LiveQueriesState
  { _lqsOptions      :: !FallbackOpts
  , _lqsLiveQueryMap :: !LiveQueryMap
  }
-- | Renders the state as JSON: the options under @options@ and the dump of
-- the live query map (see 'dumpLiveQueryMap') under @live_queries_map@.
dumpLiveQueriesState :: LiveQueriesState -> IO J.Value
dumpLiveQueriesState (LiveQueriesState opts lqMap) = do
  lqMapJ <- dumpLiveQueryMap lqMap
  return $ J.object
    [ "options" J..= opts
    , "live_queries_map" J..= lqMapJ
    ]
-- | Creates the initial state: the given options and an empty live query
-- map.
initLiveQueriesState
  :: FallbackOpts
  -> STM.STM LiveQueriesState
initLiveQueriesState lqOptions =
  -- 'STMMap.new' allocates the (initially empty) map inside STM
  LiveQueriesState lqOptions <$> STMMap.new
-- | Handle returned by 'addLiveQuery' and consumed by 'removeLiveQuery':
-- the live query plus the id of the sink registered for it.
data LiveQueryId
  = LiveQueryId
  { _lqiQuery :: !LiveQuery
  , _lqiSink  :: !SinkId
  }
-- | All active live queries: maps each 'LiveQuery' to its handler and a
-- reference to its poller thread (accessed in this module with
-- 'STM.readTMVar', 'STM.putTMVar' and 'STM.takeTMVar').
type LiveQueryMap = STMMap.Map LiveQuery (LQHandler, ThreadTM)
-- | Renders every entry of the live query map as a JSON object, in a
-- single STM transaction so that the snapshot is consistent.
dumpLiveQueryMap :: LiveQueryMap -> IO J.Value
dumpLiveQueryMap lqMap =
  fmap J.toJSON $ STM.atomically $ do
    entries <- ListT.toList $ STMMap.listT lqMap
    forM entries $ \(lq, (lqHandler, threadRef)) -> do
      prevResHash <- STM.readTVar $ _lqhPrevRes lqHandler
      -- readTMVar retries while the TMVar is empty, i.e. until the poller
      -- thread of this entry has been stored
      threadId <- A.asyncThreadId <$> STM.readTMVar threadRef
      curOps <- toListTMap $ _lqhCurOps lqHandler
      newOps <- toListTMap $ _lqhNewOps lqHandler
      return $ J.object
        [ "query" J..= lq
        , "thread_id" J..= show threadId
        , "current_ops" J..= map fst curOps
        , "new_ops" J..= map fst newOps
        , "previous_result_hash" J..= prevResHash
        ]
-- | Unregisters one sink of a live query. When it was the last sink of the
-- query, the query is dropped from the map and its polling thread is
-- cancelled.
removeLiveQuery
  :: LiveQueriesState
  -- the query and the associated operation
  -> LiveQueryId
  -> IO ()
removeLiveQuery lqState (LiveQueryId liveQ k) = do
  -- clean the handler's state
  threadRefM <- STM.atomically $ do
    lqHandlerM <- STMMap.lookup liveQ lqMap
    maybe (return Nothing) cleanLQHandler lqHandlerM
  -- cancel the polling thread
  onJust threadRefM A.cancel
  where
    lqMap = _lqsLiveQueryMap lqState

    -- removes the sink from both sink maps and yields the poller thread
    -- iff no sinks are left
    cleanLQHandler (handler, threadRef) = do
      let curOps = _lqhCurOps handler
          newOps = _lqhNewOps handler
      deleteTMap k curOps
      deleteTMap k newOps
      cancelPollThread <- (&&)
        <$> nullTMap curOps
        <*> nullTMap newOps
      -- if this happens to be the last operation, take the
      -- ref for the polling thread to cancel it
      if cancelPollThread then do
        STMMap.delete liveQ lqMap
        -- takeTMVar retries while the TMVar is empty, so the removal
        -- waits until the poller thread has been stored
        Just <$> STM.takeTMVar threadRef
      else return Nothing
-- | A live query paired with the transaction associated with it (the
-- transaction that is run on every poll to produce the response).
type FallbackOp = (LiveQuery, LazyRespTx)
-- | Builds a 'FallbackOp' from the user info, the unparsed request and the
-- transaction to run for it.
mkFallbackOp
  :: UserInfo -> GQLReqUnparsed
  -> LazyRespTx -> FallbackOp
mkFallbackOp userInfo req tx =
  (LiveQuery userInfo req, tx)
-- | Registers an on-change action for a live query and returns the id
-- needed to remove it again.
--
-- If the query is already being polled, the action is only added to the
-- existing handler's new operations. Otherwise a handler is created and a
-- polling thread is started which runs 'pollQuery' forever, sleeping for
-- the configured refetch interval between polls.
addLiveQuery
  :: PGExecCtx
  -> LiveQueriesState
  -- the query
  -> FallbackOp
  -- the action to be executed when result changes
  -> OnChange
  -> IO LiveQueryId
addLiveQuery pgExecCtx lqState (liveQ, respTx) onResultAction = do
  sinkId <- newSinkId

  -- a handler is returned only when it is newly created
  handlerM <- STM.atomically $ do
    lqHandlerM <- STMMap.lookup liveQ lqMap
    maybe (newHandler sinkId) (addToExistingHandler sinkId) lqHandlerM

  -- we can then attach a polling thread if it is new
  -- the livequery can only be cancelled after putTMVar
  onJust handlerM $ \(handler, pollerThreadTM) -> do
    threadRef <- A.async $ forever $ do
      pollQuery pgExecCtx handler
      threadDelay $ refetchIntervalToMicro refetchInterval
    STM.atomically $ STM.putTMVar pollerThreadTM threadRef

  return $ LiveQueryId liveQ sinkId
  where
    LiveQueriesState lqOpts lqMap = lqState
    FallbackOpts refetchInterval = lqOpts

    -- the query is already polled: queue the action as a new operation
    addToExistingHandler sinkId (handler, _) = do
      insertTMap onResultAction sinkId $ _lqhNewOps handler
      return Nothing

    -- first sink for this query: create the handler with no previous
    -- result and an empty TMVar that will later hold the poller thread
    newHandler sinkId = do
      handler <- LQHandler respTx
                 <$> STM.newTVar Nothing
                 <*> newTMap
                 <*> newTMap
      insertTMap onResultAction sinkId $ _lqhNewOps handler
      asyncRefTM <- STM.newEmptyTMVar
      STMMap.insert (handler, asyncRefTM) liveQ lqMap
      return $ Just (handler, asyncRefTM)
-- | Runs the live query's transaction once and notifies the sinks.
--
-- New operations always receive the response and are merged into the
-- current operations. The operations that were already current only
-- receive it when the response is an error or its hash differs from the
-- previously stored one.
pollQuery
  :: PGExecCtx
  -> LQHandler
  -> IO ()
pollQuery pgExecCtx (LQHandler respTx respTV curOpsTV newOpsTV) = do

  resOrErr <- runExceptT $ runLazyTx pgExecCtx respTx

  -- an error response carries no hash
  let (resp, respHashM) = case encJToLBS <$> resOrErr of
        Left e    -> (GQExecError [encodeGQErr False e], Nothing)
        Right lbs -> (GQSuccess lbs, Just $ mkRespHash lbs)

  -- extract the current and new operations
  (curOps, newOps) <- STM.atomically $ do
    curOpsL <- toListTMap curOpsTV
    newOpsL <- toListTMap newOpsTV
    forM_ newOpsL $ \(k, action) -> insertTMap action k curOpsTV
    resetTMap newOpsTV
    return (curOpsL, newOpsL)

  runOperations resp newOps

  -- write to the current websockets if needed
  prevRespHashM <- STM.readTVarIO respTV
  when (isExecError resp || respHashM /= prevRespHashM) $ do
    runOperations resp curOps
    STM.atomically $ STM.writeTVar respTV respHashM

  where
    runOperation resp action = action resp
    -- the actions are run concurrently; their results are discarded
    runOperations resp =
      void . A.mapConcurrently (runOperation resp . snd)