mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-17 20:41:49 +03:00
b167120f96
We'll see if this improves compile times at all, but I think it's worth doing as at least the most minimal form of module documentation. This was accomplished by first compiling everything with -ddump-minimal-imports, and then a bunch of scripting (with help from ormolu) **EDIT** it doesn't seem to improve CI compile times but the noise floor is high as it looks like we're not caching library dependencies anymore PR-URL: https://github.com/hasura/graphql-engine-mono/pull/2730 GitOrigin-RevId: 667eb8de1e0f1af70420cbec90402922b8b84cb4
85 lines
4.1 KiB
Haskell
85 lines
4.1 KiB
Haskell
-- | Module related to async action query subscriptions
|
|
module Hasura.GraphQL.Execute.Action.Subscription
|
|
( asyncActionSubscriptionsProcessor,
|
|
)
|
|
where
|
|
|
|
import Control.Concurrent.Extended qualified as C
|
|
import Control.Concurrent.STM qualified as STM
|
|
import Data.List.NonEmpty qualified as NE
|
|
import Hasura.GraphQL.Execute.Action
|
|
import Hasura.GraphQL.Execute.LiveQuery.State
|
|
import Hasura.GraphQL.Execute.LiveQuery.TMap qualified as TMap
|
|
import Hasura.Metadata.Class
|
|
import Hasura.Prelude
|
|
|
|
{- Note [Async action subscriptions]
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
We have two kinds of async action query execution based on the defined relationships
|
|
on output object type. See Note [Resolving async action query]. Actions with table
|
|
relationships on output object type have to generate a SQL select query with the action
|
|
webhook response embedded in so as to execute on the source database that emits client
|
|
response with joined relationship rows (see Note [Resolving async action query]).
|
|
We can multiplex these queries and run a live subscription just like our usual database
|
|
queries. But the action log from the metadata storage is subject to change after
|
|
calling action webhook in two ways,
|
|
1. When the webhook returns a successful response, 'response_payload' column value isn't null
|
|
2. When any exception occurs in the process, 'errors' column value isn't null
|
|
|
|
So, we have a background thread (@'asyncActionSubscriptionsProcessor') which
|
|
constantly fetches the action log response from the metadata storage and restarts the
|
|
live query to reflect those changes in subscriptions.
|
|
|
|
In case of action queries without relationships, there are no SQL queries associated with them.
|
|
We can't multiplex them. The thread, @'asyncActionSubscriptionsProcessor' also handles these
|
|
action subscriptions by sending the responses to the websocket client after fetching the
|
|
action log response.
|
|
|
|
We can't support multiple async query fields of different kinds in a single subscription.
|
|
-}
|
|
|
|
-- | Background loop that services async action subscriptions; it never
-- returns. See Note [Async action subscriptions]
--
-- Every iteration:
--
--   1. waits (by retrying the STM transaction) until the subscription state
--      contains at least one operation,
--   2. fetches the action log responses for each operation and dispatches on
--      the kind of live async action query, and
--   3. sleeps for one second.
asyncActionSubscriptionsProcessor ::
  ( MonadIO m,
    MonadMetadataStorage (MetadataStorageT m)
  ) =>
  AsyncActionSubscriptionState ->
  m void
asyncActionSubscriptionsProcessor subsState = forever $ do
  -- Snapshot the active operations; the transaction retries while there are none
  activeOps <- liftIO $
    STM.atomically $ do
      entries <- TMap.toList subsState
      onNothing (NE.nonEmpty entries) STM.retry
  for_ activeOps $ \(opId, AsyncActionQueryLive actionIds onError op) -> do
    -- Removes this operation from the subscription state when run
    let dropOperation = removeAsyncActionLiveQuery subsState opId
    -- Look up the action log responses for this operation's action ids
    fetchResult <- runExceptT $ fetchActionLogResponses $ toList actionIds
    liftIO $ case fetchResult of
      -- Fetching failed: report through the operation's error callback, then forget it
      Left fetchErr -> onError fetchErr >> dropOperation
      Right (latestLogMap, allComplete) -> case op of
        LAAQNoRelationships (LiveAsyncActionQueryWithNoRelationships pushResponse pushCompleted) -> do
          -- The fetched log map is pushed on every pass, whether or not it changed
          pushResponse latestLogMap
          when allComplete $ pushCompleted >> dropOperation
        LAAQOnSourceDB (LiveAsyncActionQueryOnSource currentLqId seenLogMap restartLq) ->
          -- An unchanged log map means there is nothing new to reflect, so the
          -- live query is left running as it is
          unless (seenLogMap == latestLogMap) $ do
            restarted <- restartLq currentLqId latestLogMap
            case (allComplete, restarted) of
              -- Still pending and the restart yielded a new live query id:
              -- store the new id together with the log map it was started from
              (False, Just newLqId) ->
                addAsyncActionLiveQuery subsState opId actionIds onError $
                  LAAQOnSourceDB $ LiveAsyncActionQueryOnSource newLqId latestLogMap restartLq
              -- Either every action is complete, or the restart failed
              -- (no new id); in both cases the operation is not kept in the state
              _ -> dropOperation
  -- Pause for a second before the next pass
  liftIO $ C.sleep $ seconds 1
|