mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-14 08:02:15 +03:00
server: lazy event catalog initialisation
event catalog: - `hdb_catalog` is no longer automatically created - catalog is initialised when the first event trigger is created - catalog initialisation is done during the schema cache build, using `ArrowCache` so it is only run in response to a change to the set of event triggers event queue: - `processEventQueue` thread is prevented from starting when `HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL=0` - `processEventQueue` thread only processes sources for which at least one event trigger exists in some table in the source Co-authored-by: Anon Ray <616387+ecthiender@users.noreply.github.com> GitOrigin-RevId: 73f256465d62490cd2b59dcd074718679993d4fe
This commit is contained in:
parent
3ad4089126
commit
d3611af58d
@ -10,6 +10,7 @@
|
||||
|
||||
### Bug fixes and improvements
|
||||
|
||||
- server: initialise `hdb_catalog` tables only when required, and only run the event loop for sources where it is required
|
||||
- server: fix a bug where remote schema permissions would fail when used in conjunction with query variables (fix #6656)
|
||||
- server: add `rename_source` metadata API (fix #6681)
|
||||
- server: fix subscriptions with session argument in user-defined function (fix #6657)
|
||||
|
@ -27,7 +27,6 @@ import Hasura.RQL.Types.Common
|
||||
import Hasura.RQL.Types.Source
|
||||
import Hasura.RQL.Types.Table
|
||||
import Hasura.SQL.Backend
|
||||
import Hasura.Server.Types (MaintenanceMode)
|
||||
|
||||
|
||||
resolveSourceConfig ::
|
||||
@ -53,9 +52,8 @@ resolveSourceConfig _name BigQueryConnSourceConfig{..} = runExceptT $ do
|
||||
resolveSource
|
||||
:: (MonadIO m)
|
||||
=> BigQuerySourceConfig
|
||||
-> MaintenanceMode
|
||||
-> m (Either QErr (ResolvedSource 'BigQuery))
|
||||
resolveSource sourceConfig _maintenanceMode =
|
||||
resolveSource sourceConfig =
|
||||
runExceptT $ do
|
||||
result <- getTables sourceConfig
|
||||
case result of
|
||||
|
@ -13,7 +13,6 @@ import Hasura.Base.Error
|
||||
import Hasura.RQL.Types.Common
|
||||
import Hasura.RQL.Types.Source
|
||||
import Hasura.SQL.Backend
|
||||
import Hasura.Server.Types (MaintenanceMode)
|
||||
|
||||
resolveSourceConfig
|
||||
:: (MonadIO m)
|
||||
@ -27,9 +26,8 @@ resolveSourceConfig _name (MSSQLConnConfiguration connInfo) = runExceptT do
|
||||
resolveDatabaseMetadata
|
||||
:: (MonadIO m)
|
||||
=> MSSQLSourceConfig
|
||||
-> MaintenanceMode
|
||||
-> m (Either QErr (ResolvedSource 'MSSQL))
|
||||
resolveDatabaseMetadata config _maintenanceMode = runExceptT do
|
||||
resolveDatabaseMetadata config = runExceptT do
|
||||
dbTablesMetadata <- loadDBMetadata pool
|
||||
pure $ ResolvedSource config dbTablesMetadata mempty mempty
|
||||
where
|
||||
|
@ -3,7 +3,7 @@ module Hasura.Backends.Postgres.DDL.Source
|
||||
, fetchFunctionMetadata
|
||||
, fetchPgScalars
|
||||
, fetchTableMetadata
|
||||
, initSource
|
||||
, initCatalogForSource
|
||||
, postDropSourceHook
|
||||
, resolveDatabaseMetadata
|
||||
, resolveSourceConfig
|
||||
@ -54,18 +54,18 @@ resolveSourceConfig name config = runExceptT do
|
||||
resolveDatabaseMetadata
|
||||
:: forall pgKind m
|
||||
. (Backend ('Postgres pgKind), ToMetadataFetchQuery pgKind, MonadIO m, MonadBaseControl IO m)
|
||||
=> SourceConfig ('Postgres pgKind) -> MaintenanceMode -> m (Either QErr (ResolvedSource ('Postgres pgKind)))
|
||||
resolveDatabaseMetadata sourceConfig maintenanceMode = runExceptT do
|
||||
(tablesMeta, functionsMeta, pgScalars) <- runLazyTx (_pscExecCtx sourceConfig) Q.ReadWrite $ do
|
||||
initSource maintenanceMode
|
||||
=> SourceConfig ('Postgres pgKind) -> m (Either QErr (ResolvedSource ('Postgres pgKind)))
|
||||
resolveDatabaseMetadata sourceConfig = runExceptT do
|
||||
(tablesMeta, functionsMeta, pgScalars) <- runLazyTx (_pscExecCtx sourceConfig) Q.ReadOnly $ do
|
||||
tablesMeta <- fetchTableMetadata
|
||||
functionsMeta <- fetchFunctionMetadata
|
||||
pgScalars <- fetchPgScalars
|
||||
pure (tablesMeta, functionsMeta, pgScalars)
|
||||
pure $ ResolvedSource sourceConfig tablesMeta functionsMeta pgScalars
|
||||
|
||||
initSource :: MonadTx m => MaintenanceMode -> m ()
|
||||
initSource maintenanceMode = do
|
||||
-- | Initialise catalog tables for a source, including those required by the event delivery subsystem.
|
||||
initCatalogForSource :: MonadTx m => MaintenanceMode -> m ()
|
||||
initCatalogForSource maintenanceMode = do
|
||||
hdbCatalogExist <- doesSchemaExist "hdb_catalog"
|
||||
eventLogTableExist <- doesTableExist "hdb_catalog" "event_log"
|
||||
sourceVersionTableExist <- doesTableExist "hdb_catalog" "hdb_source_catalog_version"
|
||||
@ -185,19 +185,53 @@ postDropSourceHook
|
||||
=> PGSourceConfig -> m ()
|
||||
postDropSourceHook sourceConfig = do
|
||||
-- Clean traces of Hasura in source database
|
||||
--
|
||||
-- There are three type of database we have to consider here, which we
|
||||
-- refer to as types 1, 2, and 3 below:
|
||||
-- 1. default postgres source (no separate metadata database)
|
||||
-- In this case, we want to drop nothing.
|
||||
--
|
||||
-- 2. dedicated metadata database
|
||||
-- In this case, we want to only drop source-related tables ("event_log",
|
||||
-- "hdb_source_catalog_version", etc), leaving the rest of the schema intact.
|
||||
--
|
||||
-- 3. non-default postgres source (necessarily without metadata tables)
|
||||
-- In this case, we want to drop the entire "hdb_catalog" schema.
|
||||
liftEitherM $ runPgSourceWriteTx sourceConfig $ do
|
||||
hdbMetadataTableExist <- doesTableExist "hdb_catalog" "hdb_metadata"
|
||||
eventLogTableExist <- doesTableExist "hdb_catalog" "event_log"
|
||||
-- If "hdb_metadata" and "event_log" tables found in the "hdb_catalog" schema
|
||||
-- then this infers the source is being used as default potgres source (--database-url option).
|
||||
-- In this case don't drop any thing in the catalog schema.
|
||||
if | hdbMetadataTableExist && eventLogTableExist -> pure ()
|
||||
-- Otherwise, if only "hdb_metadata" table exist, then this infers the source is
|
||||
-- being used as metadata storage (--metadata-database-url option). In this case
|
||||
-- drop only source related tables and not "hdb_catalog" schema
|
||||
| hdbMetadataTableExist ->
|
||||
Q.multiQE defaultTxErrorHandler $(makeRelativeToProject "src-rsr/drop_pg_source.sql" >>= Q.sqlFromFile)
|
||||
-- Otherwise, drop "hdb_catalog" schema.
|
||||
if
|
||||
-- If "hdb_metadata" and "event_log" tables are found in the "hdb_catalog" schema,
|
||||
-- then this implies the source is being used as the default postgres source, i.e.
|
||||
-- this is a default postgres source (type 1 above).
|
||||
-- In this case we don't drop anything in the catalog schema.
|
||||
| hdbMetadataTableExist && eventLogTableExist -> pure ()
|
||||
-- However, it is possible that the above condition is not met for a default
|
||||
-- postgres source. This will happen if no event triggers have been defined,
|
||||
-- because we initialise event catalog tables only when required (i.e. when
|
||||
-- a trigger is defined).
|
||||
--
|
||||
-- This could lead to a possible problem where "hdb_metadata" exists, "event_log"
|
||||
-- does not exist, but the _other_ source-related tables exist. In that case, we
|
||||
-- would end up dropping them here, which would go against our requirements above.
|
||||
-- However, observe that these tables are always all created or destroyed together,
|
||||
-- in single transactions where we run setup/teardown SQL files, so this condition
|
||||
-- is guaranteed to not take place.
|
||||
--
|
||||
-- So if only "hdb_metadata" exists, we have one of two possible cases:
|
||||
-- * this is a metadata database (type 2) and we can drop all source-related tables
|
||||
-- * this is a default database (type 1) which has no source-related tables (because
|
||||
-- it has no "event_log" table, it cannot have the others, because of the previous
|
||||
-- argument)
|
||||
--
|
||||
-- It should be clear that we can now safely issue DROP IF EXISTS statements for
|
||||
-- all source-related tables now according to the spec above. The IF EXISTS lets us
|
||||
-- handle both cases uniformly, doing nothing in the second case, and for metadata
|
||||
-- databases, we drop only source-related tables from the database's "hdb_catalog" schema.
|
||||
| hdbMetadataTableExist -> Q.multiQE
|
||||
defaultTxErrorHandler $(makeRelativeToProject "src-rsr/drop_pg_source.sql" >>= Q.sqlFromFile)
|
||||
-- Otherwise, we have a non-default postgres source, which has no metadata tables.
|
||||
-- We drop the entire "hdb_catalog" schema as discussed above.
|
||||
| otherwise -> dropHdbCatalogSchema
|
||||
|
||||
-- Destory postgres source connection
|
||||
|
@ -260,33 +260,39 @@ processEventQueue logger logenv httpMgr getSchemaCache EventEngineCtx{..} Locked
|
||||
(delivered=t or error=t or archived=t) after a fixed number of tries (assuming it begins with locked='f').
|
||||
-}
|
||||
pgSources <- scSources <$> liftIO getSchemaCache
|
||||
liftIO $ fmap concat $ forM (M.toList pgSources) $ \(sourceName, sourceCache) ->
|
||||
case unsafeSourceConfiguration @('Postgres 'Vanilla) sourceCache of
|
||||
Nothing -> pure []
|
||||
Just sourceConfig -> do
|
||||
fetchEventsTxE <-
|
||||
case maintenanceMode of
|
||||
MaintenanceModeEnabled -> do
|
||||
maintenanceModeVersion <- runPgSourceReadTx sourceConfig getMaintenanceModeVersion
|
||||
pure $ fmap (fetchEventsMaintenanceMode sourceName fetchBatchSize) maintenanceModeVersion
|
||||
MaintenanceModeDisabled -> return $ Right $ fetchEvents sourceName fetchBatchSize
|
||||
liftIO $ do
|
||||
case fetchEventsTxE of
|
||||
Left err -> do
|
||||
liftIO $ L.unLogger logger $ EventInternalErr err
|
||||
return []
|
||||
Right fetchEventsTx ->
|
||||
runPgSourceWriteTx sourceConfig fetchEventsTx >>= \case
|
||||
liftIO . fmap concat $
|
||||
for (M.toList pgSources) \(sourceName, sourceCache) -> concat . toList <$>
|
||||
for (unsafeSourceTables @('Postgres 'Vanilla) sourceCache) \tables -> liftIO do
|
||||
-- count the number of event triggers on tables in this source
|
||||
let eventTriggerCount = sum (M.size . _tiEventTriggerInfoMap <$> tables)
|
||||
|
||||
-- only process events for this source if at least one event trigger exists
|
||||
if eventTriggerCount > 0 then fmap (concat . toList) $
|
||||
for (unsafeSourceConfiguration @('Postgres 'Vanilla) sourceCache) \sourceConfig -> do
|
||||
fetchEventsTxE <-
|
||||
case maintenanceMode of
|
||||
MaintenanceModeEnabled -> do
|
||||
maintenanceModeVersion <- runPgSourceReadTx sourceConfig getMaintenanceModeVersion
|
||||
pure $ fmap (fetchEventsMaintenanceMode sourceName fetchBatchSize) maintenanceModeVersion
|
||||
MaintenanceModeDisabled -> return $ Right $ fetchEvents sourceName fetchBatchSize
|
||||
liftIO $ do
|
||||
case fetchEventsTxE of
|
||||
Left err -> do
|
||||
liftIO $ L.unLogger logger $ EventInternalErr err
|
||||
return []
|
||||
Right events -> do
|
||||
-- Track number of events fetched in EKG
|
||||
_ <- liftIO $ EKG.Distribution.add (smNumEventsFetchedPerBatch serverMetrics) (fromIntegral $ length events)
|
||||
-- The time when the events were fetched. This is used to calculate the average lock time of an event.
|
||||
eventsFetchedTime <- liftIO getCurrentTime
|
||||
saveLockedEvents (map eId events) leEvents
|
||||
return $ map (, sourceConfig, eventsFetchedTime) events
|
||||
Right fetchEventsTx ->
|
||||
runPgSourceWriteTx sourceConfig fetchEventsTx >>= \case
|
||||
Left err -> do
|
||||
liftIO $ L.unLogger logger $ EventInternalErr err
|
||||
return []
|
||||
Right events -> do
|
||||
-- Track number of events fetched in EKG
|
||||
_ <- liftIO $ EKG.Distribution.add (smNumEventsFetchedPerBatch serverMetrics) (fromIntegral $ length events)
|
||||
-- The time when the events were fetched. This is used to calculate the average lock time of an event.
|
||||
eventsFetchedTime <- liftIO getCurrentTime
|
||||
saveLockedEvents (map eId events) leEvents
|
||||
return $ map (, sourceConfig, eventsFetchedTime) events
|
||||
else pure []
|
||||
|
||||
-- !!! CAREFUL !!!
|
||||
-- The logic here in particular is subtle and has been fixed, broken,
|
||||
|
@ -27,6 +27,7 @@ import qualified Data.HashMap.Strict.InsOrd as OMap
|
||||
import qualified Data.HashSet as HS
|
||||
import qualified Data.HashSet.InsOrd as HSIns
|
||||
import qualified Data.Set as S
|
||||
import qualified Database.PG.Query as Q
|
||||
import qualified Language.GraphQL.Draft.Syntax as G
|
||||
|
||||
import Control.Arrow.Extended
|
||||
@ -40,9 +41,11 @@ import Network.HTTP.Client.Extended hiding (Proxy)
|
||||
|
||||
import qualified Hasura.Incremental as Inc
|
||||
import qualified Hasura.SQL.AnyBackend as AB
|
||||
import qualified Hasura.SQL.Tag as Tag
|
||||
import qualified Hasura.Tracing as Tracing
|
||||
|
||||
import Hasura.Backends.Postgres.Connection
|
||||
import Hasura.Backends.Postgres.DDL.Source (initCatalogForSource)
|
||||
import Hasura.Base.Error
|
||||
import Hasura.GraphQL.Execute.Types
|
||||
import Hasura.GraphQL.Schema (buildGQLContext)
|
||||
@ -268,7 +271,6 @@ buildSchemaCacheRule env = proc (metadata, invalidationKeys) -> do
|
||||
, MonadIO m, MonadBaseControl IO m
|
||||
, MonadResolveSource m
|
||||
, BackendMetadata b
|
||||
, HasServerConfigCtx m
|
||||
)
|
||||
=> ( Inc.Dependency (HashMap SourceName Inc.InvalidationKey)
|
||||
, SourceMetadata b
|
||||
@ -276,15 +278,45 @@ buildSchemaCacheRule env = proc (metadata, invalidationKeys) -> do
|
||||
resolveSourceIfNeeded = Inc.cache proc (invalidationKeys, sourceMetadata) -> do
|
||||
let sourceName = _smName sourceMetadata
|
||||
metadataObj = MetadataObject (MOSource sourceName) $ toJSON sourceName
|
||||
maintenanceMode <- bindA -< _sccMaintenanceMode <$> askServerConfigCtx
|
||||
maybeSourceConfig <- getSourceConfigIfNeeded @b -< (invalidationKeys, sourceName, _smConfiguration sourceMetadata)
|
||||
case maybeSourceConfig of
|
||||
Nothing -> returnA -< Nothing
|
||||
Just sourceConfig ->
|
||||
(| withRecordInconsistency (
|
||||
liftEitherA <<< bindA -< resolveDatabaseMetadata sourceConfig maintenanceMode)
|
||||
liftEitherA <<< bindA -< resolveDatabaseMetadata sourceConfig)
|
||||
|) metadataObj
|
||||
|
||||
|
||||
-- impl notes (swann):
|
||||
--
|
||||
-- as our cache invalidation key (in a sense) we use the number of event triggers
|
||||
-- present, rerunning catalog init when this changes. this is correct, because we
|
||||
-- only care about the transition from zero event triggers to nonzero (not
|
||||
-- necessarily one, as Anon has observed, because replace_metadata can add multiple
|
||||
-- event triggers in one go)
|
||||
--
|
||||
-- a future optimisation would be to cache, on a per-source basis, whether or not
|
||||
-- the event catalog itself exists, and to then trigger catalog init when an event
|
||||
-- trigger is created _but only if_ this cached information says the event catalog
|
||||
-- doesn't already exist.
|
||||
|
||||
initCatalogIfNeeded
|
||||
:: forall b arr m.
|
||||
( ArrowChoice arr, Inc.ArrowCache m arr
|
||||
, MonadIO m, MonadBaseControl IO m
|
||||
, BackendMetadata b
|
||||
, HasServerConfigCtx m
|
||||
)
|
||||
=> (Int, SourceConfig b) `arr` ()
|
||||
initCatalogIfNeeded = Inc.cache proc (numEventTriggers, sc) -> do
|
||||
arrM id -< do
|
||||
when (numEventTriggers > 0) do
|
||||
case backendTag @b of
|
||||
Tag.PostgresVanillaTag -> void do
|
||||
maintenanceMode <- _sccMaintenanceMode <$> askServerConfigCtx
|
||||
runExceptT do runLazyTx (_pscExecCtx sc) Q.ReadWrite (initCatalogForSource maintenanceMode)
|
||||
_ -> pure ()
|
||||
|
||||
buildSource
|
||||
:: forall b arr m
|
||||
. ( ArrowChoice arr, Inc.ArrowDistribute arr, Inc.ArrowCache m arr
|
||||
@ -308,6 +340,9 @@ buildSchemaCacheRule env = proc (metadata, invalidationKeys) -> do
|
||||
alignTableMap :: HashMap (TableName b) a -> HashMap (TableName b) c -> HashMap (TableName b) (a, c)
|
||||
alignTableMap = M.intersectionWith (,)
|
||||
metadataInvalidationKey = Inc.selectD #_ikMetadata invalidationKeys
|
||||
numEventTriggers = sum $ map (length . snd) eventTriggers
|
||||
|
||||
initCatalogIfNeeded @b -< (numEventTriggers, sourceConfig)
|
||||
|
||||
-- tables
|
||||
tableRawInfos <- buildTableCache -< ( source, sourceConfig, dbTables
|
||||
|
@ -64,7 +64,6 @@ class (Backend b) => BackendMetadata (b :: BackendType) where
|
||||
resolveDatabaseMetadata
|
||||
:: (MonadIO m, MonadBaseControl IO m, MonadResolveSource m)
|
||||
=> SourceConfig b
|
||||
-> MaintenanceMode
|
||||
-> m (Either QErr (ResolvedSource b))
|
||||
|
||||
createTableEventTrigger
|
||||
|
@ -62,8 +62,9 @@ $(concat <$> forEachBackend \b -> do
|
||||
-- is generated with Template Haskell for each 'Backend'. The case
|
||||
-- switch looks like this:
|
||||
--
|
||||
-- PostgresTag -> Postgres
|
||||
-- MSSQLTag -> MSSQL
|
||||
-- PostgresVanillaTag -> Postgres Vanilla
|
||||
-- PostgresCitusTag -> Postgres Citus
|
||||
-- MSSQLTag -> MSSQL
|
||||
-- ...
|
||||
reify :: BackendTag b -> BackendType
|
||||
reify t = $(backendCase
|
||||
|
@ -1,4 +1,4 @@
|
||||
DROP TABLE hdb_catalog.hdb_source_catalog_version;
|
||||
DROP FUNCTION hdb_catalog.insert_event_log(text, text, text, text, json);
|
||||
DROP TABLE hdb_catalog.event_invocation_logs;
|
||||
DROP TABLE hdb_catalog.event_log;
|
||||
DROP TABLE IF EXISTS hdb_catalog.hdb_source_catalog_version;
|
||||
DROP FUNCTION IF EXISTS hdb_catalog.insert_event_log(text, text, text, text, json);
|
||||
DROP TABLE IF EXISTS hdb_catalog.event_invocation_logs;
|
||||
DROP TABLE IF EXISTS hdb_catalog.event_log;
|
||||
|
Loading…
Reference in New Issue
Block a user