diff --git a/server/src-lib/Hasura/App.hs b/server/src-lib/Hasura/App.hs index 8d0eb5f98e1..2678f894358 100644 --- a/server/src-lib/Hasura/App.hs +++ b/server/src-lib/Hasura/App.hs @@ -375,6 +375,7 @@ initialiseServeCtx env GlobalCtx {..} so@ServeOptions {..} = do sqlGenCtx soEnableMaintenanceMode soExperimentalFeatures + soEventingMode (rebuildableSchemaCache, _) <- lift . flip onException (flushLogger loggerCtx) $ @@ -693,6 +694,7 @@ mkHGEServer setupHook env ServeOptions {..} ServeCtx {..} initTime postPollHook soConnectionOptions soWebsocketKeepAlive soEnableMaintenanceMode + soEventingMode soExperimentalFeatures _scEnabledLogTypes soWebsocketConnectionInitTimeout @@ -704,6 +706,7 @@ mkHGEServer setupHook env ServeOptions {..} ServeCtx {..} initTime postPollHook sqlGenCtx soEnableMaintenanceMode soExperimentalFeatures + soEventingMode -- Log Warning if deprecated environment variables are used sources <- scSources <$> liftIO (getSCFromRef cacheRef) @@ -716,6 +719,7 @@ mkHGEServer setupHook env ServeOptions {..} ServeCtx {..} initTime postPollHook -- NOTE: `newLogTVar` is being used to make sure that the metadata logger runs only once -- while logging errors or any `inconsistent_metadata` logs. newLogTVar <- liftIO $ STM.newTVarIO False + -- Start a background thread for processing schema sync event present in the '_sscSyncEventRef' _ <- startSchemaSyncProcessorThread @@ -727,10 +731,7 @@ mkHGEServer setupHook env ServeOptions {..} ServeCtx {..} initTime postPollHook serverConfigCtx newLogTVar - let maxEvThrds = fromMaybe defaultMaxEventThreads soEventsHttpPoolSize - fetchI = milliseconds $ fromMaybe (Milliseconds defaultFetchInterval) soEventsFetchInterval - allSources = HM.elems $ scSources $ lastBuiltSchemaCache _scSchemaCache - eventLogBehavior = + let eventLogBehavior = LogBehavior { _lbHeader = if soLogHeadersFromEnv then LogEnvValue else LogEnvVarname, _lbResponse = if soDevMode then LogEntireResponse else LogSanitisedResponse @@ -744,100 +745,19 @@ mkHGEServer setupHook env ServeOptions {..} ServeCtx {..} initTime postPollHook <*> STM.newTVarIO mempty <*> STM.newTVarIO mempty - unless (getNonNegativeInt soEventsFetchBatchSize == 0 || soEventsFetchInterval == Just 0) $ do - -- Don't start the events poller thread when fetchBatchSize or fetchInterval is 0 - -- prepare event triggers data - eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI soEventsFetchBatchSize - let eventsGracefulShutdownAction = - waitForProcessingAction - logger - "event_triggers" - (length <$> readTVarIO (leEvents lockedEventsCtx)) - (EventTriggerShutdownAction (shutdownEventTriggerEvents allSources logger lockedEventsCtx)) - soGracefulShutdownTimeout - unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers" - void $ - C.forkManagedTWithGracefulShutdown - "processEventQueue" - logger - (C.ThreadShutdown (liftIO eventsGracefulShutdownAction)) - $ processEventQueue - logger - eventLogBehavior - _scHttpManager - (getSCFromRef cacheRef) - eventEngineCtx - lockedEventsCtx - serverMetrics - soEnableMaintenanceMode + case soEventingMode of + EventingEnabled -> do + startEventTriggerPollerThread logger eventLogBehavior lockedEventsCtx cacheRef + startAsyncActionsPollerThread logger lockedEventsCtx cacheRef actionSubState - -- start a background thread to handle async actions - case soAsyncActionsFetchInterval of - Skip -> pure () -- Don't start the poller thread - Interval sleepTime -> do - let label = "asyncActionsProcessor" - asyncActionGracefulShutdownAction = - ( liftWithStateless \lowerIO -> - ( waitForProcessingAction - logger - "async_actions" - (length <$> readTVarIO (leActionEvents lockedEventsCtx)) - (MetadataDBShutdownAction (hoist lowerIO (shutdownAsyncActions lockedEventsCtx))) - soGracefulShutdownTimeout - ) - ) + -- start a background thread to create new cron events + _cronEventsThread <- + C.forkManagedT "runCronEventsGenerator" logger $ + runCronEventsGenerator logger (getSCFromRef cacheRef) - void $ - C.forkManagedTWithGracefulShutdown - label - logger - (C.ThreadShutdown asyncActionGracefulShutdownAction) - $ asyncActionsProcessor - env - logger - (_scrCache cacheRef) - (leActionEvents lockedEventsCtx) - _scHttpManager - sleepTime - Nothing - - -- start a background thread to handle async action live queries - _asyncActionsSubThread <- - C.forkManagedT "asyncActionSubscriptionsProcessor" logger $ - asyncActionSubscriptionsProcessor actionSubState - - -- start a background thread to create new cron events - _cronEventsThread <- - C.forkManagedT "runCronEventsGenerator" logger $ - runCronEventsGenerator logger (getSCFromRef cacheRef) - - -- prepare scheduled triggers - lift $ prepareScheduledEvents logger - - -- start a background thread to deliver the scheduled events - _scheduledEventsThread <- do - let scheduledEventsGracefulShutdownAction = - ( liftWithStateless \lowerIO -> - ( waitForProcessingAction - logger - "scheduled_events" - (getProcessingScheduledEventsCount lockedEventsCtx) - (MetadataDBShutdownAction (hoist lowerIO unlockAllLockedScheduledEvents)) - soGracefulShutdownTimeout - ) - ) - - C.forkManagedTWithGracefulShutdown - "processScheduledTriggers" - logger - (C.ThreadShutdown scheduledEventsGracefulShutdownAction) - $ processScheduledTriggers - env - logger - eventLogBehavior - _scHttpManager - (getSCFromRef cacheRef) - lockedEventsCtx + startScheduledEventsPollerThread logger eventLogBehavior lockedEventsCtx cacheRef + EventingDisabled -> + unLogger logger $ mkGenericStrLog LevelInfo "server" "starting in eventing disabled mode" -- start a background thread to check for updates _updateThread <- @@ -960,6 +880,104 @@ mkHGEServer setupHook env ServeOptions {..} ServeCtx {..} initTime postPollHook C.sleep (5) -- sleep for 5 seconds and then repeat waitForProcessingAction l actionType processingEventsCountAction' shutdownAction (maxTimeout - (Seconds 5)) + startEventTriggerPollerThread logger eventLogBehavior lockedEventsCtx cacheRef = do + let maxEvThrds = fromMaybe defaultMaxEventThreads soEventsHttpPoolSize + fetchI = milliseconds $ fromMaybe (Milliseconds defaultFetchInterval) soEventsFetchInterval + allSources = HM.elems $ scSources $ lastBuiltSchemaCache _scSchemaCache + + unless (getNonNegativeInt soEventsFetchBatchSize == 0 || soEventsFetchInterval == Just 0) $ do + -- Don't start the events poller thread when fetchBatchSize or fetchInterval is 0 + -- prepare event triggers data + eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI soEventsFetchBatchSize + let eventsGracefulShutdownAction = + waitForProcessingAction + logger + "event_triggers" + (length <$> readTVarIO (leEvents lockedEventsCtx)) + (EventTriggerShutdownAction (shutdownEventTriggerEvents allSources logger lockedEventsCtx)) + soGracefulShutdownTimeout + unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers" + void $ + C.forkManagedTWithGracefulShutdown + "processEventQueue" + logger + (C.ThreadShutdown (liftIO eventsGracefulShutdownAction)) + $ processEventQueue + logger + eventLogBehavior + _scHttpManager + (getSCFromRef cacheRef) + eventEngineCtx + lockedEventsCtx + serverMetrics + soEnableMaintenanceMode + + startAsyncActionsPollerThread logger lockedEventsCtx cacheRef actionSubState = do + -- start a background thread to handle async actions + case soAsyncActionsFetchInterval of + Skip -> pure () -- Don't start the poller thread + Interval sleepTime -> do + let label = "asyncActionsProcessor" + asyncActionGracefulShutdownAction = + ( liftWithStateless \lowerIO -> + ( waitForProcessingAction + logger + "async_actions" + (length <$> readTVarIO (leActionEvents lockedEventsCtx)) + (MetadataDBShutdownAction (hoist lowerIO (shutdownAsyncActions lockedEventsCtx))) + soGracefulShutdownTimeout + ) + ) + + void $ + C.forkManagedTWithGracefulShutdown + label + logger + (C.ThreadShutdown asyncActionGracefulShutdownAction) + $ asyncActionsProcessor + env + logger + (_scrCache cacheRef) + (leActionEvents lockedEventsCtx) + _scHttpManager + sleepTime + Nothing + + -- start a background thread to handle async action live queries + void $ + C.forkManagedT "asyncActionSubscriptionsProcessor" logger $ + asyncActionSubscriptionsProcessor actionSubState + + startScheduledEventsPollerThread logger eventLogBehavior lockedEventsCtx cacheRef = do + -- prepare scheduled triggers + lift $ prepareScheduledEvents logger + + -- start a background thread to deliver the scheduled events + -- _scheduledEventsThread <- do + let scheduledEventsGracefulShutdownAction = + ( liftWithStateless \lowerIO -> + ( waitForProcessingAction + logger + "scheduled_events" + (getProcessingScheduledEventsCount lockedEventsCtx) + (MetadataDBShutdownAction (hoist lowerIO unlockAllLockedScheduledEvents)) + soGracefulShutdownTimeout + ) + ) + + void $ + C.forkManagedTWithGracefulShutdown + "processScheduledTriggers" + logger + (C.ThreadShutdown scheduledEventsGracefulShutdownAction) + $ processScheduledTriggers + env + logger + eventLogBehavior + _scHttpManager + (getSCFromRef cacheRef) + lockedEventsCtx + instance (Monad m) => Tracing.HasReporter (PGMetadataStorageAppT m) instance (Monad m) => HasResourceLimits (PGMetadataStorageAppT m) where diff --git a/server/src-lib/Hasura/Backends/Postgres/DDL/Source.hs b/server/src-lib/Hasura/Backends/Postgres/DDL/Source.hs index cae19d15884..77a3eb2994e 100644 --- a/server/src-lib/Hasura/Backends/Postgres/DDL/Source.hs +++ b/server/src-lib/Hasura/Backends/Postgres/DDL/Source.hs @@ -40,7 +40,7 @@ import Hasura.RQL.Types.SourceCustomization import Hasura.RQL.Types.Table import Hasura.SQL.Backend import Hasura.Server.Migrate.Internal -import Hasura.Server.Types (MaintenanceMode (..)) +import Hasura.Server.Types (EventingMode (..), MaintenanceMode (..)) import Language.Haskell.TH.Lib qualified as TH import Language.Haskell.TH.Syntax qualified as TH @@ -143,13 +143,16 @@ resolveDatabaseMetadata sourceConfig sourceCustomization = runExceptT do -- | Initialise catalog tables for a source, including those required by the event delivery subsystem. initCatalogForSource :: - forall m. MonadTx m => MaintenanceMode -> UTCTime -> m RecreateEventTriggers -initCatalogForSource maintenanceMode migrationTime = do + forall m. MonadTx m => MaintenanceMode -> EventingMode -> UTCTime -> m RecreateEventTriggers +initCatalogForSource maintenanceMode eventingMode migrationTime = do hdbCatalogExist <- doesSchemaExist "hdb_catalog" eventLogTableExist <- doesTableExist "hdb_catalog" "event_log" sourceVersionTableExist <- doesTableExist "hdb_catalog" "hdb_source_catalog_version" - -- when maintenance mode is enabled, don't perform any migrations + if + -- when eventing mode is disabled, don't perform any migrations + | eventingMode == EventingDisabled -> pure RETDoNothing + -- when maintenance mode is enabled, don't perform any migrations | maintenanceMode == MaintenanceModeEnabled -> pure RETDoNothing -- Fresh database | not hdbCatalogExist -> liftTx do diff --git a/server/src-lib/Hasura/GraphQL/Schema.hs b/server/src-lib/Hasura/GraphQL/Schema.hs index 103a28c78a9..b1be25c0a6d 100644 --- a/server/src-lib/Hasura/GraphQL/Schema.hs +++ b/server/src-lib/Hasura/GraphQL/Schema.hs @@ -65,7 +65,7 @@ buildGQLContext :: Seq InconsistentMetadata ) buildGQLContext queryType sources allRemoteSchemas allActions nonObjectCustomTypes = do - ServerConfigCtx functionPermsCtx remoteSchemaPermsCtx sqlGenCtx@(SQLGenCtx stringifyNum boolCollapse) _maintenanceMode _experimentalFeatures <- + ServerConfigCtx functionPermsCtx remoteSchemaPermsCtx sqlGenCtx@(SQLGenCtx stringifyNum boolCollapse) _maintenanceMode _experimentalFeatures _eventingMode <- askServerConfigCtx let remoteSchemasRoles = concatMap (Map.keys . _rscPermissions . fst . snd) $ Map.toList allRemoteSchemas diff --git a/server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs b/server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs index cb8bda0cb54..724d5fc5026 100644 --- a/server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs +++ b/server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs @@ -416,12 +416,13 @@ buildSchemaCacheRule logger env = proc (metadata, invalidationKeys) -> do Tag.PostgresVanillaTag -> do migrationTime <- liftIO getCurrentTime maintenanceMode <- _sccMaintenanceMode <$> askServerConfigCtx + eventingMode <- _sccEventingMode <$> askServerConfigCtx liftEitherM $ liftIO $ LA.withAsync (logPGSourceCatalogMigrationLockedQueries logger sourceConfig) $ const $ do let initCatalogAction = - runExceptT $ runTx (_pscExecCtx sourceConfig) Q.ReadWrite (initCatalogForSource maintenanceMode migrationTime) + runExceptT $ runTx (_pscExecCtx sourceConfig) Q.ReadWrite (initCatalogForSource maintenanceMode eventingMode migrationTime) -- The `initCatalogForSource` action is retried here because -- in cloud there will be multiple workers (graphql-engine instances) -- trying to migrate the source catalog, when needed. This introduces diff --git a/server/src-lib/Hasura/Server/App.hs b/server/src-lib/Hasura/Server/App.hs index 9f9da5fd193..a2fdab12079 100644 --- a/server/src-lib/Hasura/Server/App.hs +++ b/server/src-lib/Hasura/Server/App.hs @@ -140,7 +140,8 @@ data ServerCtx = ServerCtx scEnableMaintenanceMode :: !MaintenanceMode, scExperimentalFeatures :: !(S.HashSet ExperimentalFeature), -- | this is only required for the short-term fix in https://github.com/hasura/graphql-engine-mono/issues/1770 - scEnabledLogTypes :: !(S.HashSet (L.EngineLogType L.Hasura)) + scEnabledLogTypes :: !(S.HashSet (L.EngineLogType L.Hasura)), + scEventingMode :: !EventingMode } data HandlerCtx = HandlerCtx @@ -464,7 +465,8 @@ v1QueryHandler query = do functionPermsCtx <- asks (scFunctionPermsCtx . hcServerCtx) maintenanceMode <- asks (scEnableMaintenanceMode . hcServerCtx) experimentalFeatures <- asks (scExperimentalFeatures . hcServerCtx) - let serverConfigCtx = ServerConfigCtx functionPermsCtx remoteSchemaPermsCtx sqlGenCtx maintenanceMode experimentalFeatures + eventingMode <- asks (scEventingMode . hcServerCtx) + let serverConfigCtx = ServerConfigCtx functionPermsCtx remoteSchemaPermsCtx sqlGenCtx maintenanceMode experimentalFeatures eventingMode runQuery env logger @@ -500,7 +502,8 @@ v1MetadataHandler query = do functionPermsCtx <- asks (scFunctionPermsCtx . hcServerCtx) experimentalFeatures <- asks (scExperimentalFeatures . hcServerCtx) maintenanceMode <- asks (scEnableMaintenanceMode . hcServerCtx) - let serverConfigCtx = ServerConfigCtx functionPermsCtx remoteSchemaPermsCtx sqlGenCtx maintenanceMode experimentalFeatures + eventingMode <- asks (scEventingMode . hcServerCtx) + let serverConfigCtx = ServerConfigCtx functionPermsCtx remoteSchemaPermsCtx sqlGenCtx maintenanceMode experimentalFeatures eventingMode r <- withSCUpdate scRef @@ -551,7 +554,8 @@ v2QueryHandler query = do experimentalFeatures <- asks (scExperimentalFeatures . hcServerCtx) functionPermsCtx <- asks (scFunctionPermsCtx . hcServerCtx) maintenanceMode <- asks (scEnableMaintenanceMode . hcServerCtx) - let serverConfigCtx = ServerConfigCtx functionPermsCtx remoteSchemaPermsCtx sqlGenCtx maintenanceMode experimentalFeatures + eventingMode <- asks (scEventingMode . hcServerCtx) + let serverConfigCtx = ServerConfigCtx functionPermsCtx remoteSchemaPermsCtx sqlGenCtx maintenanceMode experimentalFeatures eventingMode V2Q.runQuery env instanceId userInfo schemaCache httpMgr serverConfigCtx query v1Alpha1GQHandler :: @@ -812,6 +816,7 @@ mkWaiApp :: WS.ConnectionOptions -> KeepAliveDelay -> MaintenanceMode -> + EventingMode -> -- | Set of the enabled experimental features S.HashSet ExperimentalFeature -> S.HashSet (L.EngineLogType L.Hasura) -> @@ -842,6 +847,7 @@ mkWaiApp connectionOptions keepAliveDelay maintenanceMode + eventingMode experimentalFeatures enabledLogTypes wsConnInitTimeout = do @@ -881,7 +887,8 @@ mkWaiApp scFunctionPermsCtx = functionPermsCtx, scEnableMaintenanceMode = maintenanceMode, scExperimentalFeatures = experimentalFeatures, - scEnabledLogTypes = enabledLogTypes + scEnabledLogTypes = enabledLogTypes, + scEventingMode = eventingMode } spockApp <- liftWithStateless $ \lowerIO -> diff --git a/server/src-lib/Hasura/Server/Init.hs b/server/src-lib/Hasura/Server/Init.hs index cfb3a30b405..96b3f9b68a1 100644 --- a/server/src-lib/Hasura/Server/Init.hs +++ b/server/src-lib/Hasura/Server/Init.hs @@ -284,6 +284,7 @@ mkServeOptions rso = do devMode gracefulShutdownTime webSocketConnectionInitTimeout + EventingEnabled where defaultAsyncActionsFetchInterval = Interval 1000 -- 1000 Milliseconds or 1 Second defaultSchemaPollInterval = Interval 1000 -- 1000 Milliseconds or 1 Second @@ -481,6 +482,7 @@ eventsFetchBatchSizeEnv :: (String, String) eventsFetchBatchSizeEnv = ( "HASURA_GRAPHQL_EVENTS_FETCH_BATCH_SIZE", "The maximum number of events to be fetched from the events table in a single batch. Default 100" + ++ "Value \"0\" implies completely disable fetching events from events table. " ) asyncActionsFetchIntervalEnv :: (String, String) diff --git a/server/src-lib/Hasura/Server/Init/Config.hs b/server/src-lib/Hasura/Server/Init/Config.hs index caf14d2d527..e9866a7c2b1 100644 --- a/server/src-lib/Hasura/Server/Init/Config.hs +++ b/server/src-lib/Hasura/Server/Init/Config.hs @@ -220,7 +220,8 @@ data ServeOptions impl = ServeOptions soEventsFetchBatchSize :: !NonNegativeInt, soDevMode :: !Bool, soGracefulShutdownTimeout :: !Seconds, - soWebsocketConnectionInitTimeout :: !WSConnectionInitTimeout + soWebsocketConnectionInitTimeout :: !WSConnectionInitTimeout, + soEventingMode :: !EventingMode } data DowngradeOptions = DowngradeOptions diff --git a/server/src-lib/Hasura/Server/Types.hs b/server/src-lib/Hasura/Server/Types.hs index cdce0969ec1..49ee5fc43c3 100644 --- a/server/src-lib/Hasura/Server/Types.hs +++ b/server/src-lib/Hasura/Server/Types.hs @@ -2,6 +2,7 @@ module Hasura.Server.Types ( ExperimentalFeature (..), InstanceId (..), MaintenanceMode (..), + EventingMode (..), PGVersion (PGVersion), RequestId (..), ServerConfigCtx (..), @@ -72,11 +73,18 @@ instance FromJSON MaintenanceMode where instance ToJSON MaintenanceMode where toJSON = Bool . (== MaintenanceModeEnabled) +-- | EventingMode decides whether the eventing subsystem should be enabled or disabled. +-- `EventDisabled` mode disables Event Triggers, Async Actions, Scheduled Events and source catalaog migrations. +-- This is an internal feature and will not be exposed to users. +data EventingMode = EventingEnabled | EventingDisabled + deriving (Show, Eq) + data ServerConfigCtx = ServerConfigCtx { _sccFunctionPermsCtx :: !FunctionPermissionsCtx, _sccRemoteSchemaPermsCtx :: !RemoteSchemaPermsCtx, _sccSQLGenCtx :: !SQLGenCtx, _sccMaintenanceMode :: !MaintenanceMode, - _sccExperimentalFeatures :: !(Set.HashSet ExperimentalFeature) + _sccExperimentalFeatures :: !(Set.HashSet ExperimentalFeature), + _sccEventingMode :: !EventingMode } deriving (Show, Eq) diff --git a/server/src-test/Main.hs b/server/src-test/Main.hs index 24150a360e3..cd25ac186c0 100644 --- a/server/src-test/Main.hs +++ b/server/src-test/Main.hs @@ -150,7 +150,7 @@ buildPostgresSpecs maybeUrlTemplate = do let sqlGenCtx = SQLGenCtx False False maintenanceMode = MaintenanceModeDisabled serverConfigCtx = - ServerConfigCtx FunctionPermissionsInferred RemoteSchemaPermsDisabled sqlGenCtx maintenanceMode mempty + ServerConfigCtx FunctionPermissionsInferred RemoteSchemaPermsDisabled sqlGenCtx maintenanceMode mempty EventingEnabled cacheBuildParams = CacheBuildParams httpManager (mkPgSourceResolver print) serverConfigCtx pgLogger = print diff --git a/server/tests-hspec/Harness/Constants.hs b/server/tests-hspec/Harness/Constants.hs index 977ee411809..56d78aae73f 100644 --- a/server/tests-hspec/Harness/Constants.hs +++ b/server/tests-hspec/Harness/Constants.hs @@ -164,7 +164,8 @@ serveOptions = soEventsFetchBatchSize = 1, soDevMode = True, soGracefulShutdownTimeout = 0, -- Don't wait to shutdown. - soWebsocketConnectionInitTimeout = defaultWSConnectionInitTimeout + soWebsocketConnectionInitTimeout = defaultWSConnectionInitTimeout, + soEventingMode = EventingEnabled } -- | Use the below to show messages.