From d3611af58dc3eaf2f30c754abd32f2c543a1ddb9 Mon Sep 17 00:00:00 2001
From: Swann Moreau
Date: Tue, 25 May 2021 12:20:13 +0530
Subject: [PATCH] server: lazy event catalog initialisation

event catalog:
- `hdb_catalog` is no longer automatically created
- the catalog is initialised when the first event trigger is created
- catalog initialisation happens during the schema cache build, using
  `ArrowCache`, so it is only run in response to a change to the set of
  event triggers

event queue:
- the `processEventQueue` thread is prevented from starting when
  `HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL=0` (see the sketch below)
- the `processEventQueue` thread only processes sources for which at least
  one event trigger exists on some table in the source
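
In rough terms, the fetch-interval gating amounts to never forking the
polling thread when the interval is zero. A minimal sketch of that
behaviour (`startEventQueue` and its parameters are invented for
illustration; they are not names from this patch):

    import Control.Concurrent (forkIO)
    import Control.Monad (unless, void)

    -- Fork the event-queue polling thread, unless polling is disabled.
    -- An interval of 0 (HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL=0) means
    -- "do not poll at all", so no thread is started in that case.
    startEventQueue :: Int -> IO () -> IO ()
    startEventQueue fetchIntervalMs processQueue =
      unless (fetchIntervalMs == 0) $
        void $ forkIO processQueue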

Co-authored-by: Anon Ray <616387+ecthiender@users.noreply.github.com>
GitOrigin-RevId: 73f256465d62490cd2b59dcd074718679993d4fe
---
 CHANGELOG.md                                   |  1 +
 .../Hasura/Backends/BigQuery/DDL/Source.hs     |  4 +-
 .../Hasura/Backends/MSSQL/DDL/Source.hs        |  4 +-
 .../Hasura/Backends/Postgres/DDL/Source.hs     | 68 ++++++++++++++-----
 .../src-lib/Hasura/Eventing/EventTrigger.hs    | 54 ++++++++-------
 server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs  | 41 ++++++++++-
 .../Hasura/RQL/Types/Metadata/Backend.hs       |  1 -
 server/src-lib/Hasura/SQL/Tag.hs               |  5 +-
 server/src-rsr/drop_pg_source.sql              |  8 +--
 9 files changed, 129 insertions(+), 57 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f1f893f436b..98187dfdf31 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@
 
 ### Bug fixes and improvements
 
+- server: initialise `hdb_catalog` tables only when required, and only run the event loop for sources where it is required
 - server: fix a bug where remote schema permissions would fail when used in conjunction with query variables (fix #6656)
 - server: add `rename_source` metadata API (fix #6681)
 - server: fix subscriptions with session argument in user-defined function (fix #6657)
diff --git a/server/src-lib/Hasura/Backends/BigQuery/DDL/Source.hs b/server/src-lib/Hasura/Backends/BigQuery/DDL/Source.hs
index 406f64230f3..4a602c7941a 100644
--- a/server/src-lib/Hasura/Backends/BigQuery/DDL/Source.hs
+++ b/server/src-lib/Hasura/Backends/BigQuery/DDL/Source.hs
@@ -27,7 +27,6 @@ import Hasura.RQL.Types.Common
 import Hasura.RQL.Types.Source
 import Hasura.RQL.Types.Table
 import Hasura.SQL.Backend
-import Hasura.Server.Types (MaintenanceMode)
 
 
 resolveSourceConfig ::
@@ -53,9 +52,8 @@ resolveSourceConfig _name BigQueryConnSourceConfig{..} = runExceptT $ do
 resolveSource
   :: (MonadIO m)
   => BigQuerySourceConfig
-  -> MaintenanceMode
   -> m (Either QErr (ResolvedSource 'BigQuery))
-resolveSource sourceConfig _maintenanceMode =
+resolveSource sourceConfig =
   runExceptT $ do
     result <- getTables sourceConfig
     case result of
diff --git a/server/src-lib/Hasura/Backends/MSSQL/DDL/Source.hs b/server/src-lib/Hasura/Backends/MSSQL/DDL/Source.hs
index a590b1e5457..7afc8bf415e 100644
--- a/server/src-lib/Hasura/Backends/MSSQL/DDL/Source.hs
+++ b/server/src-lib/Hasura/Backends/MSSQL/DDL/Source.hs
@@ -13,7 +13,6 @@ import Hasura.Base.Error
 import Hasura.RQL.Types.Common
 import Hasura.RQL.Types.Source
 import Hasura.SQL.Backend
-import Hasura.Server.Types (MaintenanceMode)
 
 resolveSourceConfig
   :: (MonadIO m)
@@ -27,9 +26,8 @@ resolveSourceConfig _name (MSSQLConnConfiguration connInfo) = runExceptT do
 resolveDatabaseMetadata
   :: (MonadIO m)
   => MSSQLSourceConfig
-  -> MaintenanceMode
   -> m (Either QErr (ResolvedSource 'MSSQL))
-resolveDatabaseMetadata config _maintenanceMode = runExceptT do
+resolveDatabaseMetadata config = runExceptT do
   dbTablesMetadata <- loadDBMetadata pool
   pure $ ResolvedSource config dbTablesMetadata mempty mempty
   where
diff --git a/server/src-lib/Hasura/Backends/Postgres/DDL/Source.hs b/server/src-lib/Hasura/Backends/Postgres/DDL/Source.hs
index 1393c2c641e..d464002a0b6 100644
--- a/server/src-lib/Hasura/Backends/Postgres/DDL/Source.hs
+++ b/server/src-lib/Hasura/Backends/Postgres/DDL/Source.hs
@@ -3,7 +3,7 @@ module Hasura.Backends.Postgres.DDL.Source
   , fetchFunctionMetadata
   , fetchPgScalars
   , fetchTableMetadata
-  , initSource
+  , initCatalogForSource
   , postDropSourceHook
   , resolveDatabaseMetadata
   , resolveSourceConfig
@@ -54,18 +54,18 @@ resolveSourceConfig name config = runExceptT do
 resolveDatabaseMetadata
   :: forall pgKind m
    . (Backend ('Postgres pgKind), ToMetadataFetchQuery pgKind, MonadIO m, MonadBaseControl IO m)
-  => SourceConfig ('Postgres pgKind) -> MaintenanceMode -> m (Either QErr (ResolvedSource ('Postgres pgKind)))
-resolveDatabaseMetadata sourceConfig maintenanceMode = runExceptT do
-  (tablesMeta, functionsMeta, pgScalars) <- runLazyTx (_pscExecCtx sourceConfig) Q.ReadWrite $ do
-    initSource maintenanceMode
+  => SourceConfig ('Postgres pgKind) -> m (Either QErr (ResolvedSource ('Postgres pgKind)))
+resolveDatabaseMetadata sourceConfig = runExceptT do
+  (tablesMeta, functionsMeta, pgScalars) <- runLazyTx (_pscExecCtx sourceConfig) Q.ReadOnly $ do
     tablesMeta <- fetchTableMetadata
     functionsMeta <- fetchFunctionMetadata
     pgScalars <- fetchPgScalars
    pure (tablesMeta, functionsMeta, pgScalars)
   pure $ ResolvedSource sourceConfig tablesMeta functionsMeta pgScalars
 
-initSource :: MonadTx m => MaintenanceMode -> m ()
-initSource maintenanceMode = do
+-- | Initialise catalog tables for a source, including those required by the event delivery subsystem.
+initCatalogForSource :: MonadTx m => MaintenanceMode -> m ()
+initCatalogForSource maintenanceMode = do
   hdbCatalogExist <- doesSchemaExist "hdb_catalog"
   eventLogTableExist <- doesTableExist "hdb_catalog" "event_log"
   sourceVersionTableExist <- doesTableExist "hdb_catalog" "hdb_source_catalog_version"
@@ -185,19 +185,53 @@ postDropSourceHook
   => PGSourceConfig -> m ()
 postDropSourceHook sourceConfig = do
   -- Clean traces of Hasura in source database
+  --
+  -- There are three types of database we have to consider here, which we
+  -- refer to as types 1, 2, and 3 below:
+  --
+  -- 1. default postgres source (no separate metadata database)
+  --    In this case, we want to drop nothing.
+  --
+  -- 2. dedicated metadata database
+  --    In this case, we want to drop only the source-related tables
+  --    ("event_log", "hdb_source_catalog_version", etc.), leaving the rest
+  --    of the schema intact.
+  --
+  -- 3. non-default postgres source (necessarily without metadata tables)
+  --    In this case, we want to drop the entire "hdb_catalog" schema.
   liftEitherM $ runPgSourceWriteTx sourceConfig $ do
     hdbMetadataTableExist <- doesTableExist "hdb_catalog" "hdb_metadata"
     eventLogTableExist <- doesTableExist "hdb_catalog" "event_log"
-    -- If "hdb_metadata" and "event_log" tables found in the "hdb_catalog" schema
-    -- then this infers the source is being used as default potgres source (--database-url option).
-    -- In this case don't drop any thing in the catalog schema.
-    if | hdbMetadataTableExist && eventLogTableExist -> pure ()
-       -- Otherwise, if only "hdb_metadata" table exist, then this infers the source is
-       -- being used as metadata storage (--metadata-database-url option). In this case
-       -- drop only source related tables and not "hdb_catalog" schema
-       | hdbMetadataTableExist ->
-           Q.multiQE defaultTxErrorHandler $(makeRelativeToProject "src-rsr/drop_pg_source.sql" >>= Q.sqlFromFile)
-       -- Otherwise, drop "hdb_catalog" schema.
+    if
+      -- If the "hdb_metadata" and "event_log" tables are both found in the
+      -- "hdb_catalog" schema, this implies that the source is being used as
+      -- the default postgres source (type 1 above). In this case we don't
+      -- drop anything in the catalog schema.
+      | hdbMetadataTableExist && eventLogTableExist -> pure ()
+      -- However, it is possible that the above condition is not met for a
+      -- default postgres source. This will happen if no event triggers have
+      -- been defined, because we initialise the event catalog tables only
+      -- when required (i.e. when a trigger is defined).
+      --
+      -- This could lead to a possible problem where "hdb_metadata" exists,
+      -- "event_log" does not exist, but the _other_ source-related tables do
+      -- exist. In that case, we would end up dropping them here, which would
+      -- go against our requirements above. However, observe that these
+      -- tables are always all created or destroyed together, in single
+      -- transactions where we run setup/teardown SQL files, so this
+      -- situation cannot arise.
+      --
+      -- So if only "hdb_metadata" exists, we have one of two possible cases:
+      -- * this is a metadata database (type 2), and we can drop all
+      --   source-related tables
+      -- * this is a default database (type 1) which has no source-related
+      --   tables (since it has no "event_log" table, it cannot have the
+      --   others, by the previous argument)
+      --
+      -- We can therefore safely issue DROP IF EXISTS statements for all
+      -- source-related tables, per the spec above. The IF EXISTS lets us
+      -- handle both cases uniformly: in the second case we do nothing, and
+      -- for metadata databases we drop only the source-related tables from
+      -- the database's "hdb_catalog" schema.
+      | hdbMetadataTableExist -> Q.multiQE
+          defaultTxErrorHandler $(makeRelativeToProject "src-rsr/drop_pg_source.sql" >>= Q.sqlFromFile)
+      -- Otherwise, we have a non-default postgres source, which has no
+      -- metadata tables. We drop the entire "hdb_catalog" schema, as
+      -- discussed above.
       | otherwise -> dropHdbCatalogSchema
 
   -- Destroy postgres source connection
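
Note: the guard above reduces to a pure decision on the two existence
checks. As a reading aid only (the names below are invented and do not
appear in the patch):

    -- Which drop behaviour applies, per the three database types described
    -- in the comment above.
    data DropAction
      = DropNothing          -- type 1: default postgres source
      | DropSourceTablesOnly -- type 2: dedicated metadata database
                             --   (or a type-1 database with no event triggers)
      | DropEntireSchema     -- type 3: non-default postgres source
      deriving (Eq, Show)

    dropAction :: Bool -> Bool -> DropAction
    dropAction hdbMetadataTableExist eventLogTableExist
      | hdbMetadataTableExist && eventLogTableExist = DropNothing
      | hdbMetadataTableExist                       = DropSourceTablesOnly
      | otherwise                                   = DropEntireSchema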

diff --git a/server/src-lib/Hasura/Eventing/EventTrigger.hs b/server/src-lib/Hasura/Eventing/EventTrigger.hs
index d264c45497b..446c36fa99b 100644
--- a/server/src-lib/Hasura/Eventing/EventTrigger.hs
+++ b/server/src-lib/Hasura/Eventing/EventTrigger.hs
@@ -260,33 +260,39 @@ processEventQueue logger logenv httpMgr getSchemaCache EventEngineCtx{..} LockedEventsCtx{..}
      (delivered=t or error=t or archived=t) after a fixed number of tries (assuming it begins with locked='f').
   -}
   pgSources <- scSources <$> liftIO getSchemaCache
-  liftIO $ fmap concat $ forM (M.toList pgSources) $ \(sourceName, sourceCache) ->
-    case unsafeSourceConfiguration @('Postgres 'Vanilla) sourceCache of
-      Nothing -> pure []
-      Just sourceConfig -> do
-        fetchEventsTxE <-
-          case maintenanceMode of
-            MaintenanceModeEnabled -> do
-              maintenanceModeVersion <- runPgSourceReadTx sourceConfig getMaintenanceModeVersion
-              pure $ fmap (fetchEventsMaintenanceMode sourceName fetchBatchSize) maintenanceModeVersion
-            MaintenanceModeDisabled -> return $ Right $ fetchEvents sourceName fetchBatchSize
-        liftIO $ do
-          case fetchEventsTxE of
-            Left err -> do
-              liftIO $ L.unLogger logger $ EventInternalErr err
-              return []
-            Right fetchEventsTx ->
-              runPgSourceWriteTx sourceConfig fetchEventsTx >>= \case
-                Left err -> do
-                  liftIO $ L.unLogger logger $ EventInternalErr err
-                  return []
-                Right events -> do
-                  -- Track number of events fetched in EKG
-                  _ <- liftIO $ EKG.Distribution.add (smNumEventsFetchedPerBatch serverMetrics) (fromIntegral $ length events)
-                  -- The time when the events were fetched. This is used to calculate the average lock time of an event.
-                  eventsFetchedTime <- liftIO getCurrentTime
-                  saveLockedEvents (map eId events) leEvents
-                  return $ map (, sourceConfig, eventsFetchedTime) events
+  liftIO . fmap concat $
+    for (M.toList pgSources) \(sourceName, sourceCache) -> concat . toList <$>
+      for (unsafeSourceTables @('Postgres 'Vanilla) sourceCache) \tables -> liftIO do
+        -- count the number of event triggers on tables in this source
+        let eventTriggerCount = sum (M.size . _tiEventTriggerInfoMap <$> tables)
+
+        -- only process events for this source if at least one event trigger exists
+        if eventTriggerCount > 0 then fmap (concat . toList) $
+          for (unsafeSourceConfiguration @('Postgres 'Vanilla) sourceCache) \sourceConfig -> do
+            fetchEventsTxE <-
+              case maintenanceMode of
+                MaintenanceModeEnabled -> do
+                  maintenanceModeVersion <- runPgSourceReadTx sourceConfig getMaintenanceModeVersion
+                  pure $ fmap (fetchEventsMaintenanceMode sourceName fetchBatchSize) maintenanceModeVersion
+                MaintenanceModeDisabled -> return $ Right $ fetchEvents sourceName fetchBatchSize
+            liftIO $ do
+              case fetchEventsTxE of
+                Left err -> do
+                  liftIO $ L.unLogger logger $ EventInternalErr err
+                  return []
+                Right fetchEventsTx ->
+                  runPgSourceWriteTx sourceConfig fetchEventsTx >>= \case
+                    Left err -> do
+                      liftIO $ L.unLogger logger $ EventInternalErr err
+                      return []
+                    Right events -> do
+                      -- Track number of events fetched in EKG
+                      _ <- liftIO $ EKG.Distribution.add (smNumEventsFetchedPerBatch serverMetrics) (fromIntegral $ length events)
+                      -- The time when the events were fetched. This is used to calculate the average lock time of an event.
+                      eventsFetchedTime <- liftIO getCurrentTime
+                      saveLockedEvents (map eId events) leEvents
+                      return $ map (, sourceConfig, eventsFetchedTime) events
+        else pure []
 
 -- !!! CAREFUL !!!
 -- The logic here in particular is subtle and has been fixed, broken,
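
Note: the per-source gate above boils down to "poll a source only if some
table in it carries at least one trigger". A self-contained model of that
check, with simplified types and invented names:

    import qualified Data.Map.Strict as Map

    type TableName   = String
    type TriggerName = String

    -- The map of per-table trigger maps stands in for the cached table
    -- info (_tiEventTriggerInfoMap) used in the patch.
    shouldPollSource :: Map.Map TableName (Map.Map TriggerName ()) -> Bool
    shouldPollSource tables = sum (Map.size <$> tables) > 0
    -- equivalently: any (not . Map.null) tables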

diff --git a/server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs b/server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs
index daf0f6a4c26..2ad44f93a4d 100644
--- a/server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs
+++ b/server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs
@@ -27,6 +27,7 @@ import qualified Data.HashMap.Strict.InsOrd as OMap
 import qualified Data.HashSet as HS
 import qualified Data.HashSet.InsOrd as HSIns
 import qualified Data.Set as S
+import qualified Database.PG.Query as Q
 import qualified Language.GraphQL.Draft.Syntax as G
 
 import Control.Arrow.Extended
@@ -40,9 +41,11 @@ import Network.HTTP.Client.Extended hiding (Proxy)
 import qualified Hasura.Incremental as Inc
 import qualified Hasura.SQL.AnyBackend as AB
+import qualified Hasura.SQL.Tag as Tag
 import qualified Hasura.Tracing as Tracing
 
 import Hasura.Backends.Postgres.Connection
+import Hasura.Backends.Postgres.DDL.Source (initCatalogForSource)
 import Hasura.Base.Error
 import Hasura.GraphQL.Execute.Types
 import Hasura.GraphQL.Schema (buildGQLContext)
@@ -268,7 +271,6 @@ buildSchemaCacheRule env = proc (metadata, invalidationKeys) -> do
          , MonadIO m, MonadBaseControl IO m
          , MonadResolveSource m
          , BackendMetadata b
-         , HasServerConfigCtx m
          )
       => ( Inc.Dependency (HashMap SourceName Inc.InvalidationKey)
          , SourceMetadata b
@@ -276,15 +278,45 @@ buildSchemaCacheRule env = proc (metadata, invalidationKeys) -> do
     resolveSourceIfNeeded = Inc.cache proc (invalidationKeys, sourceMetadata) -> do
       let sourceName = _smName sourceMetadata
           metadataObj = MetadataObject (MOSource sourceName) $ toJSON sourceName
-      maintenanceMode <- bindA -< _sccMaintenanceMode <$> askServerConfigCtx
       maybeSourceConfig <- getSourceConfigIfNeeded @b -< (invalidationKeys, sourceName, _smConfiguration sourceMetadata)
       case maybeSourceConfig of
         Nothing -> returnA -< Nothing
         Just sourceConfig ->
           (| withRecordInconsistency (
-             liftEitherA <<< bindA -< resolveDatabaseMetadata sourceConfig maintenanceMode)
+             liftEitherA <<< bindA -< resolveDatabaseMetadata sourceConfig)
           |) metadataObj
+
+    -- impl notes (swann):
+    --
+    -- as our cache invalidation key (in a sense) we use the number of event
+    -- triggers present, rerunning catalog init when this changes. this is
+    -- correct, because we only care about the transition from zero event
+    -- triggers to nonzero (not necessarily one, as Anon has observed, because
+    -- replace_metadata can add multiple event triggers in one go)
+    --
+    -- a future optimisation would be to cache, on a per-source basis, whether
+    -- or not the event catalog itself exists, and to then trigger catalog init
+    -- when an event trigger is created _but only if_ this cached information
+    -- says the event catalog doesn't already exist.
+
+    initCatalogIfNeeded
+      :: forall b arr m.
+         ( ArrowChoice arr, Inc.ArrowCache m arr
+         , MonadIO m, MonadBaseControl IO m
+         , BackendMetadata b
+         , HasServerConfigCtx m
+         )
+      => (Int, SourceConfig b) `arr` ()
+    initCatalogIfNeeded = Inc.cache proc (numEventTriggers, sc) -> do
+      arrM id -< do
+        when (numEventTriggers > 0) do
+          case backendTag @b of
+            Tag.PostgresVanillaTag -> void do
+              maintenanceMode <- _sccMaintenanceMode <$> askServerConfigCtx
+              runExceptT do runLazyTx (_pscExecCtx sc) Q.ReadWrite (initCatalogForSource maintenanceMode)
+            _ -> pure ()
 
     buildSource
       :: forall b arr m
       .  ( ArrowChoice arr, Inc.ArrowDistribute arr, Inc.ArrowCache m arr
@@ -308,6 +340,9 @@ buildSchemaCacheRule env = proc (metadata, invalidationKeys) -> do
         alignTableMap :: HashMap (TableName b) a -> HashMap (TableName b) c -> HashMap (TableName b) (a, c)
         alignTableMap = M.intersectionWith (,)
         metadataInvalidationKey = Inc.selectD #_ikMetadata invalidationKeys
+        numEventTriggers = sum $ map (length . snd) eventTriggers
+
+      initCatalogIfNeeded @b -< (numEventTriggers, sourceConfig)
 
       -- tables
       tableRawInfos <- buildTableCache -< ( source, sourceConfig, dbTables
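
Note: `Inc.cache` reruns its body whenever the cached input changes, so
keying on the trigger count yields the zero-to-nonzero rerun described in
the impl notes above (plus harmless reruns of the idempotent init when the
count moves between nonzero values). A minimal stand-in for that caching
behaviour, with invented names rather than Hasura's `Inc` API:

    import Control.Monad (when)
    import Data.IORef

    -- Rerun an action only when its key differs from the last key seen.
    rerunOnKeyChange :: Eq k => IORef (Maybe k) -> k -> IO () -> IO ()
    rerunOnKeyChange ref key action = do
      prev <- readIORef ref
      when (prev /= Just key) $ do
        action
        writeIORef ref (Just key)

    -- Mirrors initCatalogIfNeeded: the key is the event-trigger count and
    -- the action initialises the catalog only for nonzero counts.
    initIfNeeded :: IORef (Maybe Int) -> Int -> IO () -> IO ()
    initIfNeeded ref numEventTriggers initCatalog =
      rerunOnKeyChange ref numEventTriggers $
        when (numEventTriggers > 0) initCatalog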

diff --git a/server/src-lib/Hasura/RQL/Types/Metadata/Backend.hs b/server/src-lib/Hasura/RQL/Types/Metadata/Backend.hs
index 4dbf8a9a3d5..979ecd250c0 100644
--- a/server/src-lib/Hasura/RQL/Types/Metadata/Backend.hs
+++ b/server/src-lib/Hasura/RQL/Types/Metadata/Backend.hs
@@ -64,7 +64,6 @@ class (Backend b) => BackendMetadata (b :: BackendType) where
   resolveDatabaseMetadata
     :: (MonadIO m, MonadBaseControl IO m, MonadResolveSource m)
     => SourceConfig b
-    -> MaintenanceMode
     -> m (Either QErr (ResolvedSource b))
 
   createTableEventTrigger
diff --git a/server/src-lib/Hasura/SQL/Tag.hs b/server/src-lib/Hasura/SQL/Tag.hs
index 1340d64f406..0ed728174a9 100644
--- a/server/src-lib/Hasura/SQL/Tag.hs
+++ b/server/src-lib/Hasura/SQL/Tag.hs
@@ -62,8 +62,9 @@ $(concat <$> forEachBackend \b -> do
 -- is generated with Template Haskell for each 'Backend'. The case
 -- switch looks like this:
 --
---   PostgresTag -> Postgres
---   MSSQLTag -> MSSQL
+--   PostgresVanillaTag -> Postgres Vanilla
+--   PostgresCitusTag -> Postgres Citus
+--   MSSQLTag -> MSSQL
 --   ...
 reify :: BackendTag b -> BackendType
 reify t = $(backendCase
diff --git a/server/src-rsr/drop_pg_source.sql b/server/src-rsr/drop_pg_source.sql
index a0079defb11..7ece0289033 100644
--- a/server/src-rsr/drop_pg_source.sql
+++ b/server/src-rsr/drop_pg_source.sql
@@ -1,4 +1,4 @@
-DROP TABLE hdb_catalog.hdb_source_catalog_version;
-DROP FUNCTION hdb_catalog.insert_event_log(text, text, text, text, json);
-DROP TABLE hdb_catalog.event_invocation_logs;
-DROP TABLE hdb_catalog.event_log;
+DROP TABLE IF EXISTS hdb_catalog.hdb_source_catalog_version;
+DROP FUNCTION IF EXISTS hdb_catalog.insert_event_log(text, text, text, text, json);
+DROP TABLE IF EXISTS hdb_catalog.event_invocation_logs;
+DROP TABLE IF EXISTS hdb_catalog.event_log;