server: refactor the term "live query" to "subscription"

PR-URL: https://github.com/hasura/graphql-engine-mono/pull/4001
GitOrigin-RevId: 78a7ff5c88ac3751baa5e2b3ac6ee73b94c50051
This commit is contained in:
Karthikeyan Chinnakonda 2022-03-21 16:09:49 +05:30 committed by hasura-bot
parent c81d9f6962
commit a13ed140e8
32 changed files with 558 additions and 416 deletions

View File

@ -596,6 +596,7 @@ library
, Hasura.RQL.Types.SchemaCacheTypes
, Hasura.RQL.Types.Source
, Hasura.RQL.Types.SourceCustomization
, Hasura.RQL.Types.Subscription
, Hasura.RQL.Types.Table
, Hasura.RQL.Types.Roles
, Hasura.RQL.Types.Roles.Internal
@ -672,11 +673,13 @@ library
, Hasura.GraphQL.Execute.Common
, Hasura.GraphQL.Execute.Inline
, Hasura.GraphQL.Execute.Instances
, Hasura.GraphQL.Execute.LiveQuery.Options
, Hasura.GraphQL.Execute.LiveQuery.Plan
, Hasura.GraphQL.Execute.LiveQuery.Poll
, Hasura.GraphQL.Execute.LiveQuery.State
, Hasura.GraphQL.Execute.LiveQuery.TMap
, Hasura.GraphQL.Execute.Subscription.Options
, Hasura.GraphQL.Execute.Subscription.Plan
, Hasura.GraphQL.Execute.Subscription.Poll
, Hasura.GraphQL.Execute.Subscription.Poll.Common
, Hasura.GraphQL.Execute.Subscription.Poll.LiveQuery
, Hasura.GraphQL.Execute.Subscription.State
, Hasura.GraphQL.Execute.Subscription.TMap
, Hasura.GraphQL.Execute.Mutation
, Hasura.GraphQL.Execute.Remote
, Hasura.GraphQL.Execute.RemoteJoin

View File

@ -81,7 +81,7 @@ import Hasura.GraphQL.Execute
import Hasura.GraphQL.Execute.Action
import Hasura.GraphQL.Execute.Action.Subscription
import Hasura.GraphQL.Execute.Backend qualified as EB
import Hasura.GraphQL.Execute.LiveQuery.Poll qualified as EL
import Hasura.GraphQL.Execute.Subscription.Poll qualified as ES
import Hasura.GraphQL.Logging (MonadQueryLog (..))
import Hasura.GraphQL.Transport.HTTP
( CacheStoreSuccess (CacheStoreSkipped),
@ -569,7 +569,7 @@ runHGEServer ::
-- | start time
UTCTime ->
Maybe EL.LiveQueryPostPollHook ->
Maybe ES.SubscriptionPostPollHook ->
ServerMetrics ->
EKG.Store EKG.EmptyMetrics ->
ManagedT m ()
@ -643,7 +643,7 @@ mkHGEServer ::
-- | start time
UTCTime ->
Maybe EL.LiveQueryPostPollHook ->
Maybe ES.SubscriptionPostPollHook ->
ServerMetrics ->
EKG.Store EKG.EmptyMetrics ->
ManagedT m Application

View File

@ -42,7 +42,7 @@ instance BackendExecute 'BigQuery where
mkDBSubscriptionPlan _ _ _ _ _ =
throw500 "Cannot currently perform subscriptions on BigQuery sources."
mkDBQueryExplain = bqDBQueryExplain
mkLiveQueryExplain _ =
mkSubscriptionExplain _ =
throw500 "Cannot currently retrieve query execution plans on BigQuery sources."
-- NOTE: Currently unimplemented!.

View File

@ -30,5 +30,5 @@ instance BackendExecute 'DataWrapper where
throw400 NotSupported "mkDBSubscriptionPlan: not implemented for GraphQL Data Wrappers."
mkDBRemoteRelationshipPlan _ _ _ _ _ _ _ =
throw500 "mkDBRemoteRelationshipPlan: not implemented for GraphQL Data Wrappers."
mkLiveQueryExplain _ =
throw400 NotSupported "mkLiveQueryExplain: not implemented for GraphQL Data Wrappers."
mkSubscriptionExplain _ =
throw400 NotSupported "mkSubscriptionExplain: not implemented for GraphQL Data Wrappers."

View File

@ -37,7 +37,7 @@ import Hasura.Backends.MSSQL.Types.Internal as TSQL
import Hasura.Base.Error
import Hasura.EncJSON
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Namespace (RootFieldAlias (..), RootFieldMap)
import Hasura.GraphQL.Parser
import Hasura.Prelude
@ -58,7 +58,7 @@ instance BackendExecute 'MSSQL where
mkDBMutationPlan = msDBMutationPlan
mkDBSubscriptionPlan = msDBSubscriptionPlan
mkDBQueryExplain = msDBQueryExplain
mkLiveQueryExplain = msDBLiveQueryExplain
mkSubscriptionExplain = msDBSubscriptionExplain
-- NOTE: Currently unimplemented!.
--
@ -139,16 +139,16 @@ msDBQueryExplain fieldName userInfo sourceName sourceConfig qrf = do
AB.mkAnyBackend $
DBStepInfo @'MSSQL sourceName sourceConfig Nothing odbcQuery
msDBLiveQueryExplain ::
msDBSubscriptionExplain ::
(MonadIO m, MonadBaseControl IO m, MonadError QErr m) =>
LiveQueryPlan 'MSSQL (MultiplexedQuery 'MSSQL) ->
m LiveQueryPlanExplanation
msDBLiveQueryExplain (LiveQueryPlan plan sourceConfig variables _) = do
SubscriptionQueryPlan 'MSSQL (MultiplexedQuery 'MSSQL) ->
m SubscriptionQueryPlanExplanation
msDBSubscriptionExplain (SubscriptionQueryPlan plan sourceConfig variables _) = do
let (MultiplexedQuery' reselect) = _plqpQuery plan
query = toQueryPretty $ fromSelect $ multiplexRootReselect [(dummyCohortId, variables)] reselect
mssqlExecCtx = (_mscExecCtx sourceConfig)
explainInfo <- liftEitherM $ runExceptT $ (mssqlRunReadOnly mssqlExecCtx) (runShowplan query)
pure $ LiveQueryPlanExplanation (T.toTxt query) explainInfo variables
pure $ SubscriptionQueryPlanExplanation (T.toTxt query) explainInfo variables
--------------------------------------------------------------------------------
-- Producing the correct SQL-level list comprehension to multiplex a query
@ -258,14 +258,14 @@ msDBSubscriptionPlan ::
SourceConfig 'MSSQL ->
Maybe G.Name ->
RootFieldMap (QueryDB 'MSSQL Void (UnpreparedValue 'MSSQL)) ->
m (LiveQueryPlan 'MSSQL (MultiplexedQuery 'MSSQL))
m (SubscriptionQueryPlan 'MSSQL (MultiplexedQuery 'MSSQL))
msDBSubscriptionPlan UserInfo {_uiSession, _uiRole} _sourceName sourceConfig namespace rootFields = do
(reselect, prepareState) <- planSubscription (OMap.mapKeys _rfaAlias rootFields) _uiSession
cohortVariables <- prepareStateCohortVariables sourceConfig _uiSession prepareState
let parameterizedPlan = ParameterizedLiveQueryPlan _uiRole $ MultiplexedQuery' reselect
let parameterizedPlan = ParameterizedSubscriptionQueryPlan _uiRole $ MultiplexedQuery' reselect
pure $
LiveQueryPlan parameterizedPlan sourceConfig cohortVariables namespace
SubscriptionQueryPlan parameterizedPlan sourceConfig cohortVariables namespace
prepareStateCohortVariables :: (MonadError QErr m, MonadIO m, MonadBaseControl IO m) => SourceConfig 'MSSQL -> SessionVariables -> PrepareState -> m CohortVariables
prepareStateCohortVariables sourceConfig session prepState = do

View File

@ -21,7 +21,7 @@ import Hasura.Backends.MSSQL.ToQuery
import Hasura.Base.Error
import Hasura.EncJSON
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Logging
import Hasura.GraphQL.Namespace (RootFieldAlias)
import Hasura.GraphQL.Transport.Backend

View File

@ -8,7 +8,7 @@ import Data.Text.Encoding (decodeUtf8)
import Data.Text.Extended
import Database.ODBC.SQLServer qualified as ODBC
import Hasura.Backends.MSSQL.Types.Internal (Value)
import Hasura.GraphQL.Execute.LiveQuery.Plan ()
import Hasura.GraphQL.Execute.Subscription.Plan ()
import Hasura.Prelude
import Hasura.RQL.Types.Column qualified as RQL
import Hasura.SQL.Backend

View File

@ -37,7 +37,7 @@ instance BackendExecute 'MySQL where
mkDBMutationPlan = error "mkDBMutationPlan: MySQL backend does not support this operation yet."
mkDBSubscriptionPlan _ _ _ _ = error "mkDBSubscriptionPlan: MySQL backend does not support this operation yet."
mkDBQueryExplain = mysqlDBQueryExplain
mkLiveQueryExplain _ = error "mkLiveQueryExplain: MySQL backend does not support this operation yet."
mkSubscriptionExplain _ = error "mkSubscriptionExplain: MySQL backend does not support this operation yet."
mkDBRemoteRelationshipPlan = error "mkDBRemoteRelationshipPlan: MySQL does not support this operation yet."
mysqlDBQueryPlan ::

View File

@ -36,7 +36,7 @@ import Hasura.Backends.Postgres.Translate.Column (toTxtValue)
import Hasura.Backends.Postgres.Translate.Select qualified as DS
import Hasura.Backends.Postgres.Types.Column
import Hasura.Base.Error
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Parser
import Hasura.GraphQL.Parser.Schema qualified as PS
import Hasura.Prelude
@ -220,7 +220,7 @@ executeMultiplexedQuery ::
m [(CohortId, B.ByteString)]
executeMultiplexedQuery (MultiplexedQuery query) = executeQuery query
-- | Internal; used by both 'executeMultiplexedQuery' and 'pgDBLiveQueryExplain'.
-- | Internal; used by both 'executeMultiplexedQuery' and 'pgDBSubscriptionExplain'.
executeQuery ::
(MonadTx m, Q.FromRow a) =>
Q.Query ->

View File

@ -48,10 +48,10 @@ import Hasura.GraphQL.Execute.Backend
ExplainPlan (..),
convertRemoteSourceRelationship,
)
import Hasura.GraphQL.Execute.LiveQuery.Plan
( LiveQueryPlan (..),
LiveQueryPlanExplanation (..),
ParameterizedLiveQueryPlan (..),
import Hasura.GraphQL.Execute.Subscription.Plan
( ParameterizedSubscriptionQueryPlan (..),
SubscriptionQueryPlan (..),
SubscriptionQueryPlanExplanation (..),
mkCohortVariables,
newCohortId,
)
@ -107,7 +107,7 @@ instance
mkDBMutationPlan = pgDBMutationPlan
mkDBSubscriptionPlan = pgDBSubscriptionPlan
mkDBQueryExplain = pgDBQueryExplain
mkLiveQueryExplain = pgDBLiveQueryExplain
mkSubscriptionExplain = pgDBSubscriptionExplain
mkDBRemoteRelationshipPlan = pgDBRemoteRelationshipPlan
-- query
@ -159,16 +159,16 @@ pgDBQueryExplain fieldName userInfo sourceName sourceConfig rootSelection = do
AB.mkAnyBackend $
DBStepInfo @('Postgres pgKind) sourceName sourceConfig Nothing action
pgDBLiveQueryExplain ::
pgDBSubscriptionExplain ::
( MonadError QErr m,
MonadIO m,
MT.MonadBaseControl IO m
) =>
LiveQueryPlan ('Postgres pgKind) (MultiplexedQuery ('Postgres pgKind)) ->
m LiveQueryPlanExplanation
pgDBLiveQueryExplain plan = do
let parameterizedPlan = _lqpParameterizedPlan plan
pgExecCtx = _pscExecCtx $ _lqpSourceConfig plan
SubscriptionQueryPlan ('Postgres pgKind) (MultiplexedQuery ('Postgres pgKind)) ->
m SubscriptionQueryPlanExplanation
pgDBSubscriptionExplain plan = do
let parameterizedPlan = _sqpParameterizedPlan plan
pgExecCtx = _pscExecCtx $ _sqpSourceConfig plan
queryText = Q.getQueryText . PGL.unMultiplexedQuery $ _plqpQuery parameterizedPlan
-- CAREFUL!: an `EXPLAIN ANALYZE` here would actually *execute* this
-- query, maybe resulting in privilege escalation:
@ -178,8 +178,8 @@ pgDBLiveQueryExplain plan = do
liftEitherM $
runExceptT $
runTx pgExecCtx Q.ReadOnly $
map runIdentity <$> PGL.executeQuery explainQuery [(cohortId, _lqpVariables plan)]
pure $ LiveQueryPlanExplanation queryText explanationLines $ _lqpVariables plan
map runIdentity <$> PGL.executeQuery explainQuery [(cohortId, _sqpVariables plan)]
pure $ SubscriptionQueryPlanExplanation queryText explanationLines $ _sqpVariables plan
-- mutation
@ -300,7 +300,7 @@ pgDBSubscriptionPlan ::
SourceConfig ('Postgres pgKind) ->
Maybe G.Name ->
RootFieldMap (QueryDB ('Postgres pgKind) Void (UnpreparedValue ('Postgres pgKind))) ->
m (LiveQueryPlan ('Postgres pgKind) (MultiplexedQuery ('Postgres pgKind)))
m (SubscriptionQueryPlan ('Postgres pgKind) (MultiplexedQuery ('Postgres pgKind)))
pgDBSubscriptionPlan userInfo _sourceName sourceConfig namespace unpreparedAST = do
(preparedAST, PGL.QueryParametersInfo {..}) <-
flip runStateT mempty $
@ -310,7 +310,7 @@ pgDBSubscriptionPlan userInfo _sourceName sourceConfig namespace unpreparedAST =
multiplexedQueryWithQueryTags =
multiplexedQuery {PGL.unMultiplexedQuery = appendSQLWithQueryTags (PGL.unMultiplexedQuery multiplexedQuery) mutationQueryTagsComment}
roleName = _uiRole userInfo
parameterizedPlan = ParameterizedLiveQueryPlan roleName multiplexedQueryWithQueryTags
parameterizedPlan = ParameterizedSubscriptionQueryPlan roleName multiplexedQueryWithQueryTags
-- We need to ensure that the values provided for variables are correct according to Postgres.
-- Without this check an invalid value for a variable for one instance of the subscription will
@ -326,7 +326,7 @@ pgDBSubscriptionPlan userInfo _sourceName sourceConfig namespace unpreparedAST =
validatedQueryVars
validatedSyntheticVars
pure $ LiveQueryPlan parameterizedPlan sourceConfig cohortVariables namespace
pure $ SubscriptionQueryPlan parameterizedPlan sourceConfig cohortVariables namespace
-- turn the current plan into a transaction
mkCurPlanTx ::

View File

@ -23,7 +23,7 @@ import Hasura.Backends.Postgres.Translate.Select (PostgresAnnotatedFieldJSON)
import Hasura.Base.Error
import Hasura.EncJSON
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Logging
import Hasura.GraphQL.Namespace
( RootFieldAlias,

View File

@ -10,8 +10,8 @@ module Hasura.GraphQL.Execute
ExecutionCtx (..),
EC.MonadGQLExecutionCheck (..),
checkQueryInAllowlist,
MultiplexedLiveQueryPlan (..),
LiveQueryPlan (..),
MultiplexedSubscriptionQueryPlan (..),
SubscriptionQueryPlan (..),
)
where
@ -28,10 +28,10 @@ import Hasura.GraphQL.Context qualified as C
import Hasura.GraphQL.Execute.Action qualified as EA
import Hasura.GraphQL.Execute.Backend qualified as EB
import Hasura.GraphQL.Execute.Common qualified as EC
import Hasura.GraphQL.Execute.LiveQuery.Plan qualified as EL
import Hasura.GraphQL.Execute.Mutation qualified as EM
import Hasura.GraphQL.Execute.Query qualified as EQ
import Hasura.GraphQL.Execute.RemoteJoin qualified as RJ
import Hasura.GraphQL.Execute.Subscription.Plan qualified as ES
import Hasura.GraphQL.Execute.Types qualified as ET
import Hasura.GraphQL.Namespace
import Hasura.GraphQL.ParameterizedQueryHash
@ -105,10 +105,10 @@ data ResolvedExecutionPlan
| -- | either action query or live query execution; remote schemas and introspection not supported
SubscriptionExecutionPlan SubscriptionExecution
newtype MultiplexedLiveQueryPlan (b :: BackendType)
= MultiplexedLiveQueryPlan (EL.LiveQueryPlan b (EB.MultiplexedQuery b))
newtype MultiplexedSubscriptionQueryPlan (b :: BackendType)
= MultiplexedSubscriptionQueryPlan (ES.SubscriptionQueryPlan b (EB.MultiplexedQuery b))
newtype LiveQueryPlan = LQP (AB.AnyBackend MultiplexedLiveQueryPlan)
newtype SubscriptionQueryPlan = SubscriptionQueryPlan (AB.AnyBackend MultiplexedSubscriptionQueryPlan)
-- | The comprehensive subscription plan. We only support either
-- 1. Fields with only async action queries with no associated relationships
@ -119,7 +119,7 @@ data SubscriptionExecution
= SEAsyncActionsWithNoRelationships !(RootFieldMap (ActionId, ActionLogResponse -> Either QErr EncJSON))
| SEOnSourceDB
!(HashSet ActionId)
!(ActionLogResponseMap -> ExceptT QErr IO (SourceName, LiveQueryPlan))
!(ActionLogResponseMap -> ExceptT QErr IO (SourceName, SubscriptionQueryPlan))
buildSubscriptionPlan ::
forall m.
@ -205,7 +205,7 @@ buildSubscriptionPlan userInfo rootFields parameterizedQueryHash = do
RootFieldMap
(SourceName, AB.AnyBackend (IR.SourceConfigWith (IR.QueryDBRoot Void UnpreparedValue))) ->
RootFieldAlias ->
ExceptT QErr IO (SourceName, LiveQueryPlan)
ExceptT QErr IO (SourceName, SubscriptionQueryPlan)
buildAction (sourceName, exists) allFields rootFieldName = do
lqp <- AB.dispatchAnyBackend @EB.BackendExecute
exists
@ -213,7 +213,7 @@ buildSubscriptionPlan userInfo rootFields parameterizedQueryHash = do
qdbs <- traverse (checkField @b sourceName) allFields
let subscriptionQueryTagsAttributes = encodeQueryTags $ QTLiveQuery $ LivequeryMetadata rootFieldName parameterizedQueryHash
let queryTagsComment = Tagged.untag $ EB.createQueryTags @m subscriptionQueryTagsAttributes queryTagsConfig
LQP . AB.mkAnyBackend . MultiplexedLiveQueryPlan
SubscriptionQueryPlan . AB.mkAnyBackend . MultiplexedSubscriptionQueryPlan
<$> runReaderT (EB.mkDBSubscriptionPlan userInfo sourceName sourceConfig (_rfaNamespace rootFieldName) qdbs) queryTagsComment
pure (sourceName, lqp)

View File

@ -8,8 +8,8 @@ import Control.Concurrent.Extended qualified as C
import Control.Concurrent.STM qualified as STM
import Data.List.NonEmpty qualified as NE
import Hasura.GraphQL.Execute.Action
import Hasura.GraphQL.Execute.LiveQuery.State
import Hasura.GraphQL.Execute.LiveQuery.TMap qualified as TMap
import Hasura.GraphQL.Execute.Subscription.State
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.Metadata.Class
import Hasura.Prelude

View File

@ -21,8 +21,8 @@ import Database.PG.Query qualified as Q
import Hasura.Base.Error
import Hasura.EncJSON
import Hasura.GraphQL.Execute.Action.Types (ActionExecutionPlan)
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.Execute.RemoteJoin.Types
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Namespace (RootFieldAlias, RootFieldMap)
import Hasura.GraphQL.Parser hiding (Type)
import Hasura.GraphQL.Transport.HTTP.Protocol qualified as GH
@ -98,7 +98,7 @@ class
SourceConfig b ->
Maybe G.Name ->
RootFieldMap (QueryDB b Void (UnpreparedValue b)) ->
m (LiveQueryPlan b (MultiplexedQuery b))
m (SubscriptionQueryPlan b (MultiplexedQuery b))
mkDBQueryExplain ::
forall m.
( MonadError QErr m
@ -109,13 +109,13 @@ class
SourceConfig b ->
QueryDB b Void (UnpreparedValue b) ->
m (AB.AnyBackend DBStepInfo)
mkLiveQueryExplain ::
mkSubscriptionExplain ::
( MonadError QErr m,
MonadIO m,
MonadBaseControl IO m
) =>
LiveQueryPlan b (MultiplexedQuery b) ->
m LiveQueryPlanExplanation
SubscriptionQueryPlan b (MultiplexedQuery b) ->
m SubscriptionQueryPlanExplanation
mkDBRemoteRelationshipPlan ::
forall m.

View File

@ -1,10 +1,11 @@
module Hasura.GraphQL.Execute.LiveQuery.Options
( LiveQueriesOptions (..),
module Hasura.GraphQL.Execute.Subscription.Options
( SubscriptionsOptions (..),
LiveQueriesOptions,
BatchSize,
unBatchSize,
RefetchInterval,
unRefetchInterval,
mkLiveQueriesOptions,
mkSubscriptionsOptions,
mkBatchSize,
mkRefetchInterval,
)
@ -14,29 +15,31 @@ import Data.Aeson qualified as J
import Hasura.Prelude
import Hasura.RQL.Types.Common
data LiveQueriesOptions = LiveQueriesOptions
data SubscriptionsOptions = SubscriptionsOptions
{ _lqoBatchSize :: !BatchSize,
_lqoRefetchInterval :: !RefetchInterval
}
deriving (Show, Eq)
mkLiveQueriesOptions :: Maybe BatchSize -> Maybe RefetchInterval -> LiveQueriesOptions
mkLiveQueriesOptions batchSize refetchInterval =
LiveQueriesOptions
type LiveQueriesOptions = SubscriptionsOptions
mkSubscriptionsOptions :: Maybe BatchSize -> Maybe RefetchInterval -> SubscriptionsOptions
mkSubscriptionsOptions batchSize refetchInterval =
SubscriptionsOptions
{ _lqoBatchSize = fromMaybe (BatchSize 100) batchSize,
_lqoRefetchInterval = fromMaybe (RefetchInterval 1) refetchInterval
}
instance J.ToJSON LiveQueriesOptions where
toJSON (LiveQueriesOptions batchSize refetchInterval) =
instance J.ToJSON SubscriptionsOptions where
toJSON (SubscriptionsOptions batchSize refetchInterval) =
J.object
[ "batch_size" J..= batchSize,
"refetch_delay" J..= refetchInterval
]
instance J.FromJSON LiveQueriesOptions where
instance J.FromJSON SubscriptionsOptions where
parseJSON = J.withObject "live query options" \o ->
LiveQueriesOptions <$> o J..: "batch_size"
SubscriptionsOptions <$> o J..: "batch_size"
<*> o J..: "refetch_delay"
newtype BatchSize = BatchSize {unBatchSize :: NonNegativeInt}

View File

@ -70,7 +70,7 @@
-- as “contains”:
--
-- @
-- 'LiveQueriesState' > 'Poller' > 'Cohort' > 'Subscriber'
-- 'SubscriptionsState' > 'Poller' > 'Cohort' > 'Subscriber'
-- @
--
-- Heres a brief summary of each types role:
@ -84,10 +84,10 @@
-- * A 'Poller' is a worker thread for a single, multiplexed query. It fetches data for a set of
-- 'Cohort's that all use the same parameterized query, but have different sets of variables.
--
-- * Finally, the 'LiveQueriesState' is the top-level container that holds all the active 'Poller's.
-- * Finally, the 'SubscriptionsState' is the top-level container that holds all the active 'Poller's.
--
-- Additional details are provided by the documentation for individual bindings.
module Hasura.GraphQL.Execute.LiveQuery.Plan
module Hasura.GraphQL.Execute.Subscription.Plan
( CohortId,
dummyCohortId,
newCohortId,
@ -96,14 +96,20 @@ module Hasura.GraphQL.Execute.LiveQuery.Plan
CohortVariables,
mkCohortVariables,
ValidatedVariables (..),
mkUnsafeValidateVariables,
ValidatedQueryVariables,
ValidatedSyntheticVariables,
LiveQueryPlan (..),
LiveQueryPlanExplanation (..),
ParameterizedLiveQueryPlan (..),
SubscriptionQueryPlan (..),
SubscriptionQueryPlanExplanation (..),
ParameterizedSubscriptionQueryPlan (..),
cvSessionVariables,
cvQueryVariables,
cvSyntheticVariables,
unValidatedVariables,
)
where
import Control.Lens (makeLenses)
import Data.Aeson.Extended qualified as J
import Data.Aeson.TH qualified as J
import Data.HashMap.Strict qualified as Map
@ -120,6 +126,40 @@ import Hasura.Session
import Language.GraphQL.Draft.Syntax qualified as G
import PostgreSQL.Binary.Encoding qualified as PE
----------------------------------------------------------------------------------------------------
-- Variable validation
-- | When running multiplexed queries, we have to be especially careful about user
-- input, since invalid values will cause the query to fail, causing collateral
-- damage for anyone else multiplexed into the same query. Therefore, we
-- pre-validate variables against Postgres by executing a no-op query of the shape
--
-- > SELECT 'v1'::t1, 'v2'::t2, ..., 'vn'::tn
--
-- so if any variable values are invalid, the error will be caught early.
newtype ValidatedVariables f = ValidatedVariables {_unValidatedVariables :: (f TxtEncodedVal)}
deriving instance (Show (f TxtEncodedVal)) => Show (ValidatedVariables f)
deriving instance (Eq (f TxtEncodedVal)) => Eq (ValidatedVariables f)
deriving instance (Hashable (f TxtEncodedVal)) => Hashable (ValidatedVariables f)
deriving instance (J.ToJSON (f TxtEncodedVal)) => J.ToJSON (ValidatedVariables f)
deriving instance (Semigroup (f TxtEncodedVal)) => Semigroup (ValidatedVariables f)
deriving instance (Monoid (f TxtEncodedVal)) => Monoid (ValidatedVariables f)
$(makeLenses 'ValidatedVariables)
type ValidatedQueryVariables = ValidatedVariables (Map.HashMap G.Name)
type ValidatedSyntheticVariables = ValidatedVariables []
mkUnsafeValidateVariables :: f TxtEncodedVal -> ValidatedVariables f
mkUnsafeValidateVariables = ValidatedVariables
----------------------------------------------------------------------------------------------------
-- Cohort
@ -161,6 +201,8 @@ data CohortVariables = CohortVariables
instance Hashable CohortVariables
$(makeLenses 'CohortVariables)
-- | Builds a cohort's variables by only using the session variables that
-- are required for the subscription
mkCohortVariables ::
@ -201,63 +243,40 @@ instance Q.ToPrepArg CohortVariablesArray where
where
encoder = PE.array 114 . PE.dimensionArray foldl' (PE.encodingArray . PE.json_ast)
----------------------------------------------------------------------------------------------------
-- Variable validation
-- | When running multiplexed queries, we have to be especially careful about user
-- input, since invalid values will cause the query to fail, causing collateral
-- damage for anyone else multiplexed into the same query. Therefore, we
-- pre-validate variables against Postgres by executing a no-op query of the shape
--
-- > SELECT 'v1'::t1, 'v2'::t2, ..., 'vn'::tn
--
-- so if any variable values are invalid, the error will be caught early.
newtype ValidatedVariables f = ValidatedVariables (f TxtEncodedVal)
deriving instance (Show (f TxtEncodedVal)) => Show (ValidatedVariables f)
deriving instance (Eq (f TxtEncodedVal)) => Eq (ValidatedVariables f)
deriving instance (Hashable (f TxtEncodedVal)) => Hashable (ValidatedVariables f)
deriving instance (J.ToJSON (f TxtEncodedVal)) => J.ToJSON (ValidatedVariables f)
deriving instance (Semigroup (f TxtEncodedVal)) => Semigroup (ValidatedVariables f)
deriving instance (Monoid (f TxtEncodedVal)) => Monoid (ValidatedVariables f)
type ValidatedQueryVariables = ValidatedVariables (Map.HashMap G.Name)
type ValidatedSyntheticVariables = ValidatedVariables []
----------------------------------------------------------------------------------------------------
-- Live query plans
-- | A self-contained, ready-to-execute live query plan. Contains enough information
-- | A self-contained, ready-to-execute subscription plan. Contains enough information
-- to find an existing poller that this can be added to /or/ to create a new poller
-- if necessary.
data LiveQueryPlan (b :: BackendType) q = LiveQueryPlan
{ _lqpParameterizedPlan :: !(ParameterizedLiveQueryPlan b q),
_lqpSourceConfig :: !(SourceConfig b),
_lqpVariables :: !CohortVariables,
data SubscriptionQueryPlan (b :: BackendType) q = SubscriptionQueryPlan
{ _sqpParameterizedPlan :: !(ParameterizedSubscriptionQueryPlan b q),
_sqpSourceConfig :: !(SourceConfig b),
_sqpVariables :: !CohortVariables,
-- | We need to know if the source has a namespace so that we can wrap it around
-- the response from the DB
_lqpNamespace :: !(Maybe G.Name)
_sqpNamespace :: !(Maybe G.Name)
}
data ParameterizedLiveQueryPlan (b :: BackendType) q = ParameterizedLiveQueryPlan
data ParameterizedSubscriptionQueryPlan (b :: BackendType) q = ParameterizedSubscriptionQueryPlan
{ _plqpRole :: !RoleName,
_plqpQuery :: !q
}
deriving (Show)
$(J.deriveToJSON hasuraJSON ''ParameterizedLiveQueryPlan)
$(J.deriveToJSON hasuraJSON ''ParameterizedSubscriptionQueryPlan)
data LiveQueryPlanExplanation = LiveQueryPlanExplanation
{ _lqpeSql :: !Text,
_lqpePlan :: ![Text],
_lqpeVariables :: !CohortVariables
data SubscriptionQueryPlanExplanation = SubscriptionQueryPlanExplanation
{ _sqpeSql :: !Text,
_sqpePlan :: ![Text],
_sqpeVariables :: !CohortVariables
}
deriving (Show)
$(J.deriveToJSON hasuraJSON ''LiveQueryPlanExplanation)
$(J.deriveToJSON hasuraJSON ''SubscriptionQueryPlanExplanation)
--------------------------------------------------------------------------
--- Streaming Subscriptions
newtype CursorVariableValues = CursorVariableValues (HashMap G.Name TxtEncodedVal)
deriving (J.FromJSON, J.ToJSON, Eq, Show)

View File

@ -0,0 +1,45 @@
-- | Multiplexed subscription poller threads; see "Hasura.GraphQL.Execute.Subscription" for details.
module Hasura.GraphQL.Execute.Subscription.Poll
( -- * Pollers
Poller (..),
PollerId (..),
PollerIOState (..),
pollLiveQuery,
PollerKey (..),
PollerMap,
dumpPollerMap,
PollDetails (..),
BatchExecutionDetails (..),
CohortExecutionDetails (..),
SubscriptionPostPollHook,
defaultSubscriptionPostPollHook,
-- * Cohorts
Cohort (..),
CohortId,
newCohortId,
CohortVariables,
CohortKey,
CohortMap,
-- * Subscribers
Subscriber (..),
SubscriberId,
newSubscriberId,
SubscriberMetadata,
mkSubscriberMetadata,
unSubscriberMetadata,
SubscriberMap,
OnChange,
SubscriptionGQResponse,
SubscriptionResponse (..),
SubscriptionMetadata (..),
SubscriberExecutionDetails (..),
-- * Batch
BatchId (..),
)
where
import Hasura.GraphQL.Execute.Subscription.Poll.Common
import Hasura.GraphQL.Execute.Subscription.Poll.LiveQuery

View File

@ -1,23 +1,21 @@
{-# LANGUAGE TemplateHaskell #-}
-- | Multiplexed live query poller threads; see "Hasura.GraphQL.Execute.LiveQuery" for details.
module Hasura.GraphQL.Execute.LiveQuery.Poll
-- | Multiplexed subscription poller threads; see "Hasura.GraphQL.Execute.Subscription" for details.
module Hasura.GraphQL.Execute.Subscription.Poll.Common
( -- * Pollers
Poller (..),
PollerId (..),
PollerIOState (..),
pollQuery,
PollerKey (..),
PollerMap,
dumpPollerMap,
PollDetails (..),
BatchExecutionDetails (..),
CohortExecutionDetails (..),
LiveQueryPostPollHook,
defaultLiveQueryPostPollHook,
SubscriptionPostPollHook,
defaultSubscriptionPostPollHook,
-- * Cohorts
Cohort (..),
CohortSnapshot (..),
CohortId,
newCohortId,
CohortVariables,
@ -33,45 +31,38 @@ module Hasura.GraphQL.Execute.LiveQuery.Poll
unSubscriberMetadata,
SubscriberMap,
OnChange,
LGQResponse,
LiveQueryResponse (..),
LiveQueryMetadata (..),
SubscriptionGQResponse,
SubscriptionResponse (..),
SubscriptionMetadata (..),
SubscriberExecutionDetails (..),
-- * Batch
BatchId (..),
-- * Hash
ResponseHash (..),
mkRespHash,
)
where
import Control.Concurrent.Async qualified as A
import Control.Concurrent.STM qualified as STM
import Control.Immortal qualified as Immortal
import Control.Lens
import Crypto.Hash qualified as CH
import Data.Aeson.Extended qualified as J
import Data.Aeson qualified as J
import Data.ByteString qualified as BS
import Data.HashMap.Strict qualified as Map
import Data.List.Split (chunksOf)
import Data.Monoid (Sum (..))
import Data.Text.Extended
import Data.Time.Clock qualified as Clock
import Data.UUID qualified as UUID
import Data.UUID.V4 qualified as UUID
import GHC.AssertNF.CPP
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.LiveQuery.Options
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.Execute.LiveQuery.TMap qualified as TMap
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.GraphQL.Transport.WebSocket.Protocol (OperationId)
import Hasura.GraphQL.Transport.WebSocket.Server qualified as WS
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common (SourceName, getNonNegativeInt)
import Hasura.RQL.Types.Common (SourceName)
import Hasura.Server.Types (RequestId)
import Hasura.Session
import ListT qualified
@ -112,19 +103,19 @@ data Subscriber = Subscriber
_sOnChangeCallback :: !OnChange
}
-- | live query onChange metadata, used for adding more extra analytics data
data LiveQueryMetadata = LiveQueryMetadata
{ _lqmExecutionTime :: !Clock.DiffTime
-- | Subscription onChange metadata, used for adding more extra analytics data
data SubscriptionMetadata = SubscriptionMetadata
{ _sqmExecutionTime :: !Clock.DiffTime
}
data LiveQueryResponse = LiveQueryResponse
data SubscriptionResponse = SubscriptionResponse
{ _lqrPayload :: !BS.ByteString,
_lqrExecutionTime :: !Clock.DiffTime
}
type LGQResponse = GQResult LiveQueryResponse
type SubscriptionGQResponse = GQResult SubscriptionResponse
type OnChange = LGQResponse -> IO ()
type OnChange = SubscriptionGQResponse -> IO ()
type SubscriberMap = TMap.TMap SubscriberId Subscriber
@ -219,39 +210,6 @@ data CohortSnapshot = CohortSnapshot
_csNewSubscribers :: ![Subscriber]
}
pushResultToCohort ::
GQResult BS.ByteString ->
Maybe ResponseHash ->
LiveQueryMetadata ->
CohortSnapshot ->
-- | subscribers to which data has been pushed, subscribers which already
-- have this data (this information is exposed by metrics reporting)
IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort result !respHashM (LiveQueryMetadata dTime) cohortSnapshot = do
prevRespHashM <- STM.readTVarIO respRef
-- write to the current websockets if needed
(subscribersToPush, subscribersToIgnore) <-
if isExecError result || respHashM /= prevRespHashM
then do
$assertNFHere respHashM -- so we don't write thunks to mutable vars
STM.atomically $ STM.writeTVar respRef respHashM
return (newSinks <> curSinks, mempty)
else return (newSinks, curSinks)
pushResultToSubscribers subscribersToPush
pure $
over
(each . each)
( \Subscriber {..} ->
SubscriberExecutionDetails _sId _sMetadata
)
(subscribersToPush, subscribersToIgnore)
where
CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot
response = result <&> \payload -> LiveQueryResponse payload dTime
pushResultToSubscribers =
A.mapConcurrently_ $ \Subscriber {..} -> _sOnChangeCallback response
-- -----------------------------------------------------------------------------
-- Pollers
@ -385,6 +343,7 @@ batchExecutionDetailMinimal BatchExecutionDetails {..} =
<> batchRespSize
)
-- TODO consider refactoring into two types: one that is returned from pollLiveQuery and pollStreamingQuery, and a parent type containing pollerId, sourceName, and so on, which is assembled at the callsites of those two functions. Move postPollHook out of those functions to callsites
data PollDetails = PollDetails
{ -- | the unique ID (basically a thread that run as a 'Poller') for the
-- 'Poller'
@ -392,14 +351,14 @@ data PollDetails = PollDetails
-- | the multiplexed SQL query to be run against the database with all the
-- variables together
_pdGeneratedSql :: !Text,
-- | the time taken to get a snapshot of cohorts from our 'LiveQueriesState'
-- | the time taken to get a snapshot of cohorts from our 'SubscriptionsState'
-- data structure
_pdSnapshotTime :: !Clock.DiffTime,
-- | list of execution batches and their details
_pdBatches :: ![BatchExecutionDetails],
-- | total time spent on a poll cycle
_pdTotalTime :: !Clock.DiffTime,
_pdLiveQueryOptions :: !LiveQueriesOptions,
_pdLiveQueryOptions :: !SubscriptionsOptions,
_pdSource :: !SourceName,
_pdRole :: !RoleName,
_pdParameterizedQueryHash :: !ParameterizedQueryHash
@ -431,111 +390,8 @@ pollDetailMinimal PollDetails {..} =
instance L.ToEngineLog PollDetails L.Hasura where
toEngineLog pl = (L.LevelInfo, L.ELTLivequeryPollerLog, pollDetailMinimal pl)
type LiveQueryPostPollHook = PollDetails -> IO ()
type SubscriptionPostPollHook = PollDetails -> IO ()
-- the default LiveQueryPostPollHook
defaultLiveQueryPostPollHook :: L.Logger L.Hasura -> LiveQueryPostPollHook
defaultLiveQueryPostPollHook = L.unLogger
-- | Where the magic happens: the top-level action run periodically by each
-- active 'Poller'. This needs to be async exception safe.
pollQuery ::
forall b.
BackendTransport b =>
PollerId ->
LiveQueriesOptions ->
(SourceName, SourceConfig b) ->
RoleName ->
ParameterizedQueryHash ->
MultiplexedQuery b ->
CohortMap ->
LiveQueryPostPollHook ->
IO ()
pollQuery pollerId lqOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap postPollHook = do
(totalTime, (snapshotTime, batchesDetails)) <- withElapsedTime $ do
-- snapshot the current cohorts and split them into batches
(snapshotTime, cohortBatches) <- withElapsedTime $ do
-- get a snapshot of all the cohorts
-- this need not be done in a transaction
cohorts <- STM.atomically $ TMap.toList cohortMap
cohortSnapshots <- mapM (STM.atomically . getCohortSnapshot) cohorts
-- cohorts are broken down into batches specified by the batch size
let cohortBatches = chunksOf (getNonNegativeInt (unBatchSize batchSize)) cohortSnapshots
-- associating every batch with their BatchId
pure $ zip (BatchId <$> [1 ..]) cohortBatches
-- concurrently process each batch
batchesDetails <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do
(queryExecutionTime, mxRes) <- runDBSubscription @b sourceConfig query $ over (each . _2) _csVariables cohorts
let lqMeta = LiveQueryMetadata $ convertDuration queryExecutionTime
operations = getCohortOperations cohorts mxRes
-- batch response size is the sum of the response sizes of the cohorts
batchResponseSize =
case mxRes of
Left _ -> Nothing
Right resp -> Just $ getSum $ foldMap (Sum . BS.length . snd) resp
(pushTime, cohortsExecutionDetails) <- withElapsedTime $
A.forConcurrently operations $ \(res, cohortId, respData, snapshot) -> do
(pushedSubscribers, ignoredSubscribers) <-
pushResultToCohort res (fst <$> respData) lqMeta snapshot
pure
CohortExecutionDetails
{ _cedCohortId = cohortId,
_cedVariables = _csVariables snapshot,
_cedPushedTo = pushedSubscribers,
_cedIgnored = ignoredSubscribers,
_cedResponseSize = snd <$> respData,
_cedBatchId = batchId
}
pure $
BatchExecutionDetails
queryExecutionTime
pushTime
batchId
cohortsExecutionDetails
batchResponseSize
pure (snapshotTime, batchesDetails)
let pollDetails =
PollDetails
{ _pdPollerId = pollerId,
_pdGeneratedSql = toTxt query,
_pdSnapshotTime = snapshotTime,
_pdBatches = batchesDetails,
_pdLiveQueryOptions = lqOpts,
_pdTotalTime = totalTime,
_pdSource = sourceName,
_pdRole = roleName,
_pdParameterizedQueryHash = parameterizedQueryHash
}
postPollHook pollDetails
where
LiveQueriesOptions batchSize _ = lqOpts
getCohortSnapshot (cohortVars, handlerC) = do
let Cohort resId respRef curOpsTV newOpsTV = handlerC
curOpsL <- TMap.toList curOpsTV
newOpsL <- TMap.toList newOpsTV
forM_ newOpsL $ \(k, action) -> TMap.insert action k curOpsTV
TMap.reset newOpsTV
let cohortSnapshot = CohortSnapshot cohortVars respRef (map snd curOpsL) (map snd newOpsL)
return (resId, cohortSnapshot)
getCohortOperations cohorts = \case
Left e ->
-- TODO: this is internal error
let resp = throwError $ GQExecError [encodeGQLErr False e]
in [(resp, cohortId, Nothing, snapshot) | (cohortId, snapshot) <- cohorts]
Right responses -> do
let cohortSnapshotMap = Map.fromList cohorts
flip mapMaybe responses $ \(cohortId, respBS) ->
let respHash = mkRespHash respBS
respSize = BS.length respBS
in -- TODO: currently we ignore the cases when the cohortId from
-- Postgres response is not present in the cohort map of this batch
-- (this shouldn't happen but if it happens it means a logic error and
-- we should log it)
(pure respBS,cohortId,Just (respHash, respSize),)
<$> Map.lookup cohortId cohortSnapshotMap
-- the default SubscriptionPostPollHook
defaultSubscriptionPostPollHook :: L.Logger L.Hasura -> SubscriptionPostPollHook
defaultSubscriptionPostPollHook = L.unLogger

View File

@ -0,0 +1,169 @@
{-# LANGUAGE TemplateHaskell #-}
-- | Multiplexed subscription poller threads; see "Hasura.GraphQL.Execute.Subscription" for details.
module Hasura.GraphQL.Execute.Subscription.Poll.LiveQuery
( -- * Pollers
pollLiveQuery,
)
where
import Control.Concurrent.Async qualified as A
import Control.Concurrent.STM qualified as STM
import Control.Lens
import Data.ByteString qualified as BS
import Data.HashMap.Strict qualified as Map
import Data.List.Split (chunksOf)
import Data.Monoid (Sum (..))
import Data.Text.Extended
import GHC.AssertNF.CPP
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Poll.Common hiding (Cohort (..), CohortMap, CohortSnapshot (..))
import Hasura.GraphQL.Execute.Subscription.Poll.Common qualified as C
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.Prelude
import Hasura.RQL.Types.Backend
import Hasura.RQL.Types.Common (SourceName, getNonNegativeInt)
import Hasura.Session
pushResultToCohort ::
GQResult BS.ByteString ->
Maybe ResponseHash ->
SubscriptionMetadata ->
C.CohortSnapshot ->
-- | subscribers to which data has been pushed, subscribers which already
-- have this data (this information is exposed by metrics reporting)
IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails])
pushResultToCohort result !respHashM (SubscriptionMetadata dTime) cohortSnapshot = do
prevRespHashM <- STM.readTVarIO respRef
-- write to the current websockets if needed
(subscribersToPush, subscribersToIgnore) <-
if isExecError result || respHashM /= prevRespHashM
then do
$assertNFHere respHashM -- so we don't write thunks to mutable vars
STM.atomically $ do
STM.writeTVar respRef respHashM
return (newSinks <> curSinks, mempty)
else return (newSinks, curSinks)
pushResultToSubscribers subscribersToPush
pure $
over
(each . each)
( \Subscriber {..} ->
SubscriberExecutionDetails _sId _sMetadata
)
(subscribersToPush, subscribersToIgnore)
where
C.CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot
response = result <&> (`SubscriptionResponse` dTime)
pushResultToSubscribers =
A.mapConcurrently_ $ \Subscriber {..} -> _sOnChangeCallback response
-- | Where the magic happens: the top-level action run periodically by each
-- active 'Poller'. This needs to be async exception safe.
pollLiveQuery ::
forall b.
BackendTransport b =>
PollerId ->
SubscriptionsOptions ->
(SourceName, SourceConfig b) ->
RoleName ->
ParameterizedQueryHash ->
MultiplexedQuery b ->
C.CohortMap ->
SubscriptionPostPollHook ->
IO ()
pollLiveQuery pollerId lqOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap postPollHook = do
(totalTime, (snapshotTime, batchesDetails)) <- withElapsedTime $ do
-- snapshot the current cohorts and split them into batches
(snapshotTime, cohortBatches) <- withElapsedTime $ do
-- get a snapshot of all the cohorts
-- this need not be done in a transaction
cohorts <- STM.atomically $ TMap.toList cohortMap
cohortSnapshots <- mapM (STM.atomically . getCohortSnapshot) cohorts
-- cohorts are broken down into batches specified by the batch size
let cohortBatches = chunksOf (getNonNegativeInt (unBatchSize batchSize)) cohortSnapshots
-- associating every batch with their BatchId
pure $ zip (BatchId <$> [1 ..]) cohortBatches
-- concurrently process each batch
batchesDetails <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do
(queryExecutionTime, mxRes) <- runDBSubscription @b sourceConfig query $ over (each . _2) C._csVariables cohorts
let lqMeta = SubscriptionMetadata $ convertDuration queryExecutionTime
operations = getCohortOperations cohorts mxRes
-- batch response size is the sum of the response sizes of the cohorts
batchResponseSize =
case mxRes of
Left _ -> Nothing
Right resp -> Just $ getSum $ foldMap (Sum . BS.length . snd) resp
(pushTime, cohortsExecutionDetails) <- withElapsedTime $
A.forConcurrently operations $ \(res, cohortId, respData, snapshot) -> do
(pushedSubscribers, ignoredSubscribers) <-
pushResultToCohort res (fst <$> respData) lqMeta snapshot
pure
CohortExecutionDetails
{ _cedCohortId = cohortId,
_cedVariables = C._csVariables snapshot,
_cedPushedTo = pushedSubscribers,
_cedIgnored = ignoredSubscribers,
_cedResponseSize = snd <$> respData,
_cedBatchId = batchId
}
pure $
BatchExecutionDetails
queryExecutionTime
pushTime
batchId
cohortsExecutionDetails
batchResponseSize
pure (snapshotTime, batchesDetails)
let pollDetails =
PollDetails
{ _pdPollerId = pollerId,
_pdGeneratedSql = toTxt query,
_pdSnapshotTime = snapshotTime,
_pdBatches = batchesDetails,
_pdLiveQueryOptions = lqOpts,
_pdTotalTime = totalTime,
_pdSource = sourceName,
_pdRole = roleName,
_pdParameterizedQueryHash = parameterizedQueryHash
}
postPollHook pollDetails
where
SubscriptionsOptions batchSize _ = lqOpts
getCohortSnapshot (cohortVars, handlerC) = do
let C.Cohort resId respRef curOpsTV newOpsTV = handlerC
curOpsL <- TMap.toList curOpsTV
newOpsL <- TMap.toList newOpsTV
forM_ newOpsL $ \(k, action) -> TMap.insert action k curOpsTV
TMap.reset newOpsTV
let cohortSnapshot = C.CohortSnapshot cohortVars respRef (map snd curOpsL) (map snd newOpsL)
return (resId, cohortSnapshot)
getCohortOperations cohorts = \case
Left e ->
-- TODO: this is internal error
let resp = throwError $ GQExecError [encodeGQLErr False e]
in [(resp, cohortId, Nothing, snapshot) | (cohortId, snapshot) <- cohorts]
Right responses -> do
let cohortSnapshotMap = Map.fromList cohorts
flip mapMaybe responses $ \(cohortId, respBS) ->
let respHash = mkRespHash respBS
respSize = BS.length respBS
in -- TODO: currently we ignore the cases when the cohortId from
-- Postgres response is not present in the cohort map of this batch
-- (this shouldn't happen but if it happens it means a logic error and
-- we should log it)
(pure respBS,cohortId,Just (respHash, respSize),)
<$> Map.lookup cohortId cohortSnapshotMap

View File

@ -1,14 +1,15 @@
{-# LANGUAGE TemplateHaskell #-}
-- | Top-level management of live query poller threads. The implementation of the polling itself is
-- in "Hasura.GraphQL.Execute.LiveQuery.Poll". See "Hasura.GraphQL.Execute.LiveQuery" for high-level
-- | Top-level management of subscription poller threads.
-- The implementation of the polling itself is
-- in "Hasura.GraphQL.Execute.Subscription.Poll". See "Hasura.GraphQL.Execute.Subscription" for high-level
-- details.
module Hasura.GraphQL.Execute.LiveQuery.State
( LiveQueriesState (..),
initLiveQueriesState,
dumpLiveQueriesState,
LiveQueryId,
LiveQueryPostPollHook,
module Hasura.GraphQL.Execute.Subscription.State
( SubscriptionsState (..),
initSubscriptionsState,
dumpSubscriptionsState,
SubscriberDetails,
SubscriptionPostPollHook,
addLiveQuery,
removeLiveQuery,
LiveAsyncActionQueryOnSource (..),
@ -18,6 +19,7 @@ module Hasura.GraphQL.Execute.LiveQuery.State
AsyncActionSubscriptionState,
addAsyncActionLiveQuery,
removeAsyncActionLiveQuery,
LiveQuerySubscriberDetails,
)
where
@ -32,10 +34,10 @@ import Data.UUID.V4 qualified as UUID
import GHC.AssertNF.CPP
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.LiveQuery.Options
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.Execute.LiveQuery.Poll
import Hasura.GraphQL.Execute.LiveQuery.TMap qualified as TMap
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Execute.Subscription.Poll
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
import Hasura.GraphQL.Transport.Backend
import Hasura.GraphQL.Transport.HTTP.Protocol (OperationName)
@ -49,61 +51,69 @@ import Hasura.Server.Types (RequestId)
import StmContainers.Map qualified as STMMap
import System.Metrics.Gauge qualified as EKG.Gauge
-- | The top-level datatype that holds the state for all active live queries.
-- | The top-level datatype that holds the state for all active subscriptions.
--
-- NOTE!: This must be kept consistent with a websocket connection's
-- 'OperationMap', in 'onClose' and 'onStart'.
data LiveQueriesState = LiveQueriesState
{ _lqsOptions :: !LiveQueriesOptions,
_lqsLiveQueryMap :: !PollerMap,
data SubscriptionsState = SubscriptionsState
{ _ssLiveQueryOptions :: !LiveQueriesOptions,
_ssLiveQueryMap :: !PollerMap,
-- | A hook function which is run after each fetch cycle
_lqsPostPollHook :: !LiveQueryPostPollHook,
_lqsAsyncActions :: !AsyncActionSubscriptionState
_ssPostPollHook :: !SubscriptionPostPollHook,
_ssAsyncActions :: !AsyncActionSubscriptionState
}
initLiveQueriesState ::
LiveQueriesOptions -> LiveQueryPostPollHook -> IO LiveQueriesState
initLiveQueriesState options pollHook =
initSubscriptionsState ::
LiveQueriesOptions -> SubscriptionPostPollHook -> IO SubscriptionsState
initSubscriptionsState liveQOptions pollHook =
STM.atomically $
LiveQueriesState options <$> STMMap.new <*> pure pollHook <*> TMap.new
SubscriptionsState liveQOptions
<$> STMMap.new
<*> pure pollHook
<*> TMap.new
dumpLiveQueriesState :: Bool -> LiveQueriesState -> IO J.Value
dumpLiveQueriesState extended (LiveQueriesState opts lqMap _ _) = do
dumpSubscriptionsState :: Bool -> SubscriptionsState -> IO J.Value
dumpSubscriptionsState extended (SubscriptionsState liveQOpts lqMap _ _) = do
lqMapJ <- dumpPollerMap extended lqMap
return $
J.object
[ "options" J..= opts,
[ "options" J..= liveQOpts,
"live_queries_map" J..= lqMapJ
]
data LiveQueryId = LiveQueryId
{ _lqiPoller :: !PollerKey,
_lqiCohort :: !CohortKey,
_lqiSubscriber :: !SubscriberId
-- | SubscriberDetails contains the data required to locate a subscriber
-- in the correct cohort within the correct poller in the operation map.
data SubscriberDetails a = SubscriberDetails
{ _sdPoller :: !PollerKey,
_sdCohort :: !a,
_sdSubscriber :: !SubscriberId
}
deriving (Show)
type LiveQuerySubscriberDetails = SubscriberDetails CohortKey
-- | Fork a thread handling a regular (live query) subscription
addLiveQuery ::
forall b.
BackendTransport b =>
L.Logger L.Hasura ->
ServerMetrics ->
SubscriberMetadata ->
LiveQueriesState ->
SubscriptionsState ->
SourceName ->
ParameterizedQueryHash ->
-- | operation name of the query
Maybe OperationName ->
RequestId ->
LiveQueryPlan b (MultiplexedQuery b) ->
SubscriptionQueryPlan b (MultiplexedQuery b) ->
-- | the action to be executed when result changes
OnChange ->
IO LiveQueryId
IO LiveQuerySubscriberDetails
addLiveQuery
logger
serverMetrics
subscriberMetadata
lqState
subscriptionState
source
parameterizedQueryHash
operationName
@ -139,9 +149,9 @@ addLiveQuery
-- cancelled after putTMVar
onJust handlerM $ \handler -> do
pollerId <- PollerId <$> UUID.nextRandom
threadRef <- forkImmortal ("pollQuery." <> show pollerId) logger $
threadRef <- forkImmortal ("pollLiveQuery." <> show pollerId) logger $
forever $ do
pollQuery @b pollerId lqOpts (source, sourceConfig) role parameterizedQueryHash query (_pCohorts handler) postPollHook
pollLiveQuery @b pollerId lqOpts (source, sourceConfig) role parameterizedQueryHash query (_pCohorts handler) postPollHook
sleep $ unNonNegativeDiffTime $ unRefetchInterval refetchInterval
let !pState = PollerIOState threadRef pollerId
$assertNFHere pState -- so we don't write thunks to mutable vars
@ -149,11 +159,11 @@ addLiveQuery
liftIO $ EKG.Gauge.inc $ smActiveSubscriptions serverMetrics
pure $ LiveQueryId handlerId cohortKey subscriberId
pure $ SubscriberDetails handlerId cohortKey subscriberId
where
LiveQueriesState lqOpts lqMap postPollHook _ = lqState
LiveQueriesOptions _ refetchInterval = lqOpts
LiveQueryPlan (ParameterizedLiveQueryPlan role query) sourceConfig cohortKey _ = plan
SubscriptionsState lqOpts lqMap postPollHook _ = subscriptionState
SubscriptionsOptions _ refetchInterval = lqOpts
SubscriptionQueryPlan (ParameterizedSubscriptionQueryPlan role query) sourceConfig cohortKey _ = plan
handlerId = PollerKey source role $ toTxt query
@ -161,7 +171,11 @@ addLiveQuery
TMap.insert subscriber (_sId subscriber) $ _cNewSubscribers handlerC
addToPoller subscriber cohortId handler = do
!newCohort <- Cohort cohortId <$> STM.newTVar Nothing <*> TMap.new <*> TMap.new
!newCohort <-
Cohort cohortId
<$> STM.newTVar Nothing
<*> TMap.new
<*> TMap.new
addToCohort subscriber newCohort
TMap.insert newCohort cohortKey $ _pCohorts handler
@ -170,23 +184,23 @@ addLiveQuery
removeLiveQuery ::
L.Logger L.Hasura ->
ServerMetrics ->
LiveQueriesState ->
SubscriptionsState ->
-- the query and the associated operation
LiveQueryId ->
LiveQuerySubscriberDetails ->
IO ()
removeLiveQuery logger serverMetrics lqState lqId@(LiveQueryId handlerId cohortId sinkId) = mask_ $ do
removeLiveQuery logger serverMetrics lqState lqId@(SubscriberDetails handlerId cohortId sinkId) = mask_ $ do
mbCleanupIO <- STM.atomically $ do
detM <- getQueryDet
detM <- getQueryDet lqMap
fmap join $
forM detM $ \(Poller cohorts ioState, cohort) ->
cleanHandlerC cohorts ioState cohort
sequence_ mbCleanupIO
liftIO $ EKG.Gauge.dec $ smActiveSubscriptions serverMetrics
where
lqMap = _lqsLiveQueryMap lqState
lqMap = _ssLiveQueryMap lqState
getQueryDet = do
pollerM <- STMMap.lookup handlerId lqMap
getQueryDet subMap = do
pollerM <- STMMap.lookup handlerId subMap
fmap join $
forM pollerM $ \poller -> do
cohortM <- TMap.lookup cohortId (_pCohorts poller)
@ -228,12 +242,12 @@ removeLiveQuery logger serverMetrics lqState lqId@(LiveQueryId handlerId cohortI
-- in the source database so as to fetch response joined with relationship rows.
-- For more details see Note [Resolving async action query]
data LiveAsyncActionQueryOnSource = LiveAsyncActionQueryOnSource
{ _laaqpCurrentLqId :: !LiveQueryId,
{ _laaqpCurrentLqId :: !LiveQuerySubscriberDetails,
_laaqpPrevActionLogMap :: !ActionLogResponseMap,
-- | An IO action to restart the live query poller with updated action log responses fetched from metadata storage
-- Restarting a live query re-generates the SQL statement with new action log responses to send latest action
-- response to the client.
_laaqpRestartLq :: !(LiveQueryId -> ActionLogResponseMap -> IO (Maybe LiveQueryId))
_laaqpRestartLq :: !(LiveQuerySubscriberDetails -> ActionLogResponseMap -> IO (Maybe LiveQuerySubscriberDetails))
}
data LiveAsyncActionQueryWithNoRelationships = LiveAsyncActionQueryWithNoRelationships

View File

@ -1,4 +1,4 @@
module Hasura.GraphQL.Execute.LiveQuery.TMap
module Hasura.GraphQL.Execute.Subscription.TMap
( TMap,
new,
reset,
@ -7,12 +7,16 @@ module Hasura.GraphQL.Execute.LiveQuery.TMap
insert,
delete,
toList,
replace,
union,
filterWithKey,
getMap,
)
where
import Control.Concurrent.STM
import Data.HashMap.Strict qualified as Map
import Hasura.Prelude hiding (lookup, null, toList)
import Hasura.Prelude hiding (lookup, null, toList, union)
-- | A coarse-grained transactional map implemented by simply wrapping a 'Map.HashMap' in a 'TVar'.
-- Compared to "StmContainers.Map", this provides much faster iteration over the elements at the
@ -39,3 +43,18 @@ delete k mapTv = modifyTVar' (unTMap mapTv) $ Map.delete k
toList :: TMap k v -> STM [(k, v)]
toList = fmap Map.toList . readTVar . unTMap
filterWithKey :: (k -> v -> Bool) -> TMap k v -> STM ()
filterWithKey f mapTV = modifyTVar' (unTMap mapTV) $ Map.filterWithKey f
replace :: TMap k v -> Map.HashMap k v -> STM ()
replace mapTV v = void $ swapTVar (unTMap mapTV) v
union :: (Eq k, Hashable k) => TMap k v -> TMap k v -> STM (TMap k v)
union mapA mapB = do
l <- readTVar $ unTMap mapA
r <- readTVar $ unTMap mapB
TMap <$> newTVar (Map.union l r)
getMap :: TMap k v -> STM (Map.HashMap k v)
getMap = readTVar . unTMap

View File

@ -109,9 +109,9 @@ explainGQLQuery sc (GQLExplain query userVarsRaw maybeIsRelay) = do
E.SEAsyncActionsWithNoRelationships _ -> throw400 NotSupported "async action query fields without relationships to table cannot be explained"
E.SEOnSourceDB actionIds liveQueryBuilder -> do
actionLogResponseMap <- fst <$> E.fetchActionLogResponses actionIds
(_, E.LQP exists) <- liftEitherM $ liftIO $ runExceptT $ liveQueryBuilder actionLogResponseMap
AB.dispatchAnyBackend @BackendExecute exists \(E.MultiplexedLiveQueryPlan execPlan) ->
encJFromJValue <$> mkLiveQueryExplain execPlan
(_, E.SubscriptionQueryPlan exists) <- liftEitherM $ liftIO $ runExceptT $ liveQueryBuilder actionLogResponseMap
AB.dispatchAnyBackend @BackendExecute exists \(E.MultiplexedSubscriptionQueryPlan execPlan) ->
encJFromJValue <$> mkSubscriptionExplain execPlan
where
queryType = bool E.QueryHasura E.QueryRelay $ Just True == maybeIsRelay
sessionVariables = mkSessionVariablesText $ fromMaybe mempty userVarsRaw

View File

@ -7,7 +7,7 @@ import Data.ByteString qualified as B
import Hasura.Base.Error
import Hasura.EncJSON
import Hasura.GraphQL.Execute.Backend
import Hasura.GraphQL.Execute.LiveQuery.Plan
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Logging (MonadQueryLog)
import Hasura.GraphQL.Namespace (RootFieldAlias)
import Hasura.GraphQL.Transport.HTTP.Protocol

View File

@ -15,7 +15,7 @@ import Data.Environment qualified as Env
import Data.Text (pack, unpack)
import Hasura.GraphQL.Execute qualified as E
import Hasura.GraphQL.Execute.Backend qualified as EB
import Hasura.GraphQL.Execute.LiveQuery.State qualified as LQ
import Hasura.GraphQL.Execute.Subscription.State qualified as ES
import Hasura.GraphQL.Logging
import Hasura.GraphQL.Transport.HTTP (MonadExecuteQuery)
import Hasura.GraphQL.Transport.Instances ()
@ -96,7 +96,7 @@ stopWSServerApp wsEnv = WS.shutdown (_wseServer wsEnv)
createWSServerEnv ::
(MonadIO m) =>
L.Logger L.Hasura ->
LQ.LiveQueriesState ->
ES.SubscriptionsState ->
IO (SchemaCache, SchemaCacheVer) ->
HTTP.Manager ->
CorsPolicy ->

View File

@ -47,10 +47,10 @@ import Hasura.EncJSON
import Hasura.GraphQL.Execute qualified as E
import Hasura.GraphQL.Execute.Action qualified as EA
import Hasura.GraphQL.Execute.Backend qualified as EB
import Hasura.GraphQL.Execute.LiveQuery.Plan qualified as LQ
import Hasura.GraphQL.Execute.LiveQuery.Poll qualified as LQ
import Hasura.GraphQL.Execute.LiveQuery.State qualified as LQ
import Hasura.GraphQL.Execute.RemoteJoin qualified as RJ
import Hasura.GraphQL.Execute.Subscription.Plan qualified as ES
import Hasura.GraphQL.Execute.Subscription.Poll qualified as ES
import Hasura.GraphQL.Execute.Subscription.State qualified as ES
import Hasura.GraphQL.Logging
import Hasura.GraphQL.Namespace (RootFieldAlias)
import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash)
@ -92,7 +92,7 @@ import Network.HTTP.Types qualified as HTTP
import Network.WebSockets qualified as WS
import StmContainers.Map qualified as STMMap
-- | 'LQ.LiveQueryId' comes from 'Hasura.GraphQL.Execute.LiveQuery.State.addLiveQuery'. We use
-- | 'ES.LiveQueryId' comes from 'Hasura.GraphQL.Execute.LiveQuery.State.addLiveQuery'. We use
-- this to track a connection's operations so we can remove them from 'LiveQueryState', and
-- log.
--
@ -249,9 +249,9 @@ sendMsgWithMetadata ::
ServerMsg ->
Maybe OperationName ->
Maybe ParameterizedQueryHash ->
LQ.LiveQueryMetadata ->
ES.SubscriptionMetadata ->
m ()
sendMsgWithMetadata wsConn msg opName paramQueryHash (LQ.LiveQueryMetadata execTime) =
sendMsgWithMetadata wsConn msg opName paramQueryHash (ES.SubscriptionMetadata execTime) =
liftIO $ WS.sendMsg wsConn $ WS.WSQueueResponse bs wsInfo
where
bs = encodeServerMsg msg
@ -479,7 +479,7 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
case cachedValue of
Just cachedResponseData -> do
logQueryLog logger $ QueryLog q Nothing requestId QueryLogKindCached
sendSuccResp cachedResponseData opName parameterizedQueryHash $ LQ.LiveQueryMetadata 0
sendSuccResp cachedResponseData opName parameterizedQueryHash $ ES.SubscriptionMetadata 0
Nothing -> do
conclusion <- runExceptT $
runLimits $
@ -543,7 +543,7 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
telemTimeIO = convertDuration telemTimeIO_DT
telemTimeTot <- Seconds <$> timerTot
sendSuccResp (encodeEncJSONResults results) opName parameterizedQueryHash $
LQ.LiveQueryMetadata telemTimeIO_DT
ES.SubscriptionMetadata telemTimeIO_DT
-- Telemetry. NOTE: don't time network IO:
Telem.recordTimingMetric Telem.RequestDimensions {..} Telem.RequestTimings {..}
@ -612,14 +612,14 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
pure $
encJToLBS $
encodeEncJSONResults results
sendMsgWithMetadata wsConn dataMsg opName (Just parameterizedQueryHash) $ LQ.LiveQueryMetadata dTime
sendMsgWithMetadata wsConn dataMsg opName (Just parameterizedQueryHash) $ ES.SubscriptionMetadata dTime
asyncActionQueryLive =
LQ.LAAQNoRelationships $
LQ.LiveAsyncActionQueryWithNoRelationships sendResponseIO (sendCompleted (Just requestId) (Just parameterizedQueryHash))
ES.LAAQNoRelationships $
ES.LiveAsyncActionQueryWithNoRelationships sendResponseIO (sendCompleted (Just requestId) (Just parameterizedQueryHash))
LQ.addAsyncActionLiveQuery
(LQ._lqsAsyncActions lqMap)
ES.addAsyncActionLiveQuery
(ES._ssAsyncActions lqMap)
opId
actionIds
(sendError requestId)
@ -640,15 +640,15 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
logQueryLog logger $ QueryLog q Nothing requestId QueryLogKindAction
liftIO $ do
let asyncActionQueryLive =
LQ.LAAQOnSourceDB $
LQ.LiveAsyncActionQueryOnSource lqId actionLogMap $
ES.LAAQOnSourceDB $
ES.LiveAsyncActionQueryOnSource lqId actionLogMap $
restartLiveQuery parameterizedQueryHash requestId liveQueryBuilder
onUnexpectedException err = do
sendError requestId err
stopOperation serverEnv wsConn opId (pure ()) -- Don't log in case opId don't exist
LQ.addAsyncActionLiveQuery
(LQ._lqsAsyncActions lqMap)
ES.addAsyncActionLiveQuery
(ES._ssAsyncActions lqMap)
opId
nonEmptyActionIds
onUnexpectedException
@ -703,7 +703,7 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
telemTimeIO = convertDuration $ sum $ fmap arpTimeIO results
telemTimeTot <- Seconds <$> timerTot
sendSuccResp (encodeAnnotatedResponseParts results) opName pqh $
LQ.LiveQueryMetadata $ sum $ fmap arpTimeIO results
ES.SubscriptionMetadata $ sum $ fmap arpTimeIO results
-- Telemetry. NOTE: don't time network IO:
Telem.recordTimingMetric Telem.RequestDimensions {..} Telem.RequestTimings {..}
@ -798,7 +798,7 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
EncJSON ->
Maybe OperationName ->
ParameterizedQueryHash ->
LQ.LiveQueryMetadata ->
ES.SubscriptionMetadata ->
ExceptT () m ()
sendSuccResp encJson opName queryHash =
sendMsgWithMetadata
@ -814,20 +814,20 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
throwError ()
restartLiveQuery parameterizedQueryHash requestId liveQueryBuilder lqId actionLogMap = do
LQ.removeLiveQuery logger (_wseServerMetrics serverEnv) lqMap lqId
ES.removeLiveQuery logger (_wseServerMetrics serverEnv) lqMap lqId
either (const Nothing) Just <$> startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap
startLiveQuery liveQueryBuilder parameterizedQueryHash requestId actionLogMap = do
liveQueryE <- runExceptT $ liveQueryBuilder actionLogMap
for liveQueryE $ \(sourceName, E.LQP exists) -> do
for liveQueryE $ \(sourceName, E.SubscriptionQueryPlan exists) -> do
let !opName = _grOperationName q
subscriberMetadata = LQ.mkSubscriberMetadata (WS.getWSId wsConn) opId opName requestId
subscriberMetadata = ES.mkSubscriberMetadata (WS.getWSId wsConn) opId opName requestId
-- NOTE!: we mask async exceptions higher in the call stack, but it's
-- crucial we don't lose lqId after addLiveQuery returns successfully.
!lqId <- liftIO $ AB.dispatchAnyBackend @BackendTransport
exists
\(E.MultiplexedLiveQueryPlan liveQueryPlan) ->
LQ.addLiveQuery
\(E.MultiplexedSubscriptionQueryPlan liveQueryPlan) ->
ES.addLiveQuery
logger
(_wseServerMetrics serverEnv)
subscriberMetadata
@ -837,7 +837,7 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
opName
requestId
liveQueryPlan
(liveQOnChange opName parameterizedQueryHash $ LQ._lqpNamespace liveQueryPlan)
(liveQOnChange opName parameterizedQueryHash $ ES._sqpNamespace liveQueryPlan)
liftIO $ $assertNFHere (lqId, opName) -- so we don't write thunks to mutable vars
STM.atomically $
-- NOTE: see crucial `lookup` check above, ensuring this doesn't clobber:
@ -845,18 +845,18 @@ onStart env enabledLogTypes serverEnv wsConn (StartMsg opId q) onMessageActions
pure lqId
-- on change, send message on the websocket
liveQOnChange :: Maybe OperationName -> ParameterizedQueryHash -> Maybe Name -> LQ.OnChange
liveQOnChange :: Maybe OperationName -> ParameterizedQueryHash -> Maybe Name -> ES.OnChange
liveQOnChange opName queryHash namespace = \case
Right (LQ.LiveQueryResponse bs dTime) ->
Right (ES.SubscriptionResponse bs dTime) ->
sendMsgWithMetadata
wsConn
(sendDataMsg $ DataMsg opId $ pure $ maybe LBS.fromStrict wrapNamespace namespace bs)
opName
(Just queryHash)
(LQ.LiveQueryMetadata dTime)
(ES.SubscriptionMetadata dTime)
resp ->
sendMsg wsConn $
sendDataMsg $ DataMsg opId $ LBS.fromStrict . LQ._lqrPayload <$> resp
sendDataMsg $ DataMsg opId $ LBS.fromStrict . ES._lqrPayload <$> resp
-- If the source has a namespace then we need to wrap the response
-- from the DB in that namespace.
@ -952,7 +952,7 @@ stopOperation serverEnv wsConn opId logWhenOpNotExist = do
case opM of
Just (lqId, opNameM) -> do
logWSEvent logger wsConn $ EOperation $ opDet opNameM
LQ.removeLiveQuery logger (_wseServerMetrics serverEnv) lqMap lqId
ES.removeLiveQuery logger (_wseServerMetrics serverEnv) lqMap lqId
Nothing -> logWhenOpNotExist
STM.atomically $ STMMap.delete opId opMap
where
@ -1035,7 +1035,7 @@ onClose ::
MonadIO m =>
L.Logger L.Hasura ->
ServerMetrics ->
LQ.LiveQueriesState ->
ES.SubscriptionsState ->
WSConn ->
m ()
onClose logger serverMetrics lqMap wsConn = do
@ -1043,6 +1043,6 @@ onClose logger serverMetrics lqMap wsConn = do
operations <- liftIO $ STM.atomically $ ListT.toList $ STMMap.listT opMap
liftIO $
for_ operations $ \(_, (lqId, _)) ->
LQ.removeLiveQuery logger serverMetrics lqMap lqId
ES.removeLiveQuery logger serverMetrics lqMap lqId
where
opMap = _wscOpMap $ WS.getData wsConn

View File

@ -12,7 +12,7 @@ where
import Control.Concurrent.STM qualified as STM
import Data.Time.Clock qualified as TC
import Hasura.GraphQL.Execute qualified as E
import Hasura.GraphQL.Execute.LiveQuery.State qualified as LQ
import Hasura.GraphQL.Execute.Subscription.State qualified as ES
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.GraphQL.Transport.Instances ()
import Hasura.GraphQL.Transport.WebSocket.Protocol
@ -70,7 +70,7 @@ data WSConnData = WSConnData
data WSServerEnv = WSServerEnv
{ _wseLogger :: !(L.Logger L.Hasura),
_wseLiveQMap :: !LQ.LiveQueriesState,
_wseLiveQMap :: !ES.SubscriptionsState,
-- | an action that always returns the latest version of the schema cache. See 'SchemaCacheRef'.
_wseGCtxMap :: !(IO (SchemaCache, SchemaCacheVer)),
_wseHManager :: !HTTP.Manager,
@ -83,7 +83,7 @@ data WSServerEnv = WSServerEnv
_wseServerMetrics :: !ServerMetrics
}
type OperationMap = STMMap.Map OperationId (LQ.LiveQueryId, Maybe OperationName)
type OperationMap = STMMap.Map OperationId (ES.LiveQuerySubscriberDetails, Maybe OperationName)
type WSServer = WS.WSServer WSConnData

View File

@ -0,0 +1,14 @@
module Hasura.RQL.Types.Subscription
( CursorOrdering (..),
SubscriptionType (..),
)
where
import Hasura.Prelude
-- | CursorOrdering is used in the streaming subscriptions to specify how to order the cursor.
data CursorOrdering = COAscending | CODescending deriving (Show, Eq, Generic)
instance Hashable CursorOrdering
data SubscriptionType = Streaming | LiveQuery deriving (Show, Eq, Generic)

View File

@ -10,7 +10,7 @@ where
import Data.Aeson.TH
import Data.HashSet qualified as Set
import Hasura.GraphQL.Execute.LiveQuery.Options qualified as LQ
import Hasura.GraphQL.Execute.Subscription.Options qualified as ES
import Hasura.Prelude
import Hasura.RQL.Types
( FunctionPermissionsCtx,
@ -39,7 +39,7 @@ data ServerConfig = ServerConfig
scfgIsJwtSet :: !Bool,
scfgJwt :: ![JWTInfo],
scfgIsAllowListEnabled :: !Bool,
scfgLiveQueries :: !LQ.LiveQueriesOptions,
scfgLiveQueries :: !ES.LiveQueriesOptions,
scfgConsoleAssetsDir :: !(Maybe Text),
scfgExperimentalFeatures :: !(Set.HashSet ExperimentalFeature)
}
@ -52,7 +52,7 @@ runGetConfig ::
RemoteSchemaPermsCtx ->
AuthMode ->
Bool ->
LQ.LiveQueriesOptions ->
ES.LiveQueriesOptions ->
Maybe Text ->
Set.HashSet ExperimentalFeature ->
ServerConfig

View File

@ -45,9 +45,9 @@ import Hasura.Base.Error
import Hasura.EncJSON
import Hasura.GraphQL.Execute qualified as E
import Hasura.GraphQL.Execute.Backend qualified as EB
import Hasura.GraphQL.Execute.LiveQuery.Options qualified as EL
import Hasura.GraphQL.Execute.LiveQuery.Poll qualified as EL
import Hasura.GraphQL.Execute.LiveQuery.State qualified as EL
import Hasura.GraphQL.Execute.Subscription.Options qualified as ES
import Hasura.GraphQL.Execute.Subscription.Poll qualified as ES
import Hasura.GraphQL.Execute.Subscription.State qualified as ES
import Hasura.GraphQL.Explain qualified as GE
import Hasura.GraphQL.Logging (MonadQueryLog)
import Hasura.GraphQL.Transport.HTTP qualified as GH
@ -109,7 +109,7 @@ data ServerCtx = ServerCtx
scSQLGenCtx :: !SQLGenCtx,
scEnabledAPIs :: !(S.HashSet API),
scInstanceId :: !InstanceId,
scLQState :: !EL.LiveQueriesState,
scSubscriptionState :: !ES.SubscriptionsState,
scEnableAllowlist :: !Bool,
scEkgStore :: !(EKG.Store EKG.EmptyMetrics),
scResponseInternalErrorsConfig :: !ResponseInternalErrorsConfig,
@ -675,7 +675,7 @@ configApiGetHandler serverCtx@ServerCtx {..} consoleAssetsDir =
scRemoteSchemaPermsCtx
scAuthMode
scEnableAllowlist
(EL._lqsOptions $ scLQState)
(ES._ssLiveQueryOptions $ scSubscriptionState)
consoleAssetsDir
scExperimentalFeatures
return (emptyHttpLogMetadata @m, JSONResp $ HttpResponse (encJFromJValue res) [])
@ -683,7 +683,7 @@ configApiGetHandler serverCtx@ServerCtx {..} consoleAssetsDir =
data HasuraApp = HasuraApp
{ _hapApplication :: !Wai.Application,
_hapSchemaRef :: !SchemaCacheRef,
_hapAsyncActionSubscriptionState :: !EL.AsyncActionSubscriptionState,
_hapAsyncActionSubscriptionState :: !ES.AsyncActionSubscriptionState,
_hapShutdownWsServer :: !(IO ())
}
@ -734,9 +734,9 @@ mkWaiApp ::
InstanceId ->
-- | set of the enabled 'API's
S.HashSet API ->
EL.LiveQueriesOptions ->
ES.LiveQueriesOptions ->
ResponseInternalErrorsConfig ->
Maybe EL.LiveQueryPostPollHook ->
Maybe ES.SubscriptionPostPollHook ->
SchemaCacheRef ->
EKG.Store EKG.EmptyMetrics ->
ServerMetrics ->
@ -785,13 +785,13 @@ mkWaiApp
let getSchemaCache' = first lastBuiltSchemaCache <$> readSchemaCacheRef schemaCacheRef
let corsPolicy = mkDefaultCorsPolicy corsCfg
postPollHook = fromMaybe (EL.defaultLiveQueryPostPollHook logger) liveQueryHook
postPollHook = fromMaybe (ES.defaultSubscriptionPostPollHook logger) liveQueryHook
lqState <- liftIO $ EL.initLiveQueriesState lqOpts postPollHook
subscriptionsState <- liftIO $ ES.initSubscriptionsState lqOpts postPollHook
wsServerEnv <-
WS.createWSServerEnv
logger
lqState
subscriptionsState
getSchemaCache'
httpManager
corsPolicy
@ -810,7 +810,7 @@ mkWaiApp
scSQLGenCtx = sqlGenCtx,
scEnabledAPIs = apis,
scInstanceId = instanceId,
scLQState = lqState,
scSubscriptionState = subscriptionsState,
scEnableAllowlist = enableAL,
scEkgStore = ekgStore,
scEnvironment = env,
@ -835,7 +835,7 @@ mkWaiApp
waiApp <- liftWithStateless $ \lowerIO ->
pure $ WSC.websocketsOr connectionOptions (\ip conn -> lowerIO $ wsServerApp ip conn) spockApp
return $ HasuraApp waiApp schemaCacheRef (EL._lqsAsyncActions lqState) stopWSServer
return $ HasuraApp waiApp schemaCacheRef (ES._ssAsyncActions subscriptionsState) stopWSServer
httpApp ::
forall m.
@ -1028,13 +1028,13 @@ httpApp setupHook corsCfg serverCtx enableConsole consoleAssetsDir enableTelemet
spockAction encodeQErr id $
mkGetHandler $ do
onlyAdmin
respJ <- liftIO $ EL.dumpLiveQueriesState False $ scLQState serverCtx
respJ <- liftIO $ ES.dumpSubscriptionsState False $ scSubscriptionState serverCtx
return (emptyHttpLogMetadata @m, JSONResp $ HttpResponse (encJFromJValue respJ) [])
Spock.get "dev/subscriptions/extended" $
spockAction encodeQErr id $
mkGetHandler $ do
onlyAdmin
respJ <- liftIO $ EL.dumpLiveQueriesState True $ scLQState serverCtx
respJ <- liftIO $ ES.dumpSubscriptionsState True $ scSubscriptionState serverCtx
return (emptyHttpLogMetadata @m, JSONResp $ HttpResponse (encJFromJValue respJ) [])
Spock.get "api/swagger/json" $
spockAction encodeQErr id $

View File

@ -25,7 +25,7 @@ import Hasura.Backends.Postgres.Connection
import Hasura.Base.Error
import Hasura.Cache.Bounded qualified as Cache (CacheSize, parseCacheSize)
import Hasura.Eventing.EventTrigger (defaultFetchBatchSize)
import Hasura.GraphQL.Execute.LiveQuery.Options qualified as LQ
import Hasura.GraphQL.Execute.Subscription.Options qualified as ES
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.Types
@ -364,7 +364,7 @@ mkServeOptions rso = do
mkLQOpts = do
mxRefetchIntM <- withEnv (rsoMxRefetchInt rso) $ fst mxRefetchDelayEnv
mxBatchSizeM <- withEnv (rsoMxBatchSize rso) $ fst mxBatchSizeEnv
return $ LQ.mkLiveQueriesOptions mxBatchSizeM mxRefetchIntM
return $ ES.mkSubscriptionsOptions mxBatchSizeM mxRefetchIntM
mkExamplesDoc :: [[String]] -> PP.Doc
mkExamplesDoc exampleLines =
@ -1118,7 +1118,7 @@ parseGracefulShutdownTimeout =
<> help (snd gracefulShutdownEnv)
)
parseMxRefetchInt :: Parser (Maybe LQ.RefetchInterval)
parseMxRefetchInt :: Parser (Maybe ES.RefetchInterval)
parseMxRefetchInt =
optional $
option
@ -1128,7 +1128,7 @@ parseMxRefetchInt =
<> help (snd mxRefetchDelayEnv)
)
parseMxBatchSize :: Parser (Maybe LQ.BatchSize)
parseMxBatchSize :: Parser (Maybe ES.BatchSize)
parseMxBatchSize =
optional $
option

View File

@ -53,7 +53,7 @@ import Data.Text qualified as T
import Data.Time
import Data.URL.Template
import Database.PG.Query qualified as Q
import Hasura.GraphQL.Execute.LiveQuery.Options qualified as LQ
import Hasura.GraphQL.Execute.Subscription.Options qualified as ES
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.Types
@ -161,8 +161,8 @@ data RawServeOptions impl = RawServeOptions
rsoStringifyNum :: Bool,
rsoDangerousBooleanCollapse :: Maybe Bool,
rsoEnabledAPIs :: Maybe [API],
rsoMxRefetchInt :: Maybe LQ.RefetchInterval,
rsoMxBatchSize :: Maybe LQ.BatchSize,
rsoMxRefetchInt :: Maybe ES.RefetchInterval,
rsoMxBatchSize :: Maybe ES.BatchSize,
rsoEnableAllowlist :: Bool,
rsoEnabledLogTypes :: Maybe [L.EngineLogType impl],
rsoLogLevel :: Maybe L.LogLevel,
@ -232,7 +232,7 @@ data ServeOptions impl = ServeOptions
soStringifyNum :: StringifyNumbers,
soDangerousBooleanCollapse :: Bool,
soEnabledAPIs :: Set.HashSet API,
soLiveQueryOpts :: LQ.LiveQueriesOptions,
soLiveQueryOpts :: ES.LiveQueriesOptions,
soEnableAllowlist :: Bool,
soEnabledLogTypes :: Set.HashSet (L.EngineLogType impl),
soLogLevel :: L.LogLevel,
@ -429,15 +429,15 @@ instance FromEnv [API] where
instance FromEnv [ExperimentalFeature] where
fromEnv = readExperimentalFeatures
instance FromEnv LQ.BatchSize where
instance FromEnv ES.BatchSize where
fromEnv s = do
val <- readEither s
maybe (Left "batch size should be a non negative integer") Right $ LQ.mkBatchSize val
maybe (Left "batch size should be a non negative integer") Right $ ES.mkBatchSize val
instance FromEnv LQ.RefetchInterval where
instance FromEnv ES.RefetchInterval where
fromEnv x = do
val <- fmap (milliseconds . fromInteger) . readEither $ x
maybe (Left "refetch interval should be a non negative integer") Right $ LQ.mkRefetchInterval val
maybe (Left "refetch interval should be a non negative integer") Right $ ES.mkRefetchInterval val
instance FromEnv Milliseconds where
fromEnv = fmap fromInteger . readEither

View File

@ -43,7 +43,7 @@ import Data.HashSet qualified as Set
import Data.Word (Word16)
import Database.MySQL.Simple qualified as Mysql
import Database.PG.Query qualified as Q
import Hasura.GraphQL.Execute.LiveQuery.Options qualified as LQ
import Hasura.GraphQL.Execute.Subscription.Options qualified as ES
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.Types
@ -234,7 +234,7 @@ serveOptions =
soStringifyNum = StringifyNumbers,
soDangerousBooleanCollapse = False,
soEnabledAPIs = testSuiteEnabledApis,
soLiveQueryOpts = LQ.mkLiveQueriesOptions Nothing Nothing,
soLiveQueryOpts = ES.mkSubscriptionsOptions Nothing Nothing,
soEnableAllowlist = False,
soEnabledLogTypes = Set.fromList L.userAllowedLogTypes,
soLogLevel = fromMaybe (L.LevelOther "test-suite") engineLogLevel,