server: add Env Variable to set the number of requests processed at a time in async actions

PR-URL: https://github.com/hasura/graphql-engine-mono/pull/10326
Co-authored-by: Puru Gupta <32328846+purugupta99@users.noreply.github.com>
GitOrigin-RevId: d88ef6e2bb0b94f1cba9903cf7338ff2931d7ee2
This commit is contained in:
pranshi06 2023-09-26 11:52:31 +05:30 committed by hasura-bot
parent bf85fdfb2a
commit 33bafdf450
11 changed files with 59 additions and 16 deletions

View File

@ -131,6 +131,18 @@ If both Admin Secret and Admin Secrets are set, then only Admin Secrets will be
:::
### Async Actions Fetch Batch Size
The maximum number of async actions to be processed in a single batch.
| | |
| ------------------- | ----------------------------------------------- |
| **Flag** | `--async-actions-fetch-batch-size <BATCH_SIZE>` |
| **Env var** | `HASURA_GRAPHQL_ASYNC_ACTIONS_FETCH_BATCH_SIZE` |
| **Accepted values** | Integer |
| **Default** | `10` |
| **Supported in** | CE, Enterprise Edition, Cloud |
### Async Actions Fetch Interval
The interval, in milliseconds, to sleep before trying to fetch [async actions](/actions/async-actions.mdx) again after a

View File

@ -312,7 +312,8 @@ serveOptions =
soApolloFederationStatus = ApolloFederationDisabled,
soCloseWebsocketsOnMetadataChangeStatus = Init._default Init.closeWebsocketsOnMetadataChangeOption,
soMaxTotalHeaderLength = Init._default Init.maxTotalHeaderLengthOption,
soTriggersErrorLogLevelStatus = Init._default Init.triggersErrorLogLevelStatusOption
soTriggersErrorLogLevelStatus = Init._default Init.triggersErrorLogLevelStatusOption,
soAsyncActionsFetchBatchSize = Init._default Init.asyncActionsFetchBatchSizeOption
}
-- | What log level should be used by the engine; this is not exported, and

View File

@ -487,7 +487,8 @@ initialiseAppEnv env BasicConnectionInfo {..} serveOptions@ServeOptions {..} liv
appEnvSchemaPollInterval = soSchemaPollInterval,
appEnvLicenseKeyCache = Nothing,
appEnvMaxTotalHeaderLength = soMaxTotalHeaderLength,
appEnvTriggersErrorLogLevelStatus = soTriggersErrorLogLevelStatus
appEnvTriggersErrorLogLevelStatus = soTriggersErrorLogLevelStatus,
appEnvAsyncActionsFetchBatchSize = soAsyncActionsFetchBatchSize
}
)
@ -817,7 +818,7 @@ instance MonadMetadataStorage AppM where
deleteScheduledEvent a b = runInSeparateTx $ deleteScheduledEventTx a b
insertAction a b c d = runInSeparateTx $ insertActionTx a b c d
fetchUndeliveredActionEvents = runInSeparateTx fetchUndeliveredActionEventsTx
fetchUndeliveredActionEvents a = runInSeparateTx $ fetchUndeliveredActionEventsTx a
setActionStatus a b = runInSeparateTx $ setActionStatusTx a b
fetchActionResponse = runInSeparateTx . fetchActionResponseTx
clearActionData = runInSeparateTx . clearActionDataTx
@ -1312,6 +1313,7 @@ mkHGEServer setupHook appStateRef consoleType ekgStore = do
(acAsyncActionsFetchInterval <$> getAppContext appStateRef)
(leActionEvents lockedEventsCtx)
Nothing
appEnvAsyncActionsFetchBatchSize
-- start a background thread to handle async action live queries
void

View File

@ -142,7 +142,8 @@ data AppEnv = AppEnv
appEnvCheckFeatureFlag :: CheckFeatureFlag,
appEnvLicenseKeyCache :: Maybe (CredentialCache AgentLicenseKey),
appEnvMaxTotalHeaderLength :: Int,
appEnvTriggersErrorLogLevelStatus :: TriggersErrorLogLevelStatus
appEnvTriggersErrorLogLevelStatus :: TriggersErrorLogLevelStatus,
appEnvAsyncActionsFetchBatchSize :: Int
}
-- | Represents the Dynamic Hasura State, these field are mutable and can be changed

View File

@ -460,8 +460,9 @@ asyncActionsProcessor ::
IO OptionalInterval ->
STM.TVar (Set LockedActionEventId) ->
Maybe GH.GQLQueryText ->
Int ->
m (Forever m)
asyncActionsProcessor getEnvHook logger getSCFromRef' getFetchInterval lockedActionEvents gqlQueryText =
asyncActionsProcessor getEnvHook logger getSCFromRef' getFetchInterval lockedActionEvents gqlQueryText fetchBatchSize =
return
$ Forever ()
$ const
@ -482,7 +483,7 @@ asyncActionsProcessor getEnvHook logger getSCFromRef' getFetchInterval lockedAct
unless (HashMap.null asyncActions) $ do
-- fetch undelivered action events only when there's at least
-- one async action present in the schema cache
asyncInvocationsE <- fetchUndeliveredActionEvents
asyncInvocationsE <- fetchUndeliveredActionEvents fetchBatchSize
asyncInvocations <- liftIO $ onLeft asyncInvocationsE mempty
-- save the actions that are currently fetched from the DB to
-- be processed in a TVar (Set LockedActionEventId) and when
@ -769,8 +770,8 @@ insertActionTx actionName sessionVariables httpHeaders inputArgsPayload =
where
toHeadersMap = HashMap.fromList . map ((bsToTxt . CI.original) *** bsToTxt)
fetchUndeliveredActionEventsTx :: PG.TxE QErr [ActionLogItem]
fetchUndeliveredActionEventsTx =
fetchUndeliveredActionEventsTx :: Int -> PG.TxE QErr [ActionLogItem]
fetchUndeliveredActionEventsTx fetchBatchSize =
map mapEvent
<$> PG.withQE
defaultTxErrorHandler
@ -780,12 +781,12 @@ fetchUndeliveredActionEventsTx =
id in (
select id from hdb_catalog.hdb_action_log
where status = 'created'
for update skip locked limit 10
for update skip locked limit $1
)
returning
id, action_name, request_headers::json, session_variables::json, input_payload::json
|]
()
(Identity batchSize)
False
where
mapEvent
@ -799,6 +800,8 @@ fetchUndeliveredActionEventsTx =
fromHeadersMap = map ((CI.mk . txtToBs) *** txtToBs) . HashMap.toList
batchSize = fromIntegral fetchBatchSize :: Word64
setActionStatusTx :: ActionId -> AsyncActionStatus -> PG.TxE QErr ()
setActionStatusTx actionId = \case
AASCompleted responsePayload ->

View File

@ -146,7 +146,7 @@ class (Monad m) => MonadMetadataStorage m where
[HTTP.Header] ->
Value ->
m (Either QErr ActionId)
fetchUndeliveredActionEvents :: m (Either QErr [ActionLogItem])
fetchUndeliveredActionEvents :: Int -> m (Either QErr [ActionLogItem])
setActionStatus :: ActionId -> AsyncActionStatus -> m (Either QErr ())
fetchActionResponse :: ActionId -> m (Either QErr ActionLogResponse)
clearActionData :: ActionName -> m (Either QErr ())
@ -182,7 +182,7 @@ instance (MonadMetadataStorage m, MonadTrans t, Monad (t m)) => MonadMetadataSto
deleteScheduledEvent a b = lift $ deleteScheduledEvent a b
insertAction a b c d = lift $ insertAction a b c d
fetchUndeliveredActionEvents = lift fetchUndeliveredActionEvents
fetchUndeliveredActionEvents a = lift $ fetchUndeliveredActionEvents a
setActionStatus a b = lift $ setActionStatus a b
fetchActionResponse = lift . fetchActionResponse
clearActionData = lift . clearActionData

View File

@ -219,6 +219,7 @@ mkServeOptions sor@ServeOptionsRaw {..} = do
withOptionDefault rsoCloseWebsocketsOnMetadataChangeStatus closeWebsocketsOnMetadataChangeOption
soMaxTotalHeaderLength <- withOptionDefault rsoMaxTotalHeaderLength maxTotalHeaderLengthOption
soTriggersErrorLogLevelStatus <- withOptionDefault rsoTriggersErrorLogLevelStatus triggersErrorLogLevelStatusOption
soAsyncActionsFetchBatchSize <- withOptionDefault rsoAsyncActionsFetchBatchSize asyncActionsFetchBatchSizeOption
pure ServeOptions {..}
-- | Fetch Postgres 'Query.ConnParams' components from the environment

View File

@ -64,6 +64,7 @@ module Hasura.Server.Init.Arg.Command.Serve
triggersErrorLogLevelStatusOption,
closeWebsocketsOnMetadataChangeOption,
maxTotalHeaderLengthOption,
asyncActionsFetchBatchSizeOption,
-- * Pretty Printer
serveCmdFooter,
@ -154,6 +155,7 @@ serveCommandParser =
<*> parseEnableCloseWebsocketsOnMetadataChange
<*> parseMaxTotalHeaderLength
<*> parseTriggersErrorLoglevelStatus
<*> parseAsyncActionsFetchBatchSize
--------------------------------------------------------------------------------
-- Serve Options
@ -1233,6 +1235,23 @@ parseTriggersErrorLoglevelStatus =
<> Opt.help (Config._helpMessage triggersErrorLogLevelStatusOption)
)
asyncActionsFetchBatchSizeOption :: Config.Option Int
asyncActionsFetchBatchSizeOption =
Config.Option
{ Config._default = 10,
Config._envVar = "HASURA_GRAPHQL_ASYNC_ACTIONS_FETCH_BATCH_SIZE",
Config._helpMessage = "Number of requests processed at a time in asynchronous actions (Default: 10)"
}
parseAsyncActionsFetchBatchSize :: Opt.Parser (Maybe Int)
parseAsyncActionsFetchBatchSize =
Opt.optional
$ Opt.option
(Opt.eitherReader Env.fromEnv)
( Opt.long "async-actions-fetch-batch-size"
<> Opt.help (Config._helpMessage asyncActionsFetchBatchSizeOption)
)
--------------------------------------------------------------------------------
-- Pretty Printer

View File

@ -324,7 +324,8 @@ data ServeOptionsRaw impl = ServeOptionsRaw
rsoApolloFederationStatus :: Maybe Server.Types.ApolloFederationStatus,
rsoCloseWebsocketsOnMetadataChangeStatus :: Maybe Server.Types.CloseWebsocketsOnMetadataChangeStatus,
rsoMaxTotalHeaderLength :: Maybe Int,
rsoTriggersErrorLogLevelStatus :: Maybe Server.Types.TriggersErrorLogLevelStatus
rsoTriggersErrorLogLevelStatus :: Maybe Server.Types.TriggersErrorLogLevelStatus,
rsoAsyncActionsFetchBatchSize :: Maybe Int
}
-- | Whether or not to serve Console assets.
@ -626,7 +627,8 @@ data ServeOptions impl = ServeOptions
soApolloFederationStatus :: Server.Types.ApolloFederationStatus,
soCloseWebsocketsOnMetadataChangeStatus :: Server.Types.CloseWebsocketsOnMetadataChangeStatus,
soMaxTotalHeaderLength :: Int,
soTriggersErrorLogLevelStatus :: Server.Types.TriggersErrorLogLevelStatus
soTriggersErrorLogLevelStatus :: Server.Types.TriggersErrorLogLevelStatus,
soAsyncActionsFetchBatchSize :: Int
}
-- | 'ResponseInternalErrorsConfig' represents the encoding of the

View File

@ -95,7 +95,8 @@ emptyServeOptionsRaw =
rsoApolloFederationStatus = Nothing,
rsoCloseWebsocketsOnMetadataChangeStatus = Nothing,
rsoMaxTotalHeaderLength = Nothing,
rsoTriggersErrorLogLevelStatus = Nothing
rsoTriggersErrorLogLevelStatus = Nothing,
rsoAsyncActionsFetchBatchSize = Nothing
}
mkServeOptionsSpec :: Hspec.Spec

View File

@ -94,7 +94,8 @@ serveOptions =
soApolloFederationStatus = ApolloFederationDisabled,
soCloseWebsocketsOnMetadataChangeStatus = Init._default Init.closeWebsocketsOnMetadataChangeOption,
soMaxTotalHeaderLength = Init._default Init.maxTotalHeaderLengthOption,
soTriggersErrorLogLevelStatus = Init._default Init.triggersErrorLogLevelStatusOption
soTriggersErrorLogLevelStatus = Init._default Init.triggersErrorLogLevelStatusOption,
soAsyncActionsFetchBatchSize = Init._default Init.asyncActionsFetchBatchSizeOption
}
-- | What log level should be used by the engine; this is not exported, and