2022-03-21 13:39:49 +03:00
-- | Multiplexed subscription poller threads; see "Hasura.GraphQL.Execute.Subscription" for details.
module Hasura.GraphQL.Execute.Subscription.Poll.Common
2021-09-24 01:56:37 +03:00
( -- * Pollers
Poller ( .. ) ,
PollerId ( .. ) ,
PollerIOState ( .. ) ,
PollerKey ( .. ) ,
PollerMap ,
dumpPollerMap ,
PollDetails ( .. ) ,
BatchExecutionDetails ( .. ) ,
CohortExecutionDetails ( .. ) ,
2022-03-21 13:39:49 +03:00
SubscriptionPostPollHook ,
defaultSubscriptionPostPollHook ,
2021-09-24 01:56:37 +03:00
-- * Cohorts
Cohort ( .. ) ,
2022-03-21 13:39:49 +03:00
CohortSnapshot ( .. ) ,
2022-04-07 17:41:43 +03:00
CursorVariableValues ( .. ) ,
2021-09-24 01:56:37 +03:00
CohortId ,
newCohortId ,
CohortVariables ,
CohortKey ,
CohortMap ,
-- * Subscribers
Subscriber ( .. ) ,
SubscriberId ,
newSubscriberId ,
SubscriberMetadata ,
mkSubscriberMetadata ,
unSubscriberMetadata ,
SubscriberMap ,
OnChange ,
2022-03-21 13:39:49 +03:00
SubscriptionGQResponse ,
SubscriptionResponse ( .. ) ,
SubscriptionMetadata ( .. ) ,
2021-09-24 01:56:37 +03:00
SubscriberExecutionDetails ( .. ) ,
-- * Batch
BatchId ( .. ) ,
2022-03-21 13:39:49 +03:00
-- * Hash
ResponseHash ( .. ) ,
mkRespHash ,
2021-09-24 01:56:37 +03:00
)
where
import Control.Concurrent.STM qualified as STM
import Control.Immortal qualified as Immortal
import Crypto.Hash qualified as CH
2022-03-21 13:39:49 +03:00
import Data.Aeson qualified as J
2021-09-24 01:56:37 +03:00
import Data.ByteString qualified as BS
import Data.Time.Clock qualified as Clock
import Data.UUID qualified as UUID
import Data.UUID.V4 qualified as UUID
2022-03-21 13:39:49 +03:00
import Hasura.GraphQL.Execute.Subscription.Options
import Hasura.GraphQL.Execute.Subscription.Plan
import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap
2021-09-24 01:56:37 +03:00
import Hasura.GraphQL.ParameterizedQueryHash ( ParameterizedQueryHash )
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.GraphQL.Transport.WebSocket.Protocol ( OperationId )
import Hasura.GraphQL.Transport.WebSocket.Server qualified as WS
import Hasura.Logging qualified as L
import Hasura.Prelude
2022-03-21 13:39:49 +03:00
import Hasura.RQL.Types.Common ( SourceName )
2021-09-24 01:56:37 +03:00
import Hasura.Server.Types ( RequestId )
import Hasura.Session
import ListT qualified
import StmContainers.Map qualified as STMMap
-- ----------------------------------------------------------------------------------------------
-- Subscribers

-- | Identifies a single 'Subscriber'. Values are minted by 'newSubscriberId'
-- from a random UUID.
newtype SubscriberId = SubscriberId {unSubscriberId :: UUID.UUID}
  deriving (Show, Eq, Generic, Hashable, J.ToJSON)
-- | Mint a fresh 'SubscriberId' from a randomly generated (version 4) UUID.
newSubscriberId :: IO SubscriberId
newSubscriberId = fmap SubscriberId UUID.nextRandom
-- | Arbitrary JSON metadata that a user of the subscription subsystem
-- (currently the websocket transport) can attach to a subscriber.
--
-- It travels with the subscriber into 'SubscriberExecutionDetails', and hence
-- into the 'CohortExecutionDetails' of every poll cycle. A custom
-- 'SubscriptionPostPollHook' can therefore include it in what it logs.
newtype SubscriberMetadata = SubscriberMetadata {unSubscriberMetadata :: J.Value}
  deriving (Show, Eq, J.ToJSON)
-- | Build the 'SubscriberMetadata' for a websocket subscriber.
--
-- The result is a JSON object with the keys @websocket_id@, @operation_id@,
-- @operation_name@ (@null@ when no operation name was given) and
-- @request_id@.
mkSubscriberMetadata :: WS.WSId -> OperationId -> Maybe OperationName -> RequestId -> SubscriberMetadata
mkSubscriberMetadata wsId opId opName requestId =
  SubscriberMetadata
    ( J.object
        [ "websocket_id" J..= wsId,
          "operation_id" J..= opId,
          "operation_name" J..= opName,
          "request_id" J..= requestId
        ]
    )
-- | A single client listening to a subscription: its identity, its
-- request-level context, and the callback through which results reach it.
data Subscriber = Subscriber
  { _sId :: !SubscriberId,
    _sMetadata :: !SubscriberMetadata,
    _sRequestId :: !RequestId,
    _sOperationName :: !(Maybe OperationName),
    -- | the 'OnChange' callback, presumably invoked with each result that is
    -- forwarded to this subscriber by its 'Cohort'
    _sOnChangeCallback :: !OnChange
  }
-- | Metadata accompanying an onChange notification, used to attach extra
-- analytics data.
data SubscriptionMetadata = SubscriptionMetadata
  { -- | execution time reported with the notification (TODO confirm exactly
    -- what span is measured)
    _sqmExecutionTime :: !Clock.DiffTime
  }
-- | The response data pushed to a subscriber: the encoded payload together
-- with the time it took to execute.
data SubscriptionResponse = SubscriptionResponse
  { _lqrPayload :: !BS.ByteString,
    _lqrExecutionTime :: !Clock.DiffTime
  }
-- | A 'SubscriptionResponse' wrapped in 'GQResult'; this is what subscribers
-- are handed.
type SubscriptionGQResponse = GQResult SubscriptionResponse

-- | Callback used to deliver a 'SubscriptionGQResponse' to a subscriber.
type OnChange = SubscriptionGQResponse -> IO ()

-- | A transactional map from 'SubscriberId' to the corresponding 'Subscriber'.
type SubscriberMap = TMap.TMap SubscriberId Subscriber
-- -------------------------------------------------------------------------------------------------
-- Cohorts

-- | A batched group of 'Subscriber's who are not only listening to the same query but also have
-- identical session and query variables. Each result pushed to a 'Cohort' is forwarded along to
-- each of its 'Subscriber's.
--
-- In SQL, each 'Cohort' corresponds to a single row in the laterally-joined @_subs@ table (and
-- therefore a single row in the query result).
--
-- The type parameter carries the streaming-subscription cursor state. Live-query subscriptions
-- instantiate it to @()@.
--
-- See also 'CohortMap'.
data Cohort streamCursorVars = Cohort
  { -- | a unique identifier used to identify the cohort in the generated query
    _cCohortId :: !CohortId,
    -- | a hash of the previous query result, if any, used to decide whether an updated result
    -- has to be pushed to the subscribers
    _cPreviousResponse :: !(STM.TVar (Maybe ResponseHash)),
    -- | the subscribers we've already pushed a result to; we push new results to them iff the
    -- response changes
    _cExistingSubscribers :: !SubscriberMap,
    -- | subscribers we haven't yet pushed any results to; we push results to them regardless of
    -- whether the result changed, then merge them into the map of existing subscribers
    _cNewSubscribers :: !SubscriberMap,
    -- | a mutable type which holds the latest value of the subscription stream cursor. For a
    -- live-query subscription this field is ignored, since @streamCursorVars@ is set to @()@
    _cStreamCursorVariables :: !streamCursorVars
  }
-- | A numeric ID that uniquely identifies a batch within a single poll. It is
-- used to record which batch a cohort belongs to.
newtype BatchId = BatchId {_unBatchId :: Int}
  deriving (Show, Eq, J.ToJSON)
{- Note [Blake2b faster than SHA-256]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
At the time of writing, https://blake2.net states:
"BLAKE2 is a cryptographic hash function faster than MD5, SHA-1, SHA-2, and SHA-3,
yet is at least as secure as the latest standard SHA-3".
-}
-- | A hash used to detect whether a result changed without keeping the entire result in memory.
--
-- Because the hash is cryptographic, a collision is practically impossible. With 256 bits, even
-- if a subscription changes once per second for an entire year, the probability of a collision
-- is about 4.294417 × 10^-63. See Note [Blake2b faster than SHA-256].
newtype ResponseHash = ResponseHash {unResponseHash :: CH.Digest CH.Blake2b_256}
  deriving (Show, Eq)

-- | Encoded as a JSON string holding the 'show' rendering of the digest.
instance J.ToJSON ResponseHash where
  toJSON (ResponseHash digest) = J.toJSON (show digest)
-- | Hash an encoded response with BLAKE2b-256 (see 'ResponseHash').
mkRespHash :: BS.ByteString -> ResponseHash
mkRespHash payload = ResponseHash (CH.hash payload)
-- | The key that decides whether two 'Subscriber's belong to the same 'Cohort', assuming they
-- already qualify for the same 'Poller'.
--
-- Do not confuse this with 'CohortId'. The latter is a purely synthetic key that only identifies
-- the cohort inside the generated SQL query.
type CohortKey = CohortVariables

-- | The cohorts of a poller, keyed by 'CohortKey'.
--
-- Invariant (maintained in 'removeLiveQuery'): no 'Cohort' in this map has zero total
-- (existing + new) subscribers.
type CohortMap streamCursor = TMap.TMap CohortKey (Cohort streamCursor)
-- | Render a 'CohortMap' as a JSON array for debugging.
--
-- Each element holds the cohort's variables and an object with its id (under @resp_id@), the ids
-- of its existing and new subscribers, and the previous result hash.
--
-- The map is listed in one STM transaction and each cohort is then read in a transaction of its
-- own, so the dump is not a single consistent snapshot across cohorts.
dumpCohortMap :: CohortMap streamCursor -> IO J.Value
dumpCohortMap cohortMap = do
  entries <- STM.atomically (TMap.toList cohortMap)
  rendered <- forM entries $ \(cohortVars, cohort) -> do
    cohortJson <- dumpCohort cohort
    pure $
      J.object
        [ "variables" J..= cohortVars,
          "cohort" J..= cohortJson
        ]
  pure (J.toJSON rendered)
  where
    dumpCohort Cohort {..} = STM.atomically $ do
      prevResHash <- STM.readTVar _cPreviousResponse
      existing <- TMap.toList _cExistingSubscribers
      fresh <- TMap.toList _cNewSubscribers
      pure $
        J.object
          [ "resp_id" J..= _cCohortId,
            "current_ops" J..= map fst existing,
            "new_ops" J..= map fst fresh,
            "previous_result_hash" J..= prevResHash
          ]
-- | A snapshot of a cohort's state (presumably taken at the start of a poll cycle).
--
-- It holds the cohort's variables, its shared previous-response hash variable, and its existing
-- and new subscribers as plain lists.
data CohortSnapshot = CohortSnapshot
  { _csVariables :: !CohortVariables,
    _csPreviousResponse :: !(STM.TVar (Maybe ResponseHash)),
    _csExistingSubscribers :: ![Subscriber],
    _csNewSubscribers :: ![Subscriber]
  }
-- -----------------------------------------------------------------------------
-- Pollers

-- | A unique, multiplexed query. Each 'Poller' has its own polling thread that
-- periodically polls the data source and pushes results to each of its
-- listening 'Cohort's.
--
-- In SQL, a 'Poller' corresponds to a single, multiplexed query. In practice,
-- 'Poller's with large numbers of 'Cohort's are split into multiple concurrent
-- batched queries for performance reasons.
data Poller streamCursor = Poller
  { _pCohorts :: !(CohortMap streamCursor),
    -- | This lives in a separate 'STM.TMVar' because we must be able to
    -- construct 'Poller' values in 'STM.STM'. The insertion into the
    -- 'PollerMap' has to be atomic so that a race cannot create two pollers
    -- for the same query.
    --
    -- The worker thread and the metrics store cannot be created in 'STM.STM',
    -- so they are put into the 'Poller' only once we are certain no duplicate
    -- will be created.
    --
    -- This var is "write once": it moves monotonically from empty to full.
    -- TODO this could probably be tightened up to something like
    -- 'STM PollerIOState'
    _pIOState :: !(STM.TMVar PollerIOState)
  }
-- | The part of a 'Poller' that can only be created in 'IO'. It is filled into
-- '_pIOState' once the poller has been registered.
data PollerIOState = PollerIOState
  { -- | a handle on the poller's worker thread that can be used to
    -- 'Immortal.stop' it if all its cohorts stop listening
    _pThread :: !Immortal.Thread,
    -- | the poller's identity, used to attribute metrics and log entries
    _pId :: !PollerId
  }
-- | The key of the 'PollerMap'. Subscriptions that agree on source, role and
-- multiplexed query text share one 'Poller'.
--
-- No operation name is needed here, because a subscription only ever has a
-- single top-level field.
data PollerKey = PollerKey
  { _lgSource :: !SourceName,
    _lgRole :: !RoleName,
    _lgQuery :: !Text
  }
  deriving (Show, Eq, Generic)

-- | Generic-derived hashing, so that 'PollerKey' can key the 'PollerMap'.
instance Hashable PollerKey

instance J.ToJSON PollerKey where
  toJSON PollerKey {..} =
    J.object
      [ "source" J..= _lgSource,
        "role" J..= _lgRole,
        "query" J..= _lgQuery
      ]
2022-04-07 17:41:43 +03:00
type PollerMap streamCursor = STMMap . Map PollerKey ( Poller streamCursor )
2019-08-28 15:19:21 +03:00
-- | Render a 'PollerMap' as a JSON array for debugging.
--
-- Each element describes one poller: its source, role, worker thread id,
-- 'PollerId' and multiplexed query. When @extended@ is 'True' the poller's
-- cohorts are included under @"cohorts"@ (see 'dumpCohortMap'). Otherwise
-- @"cohorts"@ is @null@.
--
-- A poller's 'PollerIOState' is written into '_pIOState' only after the poller
-- has been inserted into the map. For a poller that is still being set up,
-- @"thread_id"@ and @"poller_id"@ are therefore reported as @null@ instead of
-- blocking the whole dump until the state appears.
dumpPollerMap :: Bool -> PollerMap streamCursor -> IO J.Value
dumpPollerMap extended pollerMap =
  fmap J.toJSON $ do
    entries <- STM.atomically $ ListT.toList $ STMMap.listT pollerMap
    forM entries $ \(PollerKey source role query, Poller cohortsMap ioState) -> do
      -- 'STM.readTMVar' would retry, i.e. block, while the TMVar is still
      -- empty; a debug dump must not hang on a half-initialised poller.
      ioStateM <- STM.atomically $ STM.tryReadTMVar ioState
      cohortsJ <-
        if extended
          then Just <$> dumpCohortMap cohortsMap
          else return Nothing
      return $
        J.object
          [ "source" J..= source,
            "role" J..= role,
            "thread_id" J..= fmap (show . Immortal.threadId . _pThread) ioStateM,
            "poller_id" J..= fmap _pId ioStateM,
            "multiplexed_query" J..= query,
            "cohorts" J..= cohortsJ
          ]
-- | Identifies one unique 'Poller', so that metrics can be gathered per
-- poller.
newtype PollerId = PollerId {unPollerId :: UUID.UUID}
  deriving (Show, Eq, Generic, J.ToJSON)
-- | The identity and metadata of one subscriber, as reported in the
-- 'CohortExecutionDetails' of a poll cycle.
data SubscriberExecutionDetails = SubscriberExecutionDetails
  { _sedSubscriberId :: !SubscriberId,
    _sedSubscriberMetadata :: !SubscriberMetadata
  }
  deriving (Show, Eq)
2020-06-04 20:25:21 +03:00
-- | Execution information related to a cohort on a poll cycle
2021-09-24 01:56:37 +03:00
data CohortExecutionDetails = CohortExecutionDetails
{ _cedCohortId :: ! CohortId ,
_cedVariables :: ! CohortVariables ,
-- | Nothing in case of an error
_cedResponseSize :: ! ( Maybe Int ) ,
-- | The response on this cycle has been pushed to these above subscribers
-- New subscribers (those which haven't been around during the previous poll
-- cycle) will always be part of this
_cedPushedTo :: ! [ SubscriberExecutionDetails ] ,
-- | The response on this cycle has *not* been pushed to these above
-- subscribers. This would when the response hasn't changed from the previous
-- polled cycle
_cedIgnored :: ! [ SubscriberExecutionDetails ] ,
_cedBatchId :: ! BatchId
}
deriving ( Show , Eq )
-- | What happened to a single batch of cohorts during one poll cycle.
data BatchExecutionDetails = BatchExecutionDetails
  { -- | database execution time of the batch
    _bedPgExecutionTime :: !Clock.DiffTime,
    -- | time taken to push to all cohorts belonging to this batch
    _bedPushTime :: !Clock.DiffTime,
    -- | id of the batch
    _bedBatchId :: !BatchId,
    -- | execution details of the cohorts belonging to this batch
    _bedCohorts :: ![CohortExecutionDetails],
    -- | size of the batch response in bytes; only logged when the batch had
    -- no errors
    _bedBatchResponseSizeBytes :: !(Maybe Int)
  }
  deriving (Show, Eq)
-- | The subset of 'BatchExecutionDetails' that goes into the poller log.
--
-- It contains the execution and push times, plus
-- @batch_response_size_bytes@ when the size is known.
-- See Note [Minimal LiveQuery Poller Log].
batchExecutionDetailMinimal :: BatchExecutionDetails -> J.Value
batchExecutionDetailMinimal BatchExecutionDetails {..} =
  J.object (timings <> responseSize)
  where
    timings =
      [ "pg_execution_time" J..= _bedPgExecutionTime,
        "push_time" J..= _bedPushTime
      ]
    -- the batch response size is only logged when there were no errors
    responseSize = case _bedBatchResponseSizeBytes of
      Nothing -> []
      Just respSize -> ["batch_response_size_bytes" J..= respSize]
-- TODO consider refactoring into two types: one that is returned from
-- pollLiveQuery and pollStreamingQuery, and a parent type containing pollerId,
-- sourceName, and so on, which is assembled at the callsites of those two
-- functions. Move postPollHook out of those functions to callsites

-- | Everything recorded about one poll cycle of a 'Poller'. This is the value
-- handed to a 'SubscriptionPostPollHook'.
data PollDetails = PollDetails
  { -- | the unique ID of the 'Poller' (essentially a thread that runs as a
    -- 'Poller')
    _pdPollerId :: !PollerId,
    -- | the multiplexed SQL query, with all the variables together, that is
    -- run against the database
    _pdGeneratedSql :: !Text,
    -- | the time taken to get a snapshot of cohorts from our
    -- 'SubscriptionsState' data structure
    _pdSnapshotTime :: !Clock.DiffTime,
    -- | the execution batches and their details
    _pdBatches :: ![BatchExecutionDetails],
    -- | total time spent on the poll cycle
    _pdTotalTime :: !Clock.DiffTime,
    -- | the subscription options in effect for this poll
    _pdLiveQueryOptions :: !SubscriptionsOptions,
    _pdSource :: !SourceName,
    _pdRole :: !RoleName,
    _pdParameterizedQueryHash :: !ParameterizedQueryHash
  }
  deriving (Show, Eq)
{- Note [Minimal LiveQuery Poller Log]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
We only want to log minimal information in the livequery-poller-log, because
logging the details of every subscriber could be expensive. All poller-related
information can always be retrieved by dumping the current live queries state.
PollDetails and BatchExecutionDetails capture far more detail than is currently
logged, so that other implementations, such as pro, can use it if they need to.
-}
-- | The subset of 'PollDetails' written to the poller log. Each batch is
-- reduced with 'batchExecutionDetailMinimal'.
-- See Note [Minimal LiveQuery Poller Log].
pollDetailMinimal :: PollDetails -> J.Value
pollDetailMinimal details =
  J.object
    [ "poller_id" J..= _pdPollerId details,
      "snapshot_time" J..= _pdSnapshotTime details,
      "batches" J..= fmap batchExecutionDetailMinimal (_pdBatches details),
      "total_time" J..= _pdTotalTime details,
      "source" J..= _pdSource details,
      "role" J..= _pdRole details
    ]
-- | Poll details are logged at info level under the live-query poller log
-- type, in the minimal form produced by 'pollDetailMinimal'.
instance L.ToEngineLog PollDetails L.Hasura where
  toEngineLog details =
    (L.LevelInfo, L.ELTLivequeryPollerLog, pollDetailMinimal details)
-- | An action run with the 'PollDetails' of a poll cycle.
type SubscriptionPostPollHook = PollDetails -> IO ()

-- | The default 'SubscriptionPostPollHook'. It logs the 'PollDetails' through
-- the given logger, using its 'L.ToEngineLog' instance.
defaultSubscriptionPostPollHook :: L.Logger L.Hasura -> SubscriptionPostPollHook
defaultSubscriptionPostPollHook logger =
  -- Deliberately eta-expanded. 'L.unLogger' presumably has a higher-rank field
  -- type, and GHC's simplified subsumption (GHC >= 9.0) would reject the
  -- point-free form @defaultSubscriptionPostPollHook = L.unLogger@.
  L.unLogger logger