graphql-engine/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/State.hs
Antoine Leblanc ba70ca427a server: switch to a sub-backend approach
GitOrigin-RevId: 660126d5f65620fb58a3ffcbed564e9e35f59938
2021-04-21 21:45:32 +00:00

255 lines
9.6 KiB
Haskell

{-# LANGUAGE CPP #-}
-- | Top-level management of live query poller threads. The implementation of the polling itself is
-- in "Hasura.GraphQL.Execute.LiveQuery.Poll". See "Hasura.GraphQL.Execute.LiveQuery" for high-level
-- details.
module Hasura.GraphQL.Execute.LiveQuery.State
( LiveQueriesState(..)
, initLiveQueriesState
, dumpLiveQueriesState
, LiveQueryId
, LiveQueryPostPollHook
, addLiveQuery
, removeLiveQuery
, LiveAsyncActionQueryOnSource(..)
, LiveAsyncActionQueryWithNoRelationships(..)
, LiveAsyncActionQuery(..)
, AsyncActionQueryLive(..)
, AsyncActionSubscriptionState
, addAsyncActionLiveQuery
, removeAsyncActionLiveQuery
) where
import Hasura.Prelude
import qualified Control.Concurrent.STM as STM
import qualified Control.Immortal as Immortal
import qualified Data.Aeson.Extended as J
import qualified Data.UUID.V4 as UUID
import qualified StmContainers.Map as STMMap
import Control.Concurrent.Extended (forkImmortal, sleep)
import Control.Exception (mask_)
import Data.String
import Data.Text.Extended
#ifndef PROFILING
import GHC.AssertNF
#endif
import qualified Hasura.GraphQL.Execute.LiveQuery.TMap as TMap
import qualified Hasura.Logging as L
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.LiveQuery.Options
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.Execute.LiveQuery.Poll
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.WebSocket.Protocol
import Hasura.RQL.Types.Action
import Hasura.RQL.Types.Common (SourceName, unNonNegativeDiffTime)
import Hasura.RQL.Types.Error
-- | The top-level datatype that holds the state for all active live queries.
--
-- NOTE!: This must be kept consistent with a websocket connection's
-- 'OperationMap', in 'onClose' and 'onStart'.
data LiveQueriesState
= LiveQueriesState
{ _lqsOptions :: !LiveQueriesOptions
, _lqsLiveQueryMap :: !PollerMap
, _lqsPostPollHook :: !LiveQueryPostPollHook
-- ^ A hook function which is run after each fetch cycle
, _lqsAsyncActions :: !AsyncActionSubscriptionState
}
initLiveQueriesState
:: LiveQueriesOptions -> LiveQueryPostPollHook -> IO LiveQueriesState
initLiveQueriesState options pollHook = STM.atomically $
LiveQueriesState options <$> STMMap.new <*> pure pollHook <*> TMap.new
dumpLiveQueriesState :: Bool -> LiveQueriesState -> IO J.Value
dumpLiveQueriesState extended (LiveQueriesState opts lqMap _ _) = do
lqMapJ <- dumpPollerMap extended lqMap
return $ J.object
[ "options" J..= opts
, "live_queries_map" J..= lqMapJ
]
data LiveQueryId
= LiveQueryId
{ _lqiPoller :: !PollerKey
, _lqiCohort :: !CohortKey
, _lqiSubscriber :: !SubscriberId
} deriving Show
addLiveQuery
:: forall b
. BackendTransport b
=> L.Logger L.Hasura
-> SubscriberMetadata
-> LiveQueriesState
-> SourceName
-> LiveQueryPlan b (MultiplexedQuery b)
-> OnChange
-- ^ the action to be executed when result changes
-> IO LiveQueryId
addLiveQuery logger subscriberMetadata lqState source plan onResultAction = do
-- CAREFUL!: It's absolutely crucial that we can't throw any exceptions here!
-- disposable UUIDs:
cohortId <- newCohortId
subscriberId <- newSubscriberId
let !subscriber = Subscriber subscriberId subscriberMetadata onResultAction
#ifndef PROFILING
$assertNFHere subscriber -- so we don't write thunks to mutable vars
#endif
-- a handler is returned only when it is newly created
handlerM <- STM.atomically $
STMMap.lookup handlerId lqMap >>= \case
Just handler -> do
TMap.lookup cohortKey (_pCohorts handler) >>= \case
Just cohort -> addToCohort subscriber cohort
Nothing -> addToPoller subscriber cohortId handler
return Nothing
Nothing -> do
!poller <- newPoller
addToPoller subscriber cohortId poller
STMMap.insert poller handlerId lqMap
return $ Just poller
-- we can then attach a polling thread if it is new the livequery can only be
-- cancelled after putTMVar
onJust handlerM $ \handler -> do
pollerId <- PollerId <$> UUID.nextRandom
threadRef <- forkImmortal ("pollQuery." <> show pollerId) logger $ forever $ do
pollQuery @b pollerId lqOpts sourceConfig query (_pCohorts handler) postPollHook
sleep $ unNonNegativeDiffTime $ unRefetchInterval refetchInterval
let !pState = PollerIOState threadRef pollerId
#ifndef PROFILING
$assertNFHere pState -- so we don't write thunks to mutable vars
#endif
STM.atomically $ STM.putTMVar (_pIOState handler) pState
pure $ LiveQueryId handlerId cohortKey subscriberId
where
LiveQueriesState lqOpts lqMap postPollHook _ = lqState
LiveQueriesOptions _ refetchInterval = lqOpts
LiveQueryPlan (ParameterizedLiveQueryPlan role query) sourceConfig cohortKey = plan
handlerId = PollerKey source role $ toTxt query
addToCohort subscriber handlerC =
TMap.insert subscriber (_sId subscriber) $ _cNewSubscribers handlerC
addToPoller subscriber cohortId handler = do
!newCohort <- Cohort cohortId <$> STM.newTVar Nothing <*> TMap.new <*> TMap.new
addToCohort subscriber newCohort
TMap.insert newCohort cohortKey $ _pCohorts handler
newPoller = Poller <$> TMap.new <*> STM.newEmptyTMVar
removeLiveQuery
:: L.Logger L.Hasura
-> LiveQueriesState
-- the query and the associated operation
-> LiveQueryId
-> IO ()
removeLiveQuery logger lqState lqId@(LiveQueryId handlerId cohortId sinkId) = mask_ $ do
mbCleanupIO <- STM.atomically $ do
detM <- getQueryDet
fmap join $ forM detM $ \(Poller cohorts ioState, cohort) ->
cleanHandlerC cohorts ioState cohort
sequence_ mbCleanupIO
where
lqMap = _lqsLiveQueryMap lqState
getQueryDet = do
pollerM <- STMMap.lookup handlerId lqMap
fmap join $ forM pollerM $ \poller -> do
cohortM <- TMap.lookup cohortId (_pCohorts poller)
return $ (poller,) <$> cohortM
cleanHandlerC cohortMap ioState handlerC = do
let curOps = _cExistingSubscribers handlerC
newOps = _cNewSubscribers handlerC
TMap.delete sinkId curOps
TMap.delete sinkId newOps
cohortIsEmpty <- (&&)
<$> TMap.null curOps
<*> TMap.null newOps
when cohortIsEmpty $ TMap.delete cohortId cohortMap
handlerIsEmpty <- TMap.null cohortMap
-- when there is no need for handler i.e, this happens to be the last
-- operation, take the ref for the polling thread to cancel it
if handlerIsEmpty
then do
STMMap.delete handlerId lqMap
threadRefM <- fmap _pThread <$> STM.tryReadTMVar ioState
return $ Just $ -- deferred IO:
case threadRefM of
Just threadRef -> Immortal.stop threadRef
-- This would seem to imply addLiveQuery broke or a bug
-- elsewhere. Be paranoid and log:
Nothing -> L.unLogger logger $ L.UnstructuredLog L.LevelError $ fromString $
"In removeLiveQuery no worker thread installed. Please report this as a bug: "<>
show lqId
else return Nothing
-- | An async action query whose relationships are refered to table in a source.
-- We need to generate an SQL statement with the action response and execute it
-- in the source database so as to fetch response joined with relationship rows.
-- For more details see Note [Resolving async action query]
data LiveAsyncActionQueryOnSource
= LiveAsyncActionQueryOnSource
{ _laaqpCurrentLqId :: !LiveQueryId
, _laaqpPrevActionLogMap :: !ActionLogResponseMap
, _laaqpRestartLq :: !(LiveQueryId -> ActionLogResponseMap -> IO (Maybe LiveQueryId))
-- ^ An IO action to restart the live query poller with updated action log responses fetched from metadata storage
-- Restarting a live query re-generates the SQL statement with new action log responses to send latest action
-- response to the client.
}
data LiveAsyncActionQueryWithNoRelationships
= LiveAsyncActionQueryWithNoRelationships
{ _laaqwnrSendResponse :: !(ActionLogResponseMap -> IO ())
-- ^ An IO action to send response to the websocket client
, _laaqwnrSendCompleted :: !(IO ())
-- ^ An IO action to send "completed" message to the websocket client
}
data LiveAsyncActionQuery
= LAAQNoRelationships !LiveAsyncActionQueryWithNoRelationships
| LAAQOnSourceDB !LiveAsyncActionQueryOnSource
data AsyncActionQueryLive
= AsyncActionQueryLive
{ _aaqlActionIds :: !(NonEmpty ActionId)
, _aaqlOnException :: !(QErr -> IO ())
-- ^ An IO action to send error message (in case of any exception) to the websocket client
, _aaqlLiveExecution :: !LiveAsyncActionQuery
}
-- | A share-able state map which stores an async action live query with it's subscription operation id
type AsyncActionSubscriptionState = TMap.TMap OperationId AsyncActionQueryLive
addAsyncActionLiveQuery
:: AsyncActionSubscriptionState
-> OperationId
-> NonEmpty ActionId
-> (QErr -> IO ())
-> LiveAsyncActionQuery -> IO ()
addAsyncActionLiveQuery queriesState opId actionIds onException liveQuery =
STM.atomically $
TMap.insert (AsyncActionQueryLive actionIds onException liveQuery) opId queriesState
removeAsyncActionLiveQuery
:: AsyncActionSubscriptionState -> OperationId -> IO ()
removeAsyncActionLiveQuery queriesState opId =
STM.atomically $ TMap.delete opId queriesState