mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-18 04:51:35 +03:00
ce243f5899
* add types to represent unparsed http gql requests This will help when we add caching of frequently used ASTs * query plan caching * move livequery to execute * add multiplexed module * session variable can be customised depending on the context Previously the value was always "current_setting('hasura.user')" * get rid of typemap requirement in reusable plan * subscriptions are multiplexed when possible * use lazytx for introspection to avoid acquiring a pg connection * refactor to make execute a completely decoupled module * don't issue a transaction for a query * don't use current setting for explained sql * move postgres related types to a different module * validate variableValues on postgres before multiplexing subs * don't user current_setting for queries over ws * plan_cache is only visible when developer flag is enabled * introduce 'batch size' when multiplexing subscriptions * bump stackage to 13.16 * fix schema_stitching test case error code * store hashes instead of actual responses for subscriptions * internal api to dump subscriptions state * remove PlanCache from SchemaCacheRef * allow live query options to be configured on server startup * capture metrics for multiplexed subscriptions * more metrics captured for multiplexed subs * switch to tvar based hashmap for faster snapshotting * livequery modules do not expose internal details * fix typo in live query env vars * switch to hasura's pg-client-hs
250 lines
6.9 KiB
Haskell
250 lines
6.9 KiB
Haskell
module Hasura.GraphQL.Execute.LiveQuery.Fallback
|
|
( RefetchInterval
|
|
, refetchIntervalFromMilli
|
|
, FallbackOpts
|
|
, mkFallbackOpts
|
|
|
|
, LiveQueriesState
|
|
, initLiveQueriesState
|
|
, dumpLiveQueriesState
|
|
|
|
, FallbackOp
|
|
, mkFallbackOp
|
|
, LiveQueryId
|
|
, addLiveQuery
|
|
, removeLiveQuery
|
|
) where
|
|
|
|
import qualified Control.Concurrent.Async as A
|
|
import qualified Control.Concurrent.STM as STM
|
|
import qualified Data.Aeson as J
|
|
import qualified ListT
|
|
import qualified StmContainers.Map as STMMap
|
|
|
|
import Control.Concurrent (threadDelay)
|
|
|
|
import Hasura.EncJSON
|
|
import Hasura.GraphQL.Execute.LiveQuery.Types
|
|
import Hasura.GraphQL.Transport.HTTP.Protocol
|
|
import Hasura.Prelude
|
|
import Hasura.RQL.Types
|
|
|
|
data LiveQuery
|
|
= LiveQuery
|
|
{ _lqUser :: !UserInfo
|
|
, _lqRequest :: !GQLReqUnparsed
|
|
} deriving (Show, Eq, Generic)
|
|
|
|
instance J.ToJSON LiveQuery where
|
|
toJSON (LiveQuery user req) =
|
|
J.object [ "user" J..= userVars user
|
|
, "request" J..= req
|
|
]
|
|
|
|
instance Hashable LiveQuery
|
|
|
|
data LQHandler
|
|
= LQHandler
|
|
-- the tx to be executed
|
|
{ _lqhRespTx :: !LazyRespTx
|
|
-- previous result
|
|
, _lqhPrevRes :: !RespTV
|
|
-- the actions that have been run previously
|
|
-- we run these if the response changes
|
|
, _lqhCurOps :: !Sinks
|
|
-- we run these operations regardless
|
|
-- and then merge them with current operations
|
|
, _lqhNewOps :: !Sinks
|
|
}
|
|
|
|
data FallbackOpts
|
|
= FallbackOpts
|
|
{ _foRefetchInterval :: !RefetchInterval
|
|
} deriving (Show, Eq)
|
|
|
|
instance J.ToJSON FallbackOpts where
|
|
toJSON (FallbackOpts refetchInterval) =
|
|
J.object [ "refetch_delay" J..= refetchInterval
|
|
]
|
|
|
|
-- 1 second
|
|
defaultRefetchInterval :: RefetchInterval
|
|
defaultRefetchInterval =
|
|
refetchIntervalFromMilli 1000
|
|
|
|
mkFallbackOpts
|
|
:: Maybe RefetchInterval
|
|
-> FallbackOpts
|
|
mkFallbackOpts refetchIntervalM =
|
|
FallbackOpts
|
|
(fromMaybe defaultRefetchInterval refetchIntervalM)
|
|
|
|
data LiveQueriesState
|
|
= LiveQueriesState
|
|
{ _lqsOptions :: !FallbackOpts
|
|
, _lqsLiveQueryMap :: !LiveQueryMap
|
|
}
|
|
|
|
dumpLiveQueriesState :: LiveQueriesState -> IO J.Value
|
|
dumpLiveQueriesState (LiveQueriesState opts lqMap) = do
|
|
lqMapJ <- dumpLiveQueryMap lqMap
|
|
return $ J.object
|
|
[ "options" J..= opts
|
|
, "live_queries_map" J..= lqMapJ
|
|
]
|
|
|
|
initLiveQueriesState
|
|
:: FallbackOpts
|
|
-> STM.STM LiveQueriesState
|
|
initLiveQueriesState lqOptions =
|
|
LiveQueriesState
|
|
lqOptions
|
|
<$> STMMap.new
|
|
|
|
data LiveQueryId
|
|
= LiveQueryId
|
|
{ _lqiQuery :: !LiveQuery
|
|
, _lqiSink :: !SinkId
|
|
}
|
|
|
|
type LiveQueryMap = STMMap.Map LiveQuery (LQHandler, ThreadTM)
|
|
|
|
dumpLiveQueryMap :: LiveQueryMap -> IO J.Value
|
|
dumpLiveQueryMap lqMap =
|
|
fmap J.toJSON $ STM.atomically $ do
|
|
entries <- ListT.toList $ STMMap.listT lqMap
|
|
forM entries $ \(lq, (lqHandler, threadRef)) -> do
|
|
prevResHash <- STM.readTVar $ _lqhPrevRes lqHandler
|
|
threadId <- A.asyncThreadId <$> STM.readTMVar threadRef
|
|
curOps <- toListTMap $ _lqhCurOps lqHandler
|
|
newOps <- toListTMap $ _lqhNewOps lqHandler
|
|
return $ J.object
|
|
[ "query" J..= lq
|
|
, "thread_id" J..= show threadId
|
|
, "current_ops" J..= map fst curOps
|
|
, "new_ops" J..= map fst newOps
|
|
, "previous_result_hash" J..= prevResHash
|
|
]
|
|
|
|
removeLiveQuery
|
|
:: LiveQueriesState
|
|
-- the query and the associated operation
|
|
-> LiveQueryId
|
|
-> IO ()
|
|
removeLiveQuery lqState (LiveQueryId liveQ k) = do
|
|
|
|
-- clean the handler's state
|
|
threadRefM <- STM.atomically $ do
|
|
lqHandlerM <- STMMap.lookup liveQ lqMap
|
|
maybe (return Nothing) cleanLQHandler lqHandlerM
|
|
|
|
-- cancel the polling thread
|
|
onJust threadRefM A.cancel
|
|
|
|
where
|
|
lqMap = _lqsLiveQueryMap lqState
|
|
cleanLQHandler (handler, threadRef) = do
|
|
let curOps = _lqhCurOps handler
|
|
newOps = _lqhNewOps handler
|
|
deleteTMap k curOps
|
|
deleteTMap k newOps
|
|
cancelPollThread <- (&&)
|
|
<$> nullTMap curOps
|
|
<*> nullTMap newOps
|
|
-- if this happens to be the last operation, take the
|
|
-- ref for the polling thread to cancel it
|
|
if cancelPollThread then do
|
|
STMMap.delete liveQ lqMap
|
|
Just <$> STM.takeTMVar threadRef
|
|
else return Nothing
|
|
|
|
-- the transaction associated with this query
|
|
type FallbackOp = (LiveQuery, LazyRespTx)
|
|
|
|
mkFallbackOp
|
|
:: UserInfo -> GQLReqUnparsed
|
|
-> LazyRespTx -> FallbackOp
|
|
mkFallbackOp userInfo req tx =
|
|
(LiveQuery userInfo req, tx)
|
|
|
|
|
|
addLiveQuery
|
|
:: PGExecCtx
|
|
-> LiveQueriesState
|
|
-- the query
|
|
-> FallbackOp
|
|
-- the action to be executed when result changes
|
|
-> OnChange
|
|
-> IO LiveQueryId
|
|
addLiveQuery pgExecCtx lqState (liveQ, respTx) onResultAction= do
|
|
|
|
sinkId <- newSinkId
|
|
|
|
-- a handler is returned only when it is newly created
|
|
handlerM <- STM.atomically $ do
|
|
lqHandlerM <- STMMap.lookup liveQ lqMap
|
|
maybe (newHandler sinkId) (addToExistingHandler sinkId) lqHandlerM
|
|
|
|
-- we can then attach a polling thread if it is new
|
|
-- the livequery can only be cancelled after putTMVar
|
|
onJust handlerM $ \(handler, pollerThreadTM) -> do
|
|
threadRef <- A.async $ forever $ do
|
|
pollQuery pgExecCtx handler
|
|
threadDelay $ refetchIntervalToMicro refetchInterval
|
|
STM.atomically $ STM.putTMVar pollerThreadTM threadRef
|
|
|
|
return $ LiveQueryId liveQ sinkId
|
|
|
|
where
|
|
|
|
LiveQueriesState lqOpts lqMap = lqState
|
|
FallbackOpts refetchInterval = lqOpts
|
|
|
|
addToExistingHandler sinkId (handler, _) = do
|
|
insertTMap onResultAction sinkId $ _lqhNewOps handler
|
|
return Nothing
|
|
|
|
newHandler sinkId = do
|
|
handler <- LQHandler
|
|
<$> return respTx
|
|
<*> STM.newTVar Nothing
|
|
<*> newTMap
|
|
<*> newTMap
|
|
insertTMap onResultAction sinkId $ _lqhNewOps handler
|
|
asyncRefTM <- STM.newEmptyTMVar
|
|
STMMap.insert (handler, asyncRefTM) liveQ lqMap
|
|
return $ Just (handler, asyncRefTM)
|
|
|
|
pollQuery
|
|
:: PGExecCtx
|
|
-> LQHandler
|
|
-> IO ()
|
|
pollQuery pgExecCtx (LQHandler respTx respTV curOpsTV newOpsTV) = do
|
|
|
|
resOrErr <- runExceptT $ runLazyTx pgExecCtx respTx
|
|
|
|
let (resp, respHashM) = case encJToLBS <$> resOrErr of
|
|
Left e -> (GQExecError [encodeGQErr False e], Nothing)
|
|
Right lbs -> (GQSuccess lbs, Just $ mkRespHash lbs)
|
|
|
|
-- extract the current and new operations
|
|
(curOps, newOps) <- STM.atomically $ do
|
|
curOpsL <- toListTMap curOpsTV
|
|
newOpsL <- toListTMap newOpsTV
|
|
forM_ newOpsL $ \(k, action) -> insertTMap action k curOpsTV
|
|
resetTMap newOpsTV
|
|
return (curOpsL, newOpsL)
|
|
|
|
runOperations resp newOps
|
|
|
|
-- write to the current websockets if needed
|
|
prevRespHashM <- STM.readTVarIO respTV
|
|
when (isExecError resp || respHashM /= prevRespHashM) $ do
|
|
runOperations resp curOps
|
|
STM.atomically $ STM.writeTVar respTV respHashM
|
|
|
|
where
|
|
runOperation resp action = action resp
|
|
runOperations resp =
|
|
void . A.mapConcurrently (runOperation resp . snd)
|