{-# LANGUAGE QuasiQuotes #-} {-# LANGUAGE TemplateHaskell #-} module Hasura.GraphQL.Execute.Subscription.Poll.StreamingQuery ( -- * Pollers pollStreamingQuery, ) where import Control.Concurrent.Async qualified as A import Control.Concurrent.STM qualified as STM import Control.Lens import Data.ByteString qualified as BS import Data.HashMap.Strict qualified as Map import Data.HashMap.Strict.Extended qualified as Map import Data.HashSet qualified as Set import Data.List.Split (chunksOf) import Data.Monoid (Sum (..)) import Data.Text.Extended import GHC.AssertNF.CPP import Hasura.Base.Error import Hasura.GraphQL.Execute.Backend import Hasura.GraphQL.Execute.Subscription.Options import Hasura.GraphQL.Execute.Subscription.Plan import Hasura.GraphQL.Execute.Subscription.Plan qualified as C import Hasura.GraphQL.Execute.Subscription.Poll.Common hiding (Cohort (..), CohortMap, CohortSnapshot (..)) import Hasura.GraphQL.Execute.Subscription.Poll.Common qualified as C import Hasura.GraphQL.Execute.Subscription.TMap qualified as TMap import Hasura.GraphQL.Execute.Subscription.Types import Hasura.GraphQL.ParameterizedQueryHash (ParameterizedQueryHash) import Hasura.GraphQL.Transport.Backend import Hasura.GraphQL.Transport.HTTP.Protocol import Hasura.Prelude import Hasura.RQL.Types.Backend import Hasura.RQL.Types.Common (SourceName) import Hasura.RQL.Types.Subscription (SubscriptionType (..)) import Hasura.SQL.Value (TxtEncodedVal (..)) import Hasura.Session import Language.GraphQL.Draft.Syntax qualified as G import Refined (unrefine) import Text.Shakespeare.Text (st) {- Note [Streaming subscriptions rebuilding cohort map] ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ When the multiplexed query is polled, the cohort is snapshotted to get the existing and newly added subscribers (if any), let's call these subscribers as current poll subscribers. Every cohort is associated with a cohort key and the cohort key is basically the variables the cohort uses. The cohort variables contains multiple types of variables in it, session variables, query variables, synthetic variables and cursor variables, out of all these, the cursor variable may change with every poll. So, we rebuild the cohort map at the end of every poll, see Note [Streaming subscription polling]. But, rebuilding the cohort map is not straight forward because there are race conditions that need to be taken care of. Some of the race conditions which can happen when the current cohorts are processing: 1. A cohort is removed concurrently 2. A subscriber is removed concurrently 3. A new subscriber has started a subscription and it should be placed in the correct cohort In the current code, the rebuilding of the cohort map goes as the follows: 1. After snapshotting the cohorts, we build a cohort map out of those cohorts, in the code these are called as "processedCohorts", it's important to note that these are not retrieved from the mutable "CohortMap", these are the snapshotted cohorts which were processed in the current poll. The reason we maintain the snapshotted cohorts is because it is later used while rebuilding the cohort map. 2. We create a processed cohort map which looks like HashMap CohortKey (Cohort 'Streaming, CohortKey). The key of the map is the CohortKey which was associated with the cohort during the poll and the CohortKey in the value type is the cohort key after updating the cursor value. Note that both the values may or may not be equal. 3. We atomically get the list of the cohorts from the cohort map (mutable reference), let's call it current cohorts and then traverse over it. 1. Lookup with the given cohort key into the processed cohort map a. If no cohort is found, it means that the cohort with the given cohort key has been added while we were polling. So, we keep this as it is. b. If a processed cohort is found: i. We have to see if any new subscribers have been added to the current cohort, this is calculated using the diff of existing subscribers in the current cohort and the existing cohort, if there are any then we create a new cohort which includes only the newly added subscribers and add the new cohort into the cohort map. ii. We only retain the subscribers found in the processed cohort which exist in the current cohort. We do this because it is possible that a subscriber has been stopped their subscription in the time between the cohorts were snapshotted for processing and the time the cohort map is rebuilt. iii. Insert the processed cohort with the updated cohort key into the cohort map. -} {- Note [Streaming subscription polling] ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Every cohort of a streaming subscription is associated with a mutable latest cursor value reference, which contains the current value of the cursor. After a poll, the mutable cursor value gets updated if its value is a non-null one, null value means that there were no rows to get the min/max value from. After this, the cohort map associated with the poller is also rebuilt, which will make sure that the cohort map's key contains the latest cursor values. So, any new subscriber will get added to an existing cohort if the new subscriber's cohort key matches with any of the existing cohorts. We *need* to rebuild the cohort map, because, say if we don't rebuild the cohort map then a new subscriber may get added to a cohort which has been streaming for a while now, then the new subscriber will get the responses according to the cursor value stored in the cohort, instead of the initial value specified by the client. For example: Client 1 started streaming a query at t1, say: ``` subscription { log_stream(cursor: {initial_value: {created_at: "2020-01-01"}}, batch_size: 1000) { id level detail } } ``` Client 2 starts streaming at t2 with the same query (where t2 > t1), if the cohort map is not rebuilt (to reflect cursor value in the cohort key), then client 2 and client 1 will both start getting the same responses, which is wrong because client 2 should start streaming from the initial value it provided. -} {- Note [Lifecycle of a streaming subscription poller] ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +-------------------------------------------+ +-----------------------------------------------------------------+ | Execute multiplexed query in the database | -------> | Parse the response, every row of response contains three things:| +-------------------------------------------+ | 1. Cohort ID | ^ | 2. Cohort's Response | | | 3. Cohort's latest cursor value | | +-----------------------------------------------------------------+ | | | | | v | +------------------------------------------------------------------------+ | | Processing of the response: | | | 1. Send the result to the subscribers | +------------------------------------+ | 2. Update the cursor value in the mutable reference | | Rebuild the cohort map | | of the snapshot so that the next poll uses this value | +------------------------------------+ <------ +------------------------------------------------------------------------+ -} mergeOldAndNewCursorValues :: CursorVariableValues -> CursorVariableValues -> CursorVariableValues mergeOldAndNewCursorValues (CursorVariableValues prevPollCursorValues) (CursorVariableValues currentPollCursorValues) = let combineFn previousVal currentVal = case currentVal of TENull -> previousVal -- When we get a null value from the DB, we retain the previous value TELit t -> TELit t in CursorVariableValues $ Map.unionWith combineFn prevPollCursorValues currentPollCursorValues pushResultToCohort :: GQResult BS.ByteString -> Maybe ResponseHash -> SubscriptionMetadata -> CursorVariableValues -> -- | Root field name G.Name -> -- | subscribers to which data has been pushed, subscribers which already -- have this data (this information is exposed by metrics reporting) (CohortSnapshot 'Streaming, Cohort 'Streaming) -> IO ([SubscriberExecutionDetails], [SubscriberExecutionDetails]) pushResultToCohort result !respHashM (SubscriptionMetadata dTime) cursorValues rootFieldName (cohortSnapshot, cohort) = do prevRespHashM <- STM.readTVarIO respRef -- write to the current websockets if needed (subscribersToPush, subscribersToIgnore) <- -- We send the response to all the subscribers only when the response changes. if isExecError result || respHashM /= prevRespHashM then do $assertNFHere respHashM -- so we don't write thunks to mutable vars STM.atomically $ do STM.writeTVar respRef respHashM STM.writeTVar (C._cStreamCursorVariables cohort) cursorValues return (newSinks <> curSinks, mempty) else -- when the response is unchanged, the response is only sent to the newly added subscribers return (newSinks, curSinks) pushResultToSubscribers subscribersToPush pure $ over (each . each) ( \Subscriber {..} -> SubscriberExecutionDetails _sId _sMetadata ) (subscribersToPush, subscribersToIgnore) where rootFieldNameText = G.unName rootFieldName -- we want to know if the DB response is an empty array and if it is, then we don't send anything -- to the client. We could do this by parsing the response and check for an empty list, but -- this will have performance impact when large responses are parsed. Instead, we compare -- the DB response to a templated empty array response. -- We are using templating instead of doing something like -- J.encode $ J.object [ rootFieldNameText J..= [] :: [J.Value] ] -- is because, unfortunately, the value returned by the above is -- {:[]} (notice the lack of spaces). So, instead -- we're templating according to the format postgres sends JSON responses. emptyRespBS = Right $ txtToBs [st|{"#{rootFieldNameText}" : []}|] -- Same as the above, but cockroach prefixes the responses with @\x1@ and does not -- have a space between the key and the colon. roachEmptyRespBS = Right $ "\x1" <> txtToBs [st|{"#{rootFieldNameText}": []}|] isResponseEmpty = result == emptyRespBS || result == roachEmptyRespBS C.CohortSnapshot _ respRef curSinks newSinks = cohortSnapshot response = result <&> \payload -> SubscriptionResponse payload dTime pushResultToSubscribers subscribers = unless isResponseEmpty $ flip A.mapConcurrently_ subscribers $ \Subscriber {..} -> _sOnChangeCallback response -- | A single iteration of the streaming query polling loop. Invocations on the -- same mutable objects may race. pollStreamingQuery :: forall b. BackendTransport b => PollerId -> SubscriptionsOptions -> (SourceName, SourceConfig b) -> RoleName -> ParameterizedQueryHash -> MultiplexedQuery b -> CohortMap 'Streaming -> G.Name -> SubscriptionPostPollHook -> Maybe (IO ()) -> -- Optional IO action to make this function (pollStreamingQuery) testable IO () pollStreamingQuery pollerId lqOpts (sourceName, sourceConfig) roleName parameterizedQueryHash query cohortMap rootFieldName postPollHook testActionMaybe = do (totalTime, (snapshotTime, batchesDetailsAndProcessedCohorts)) <- withElapsedTime $ do -- snapshot the current cohorts and split them into batches -- This STM transaction is a read only transaction i.e. it doesn't mutate any state (snapshotTime, cohortBatches) <- withElapsedTime $ do -- get a snapshot of all the cohorts -- this need not be done in a transaction cohorts <- STM.atomically $ TMap.toList cohortMap cohortSnapshots <- mapM (STM.atomically . getCohortSnapshot) cohorts -- cohorts are broken down into batches specified by the batch size let cohortBatches = chunksOf (unrefine (unBatchSize batchSize)) cohortSnapshots -- associating every batch with their BatchId pure $ zip (BatchId <$> [1 ..]) cohortBatches for_ testActionMaybe id -- IO action intended to run after the cohorts have been snapshotted -- concurrently process each batch and also get the processed cohort with the new updated cohort key batchesDetailsAndProcessedCohorts <- A.forConcurrently cohortBatches $ \(batchId, cohorts) -> do (queryExecutionTime, mxRes) <- runDBStreamingSubscription @b sourceConfig query $ over (each . _2) C._csVariables $ fmap (fmap fst) cohorts let lqMeta = SubscriptionMetadata $ convertDuration queryExecutionTime operations = getCohortOperations cohorts mxRes -- batch response size is the sum of the response sizes of the cohorts batchResponseSize = case mxRes of Left _ -> Nothing Right resp -> Just $ getSum $ foldMap ((\(_, sqlResp, _) -> Sum . BS.length $ sqlResp)) resp (pushTime, cohortsExecutionDetails) <- withElapsedTime $ A.forConcurrently operations $ \(res, cohortId, respData, latestCursorValueMaybe, (snapshot, cohort)) -> do let latestCursorValue@(CursorVariableValues updatedCursorVarVal) = let prevCursorVariableValue = CursorVariableValues $ C._unValidatedVariables $ C._cvCursorVariables $ C._csVariables snapshot in case latestCursorValueMaybe of Nothing -> prevCursorVariableValue -- Nothing indicates there was an error when the query was polled Just currentPollCursorValue -> mergeOldAndNewCursorValues prevCursorVariableValue currentPollCursorValue (pushedSubscribers, ignoredSubscribers) <- -- Push the result to the subscribers present in the cohorts pushResultToCohort res (fst <$> respData) lqMeta latestCursorValue rootFieldName (snapshot, cohort) let currentCohortKey = C._csVariables snapshot updatedCohortKey = modifyCursorCohortVariables (mkUnsafeValidateVariables updatedCursorVarVal) $ C._csVariables snapshot snapshottedNewSubs = C._csNewSubscribers snapshot cohortExecutionDetails = CohortExecutionDetails { _cedCohortId = cohortId, _cedVariables = currentCohortKey, _cedPushedTo = pushedSubscribers, _cedIgnored = ignoredSubscribers, _cedResponseSize = snd <$> respData, _cedBatchId = batchId } pure (cohortExecutionDetails, (currentCohortKey, (cohort, updatedCohortKey, snapshottedNewSubs))) let processedCohortBatch = snd <$> cohortsExecutionDetails batchExecDetails = BatchExecutionDetails queryExecutionTime pushTime batchId (fst <$> cohortsExecutionDetails) batchResponseSize pure $ (batchExecDetails, processedCohortBatch) pure (snapshotTime, batchesDetailsAndProcessedCohorts) let pollDetails = PollDetails { _pdPollerId = pollerId, _pdGeneratedSql = toTxt query, _pdSnapshotTime = snapshotTime, _pdBatches = fst <$> batchesDetailsAndProcessedCohorts, _pdLiveQueryOptions = lqOpts, _pdTotalTime = totalTime, _pdSource = sourceName, _pdRole = roleName, _pdParameterizedQueryHash = parameterizedQueryHash } STM.atomically $ do -- constructing a cohort map for all the cohorts that have been -- processed in the current poll -- processed cohorts is an array of tuples of the current poll cohort variables and a tuple -- of the cohort and the new cohort key let processedCohortsMap = Map.fromList $ snd =<< batchesDetailsAndProcessedCohorts -- rebuilding the cohorts and the cohort map, see [Streaming subscription polling] -- and [Streaming subscriptions rebuilding cohort map] currentCohorts <- TMap.toList cohortMap updatedCohortsMap <- foldM ( \accCohortMap (currentCohortKey, currentCohort) -> do let processedCohortMaybe = Map.lookup currentCohortKey processedCohortsMap case processedCohortMaybe of -- A new cohort has been added in the cohort map whilst the -- current poll was happening, in this case we just return it -- as it is Nothing -> Map.insertWithM mergeCohorts currentCohortKey currentCohort accCohortMap Just (processedCohort, updatedCohortKey, snapshottedNewSubs) -> do updateCohortSubscribers currentCohort snapshottedNewSubs currentCohortExistingSubscribers <- TMap.toList $ C._cExistingSubscribers currentCohort newlyAddedSubscribers <- TMap.getMap $ C._cNewSubscribers currentCohort -- The newly added subscribers should not be added to the updated cohort, they should be added -- to the old cohort because they need to be procesed for the first time with their initial value. -- For example: let's take 2 cohorts, -- s means a subscriber -- C1 - [s1, s2] -- C2 -> [s3] -- and S4 is added to C1 and S5 is added to C2 during the poll. -- -- Let's say C1 is updated to C2 and C2 to C3, then -- the updated cohort map should look like: -- C1 -> [s4] -- C2 -> [s1, s2, s5] -- C3 -> [s3] -- -- Note that s4 and s5 have not been added to the updated cohort and instead -- are in the original cohort they were added in. -- all the existing subsribers are removed from the current cohort and -- the newly added subscribers are added back accCohortMapWithCurrentCohort <- if null newlyAddedSubscribers then pure accCohortMap else do -- Creating a new cohort which will only contain the newly added subscribers newCohort <- do existingSubs <- TMap.new newSubs <- TMap.new pure $ C.Cohort (C._cCohortId currentCohort) (C._cPreviousResponse currentCohort) existingSubs newSubs (C._cStreamCursorVariables currentCohort) TMap.replace (C._cNewSubscribers newCohort) newlyAddedSubscribers Map.insertWithM mergeCohorts currentCohortKey newCohort accCohortMap let allCurrentSubscribers = Set.fromList $ fst <$> (Map.toList newlyAddedSubscribers <> currentCohortExistingSubscribers) -- retain subscribers only if they still exist in the original cohort's subscriber. -- It may happen that a subscriber has stopped their subscription which means it will -- no longer exist in the cohort map, so we need to accordingly remove such subscribers -- from our processed cohorts. TMap.filterWithKey (\k _ -> k `elem` allCurrentSubscribers) $ C._cExistingSubscribers processedCohort TMap.filterWithKey (\k _ -> k `elem` allCurrentSubscribers) $ C._cNewSubscribers processedCohort Map.insertWithM mergeCohorts updatedCohortKey processedCohort accCohortMapWithCurrentCohort ) mempty currentCohorts TMap.replace cohortMap updatedCohortsMap postPollHook pollDetails where SubscriptionsOptions batchSize _ = lqOpts updateCohortSubscribers (C.Cohort _id _respRef curOpsTV newOpsTV _) snapshottedNewSubs = do allNewOpsL <- TMap.toList newOpsTV let snapshottedNewSubsSet = Set.fromList $ _sId <$> snapshottedNewSubs forM_ allNewOpsL $ \(subId, subscriber) -> when (subId `elem` snapshottedNewSubsSet) do -- we only add the snapshotted new subscribers to the current subscribers -- because they have been sent the first message of the subscription. The -- new subscribers apart from the snapshotted new subscribers are yet to -- recieve their first message so we just keep them as new subscribers TMap.insert subscriber subId curOpsTV TMap.delete subId newOpsTV getCohortSnapshot (cohortVars, cohort) = do let C.Cohort resId respRef curOpsTV newOpsTV _ = cohort curOpsL <- TMap.toList curOpsTV newOpsL <- TMap.toList newOpsTV let cohortSnapshot = C.CohortSnapshot cohortVars respRef (map snd curOpsL) (map snd newOpsL) return (resId, (cohortSnapshot, cohort)) mergeCohorts :: Cohort 'Streaming -> Cohort 'Streaming -> STM.STM (Cohort 'Streaming) mergeCohorts newCohort oldCohort = do let newCohortExistingSubscribers = C._cExistingSubscribers newCohort oldCohortExistingSubscribers = C._cExistingSubscribers oldCohort newCohortNewSubscribers = C._cNewSubscribers newCohort oldCohortNewSubscribers = C._cNewSubscribers oldCohort mergedExistingSubscribers <- TMap.union newCohortExistingSubscribers oldCohortExistingSubscribers mergedNewSubscribers <- TMap.union newCohortNewSubscribers oldCohortNewSubscribers pure $ newCohort { C._cNewSubscribers = mergedNewSubscribers, C._cExistingSubscribers = mergedExistingSubscribers } getCohortOperations cohorts = \case Left e -> let resp = throwError $ GQExecError [encodeGQLErr False e] in [(resp, cohortId, Nothing, Nothing, snapshot) | (cohortId, snapshot) <- cohorts] Right responses -> do let cohortSnapshotMap = Map.fromList cohorts -- every row of the response will contain the cohortId, response of the query and the latest value of the cursor for the cohort flip mapMaybe responses $ \(cohortId, respBS, respCursorLatestValue) -> let respHash = mkRespHash respBS respSize = BS.length respBS in -- TODO: currently we ignore the cases when the cohortId from -- Postgres response is not present in the cohort map of this batch -- (this shouldn't happen but if it happens it means a logic error and -- we should log it) (pure respBS,cohortId,Just (respHash, respSize),Just respCursorLatestValue,) <$> Map.lookup cohortId cohortSnapshotMap