mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-14 17:02:49 +03:00
server/gardening: refactor scMetadataResourceVersion
in SchemaCache
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/8377 GitOrigin-RevId: 999a5112f8940b267c2765f4bed39bf6151f18f0
This commit is contained in:
parent
bc7225fd1a
commit
bfca9fd986
@ -366,7 +366,7 @@ resolvePostgresConnInfo env dbUrlConf (fromMaybe 1 -> retries) = do
|
||||
-- but that aren't needed in the rest of the application.
|
||||
data AppInit = AppInit
|
||||
{ aiTLSAllowListRef :: TLSAllowListRef Hasura,
|
||||
aiMetadata :: Metadata
|
||||
aiMetadataWithResourceVersion :: MetadataWithResourceVersion
|
||||
}
|
||||
|
||||
-- | Initializes or migrates the catalog and creates the 'AppEnv' required to
|
||||
@ -416,7 +416,7 @@ initialiseAppEnv env BasicConnectionInfo {..} serveOptions@ServeOptions {..} liv
|
||||
(liftIO . PG.destroyPGPool)
|
||||
|
||||
-- Migrate the catalog and fetch the metdata.
|
||||
metadata <-
|
||||
metadataWithVersion <-
|
||||
lift $
|
||||
flip onException (flushLogger loggerCtx) $
|
||||
migrateCatalogAndFetchMetadata
|
||||
@ -427,6 +427,7 @@ initialiseAppEnv env BasicConnectionInfo {..} serveOptions@ServeOptions {..} liv
|
||||
soExtensionsSchema
|
||||
|
||||
-- Create the TLSAllowListRef and the HTTP Manager.
|
||||
let metadata = _mwrvMetadata metadataWithVersion
|
||||
tlsAllowListRef <- liftIO $ createTLSAllowListRef $ networkTlsAllowlist $ _metaNetwork metadata
|
||||
httpManager <- liftIO $ mkHttpManager (readTLSAllowList tlsAllowListRef) mempty
|
||||
|
||||
@ -454,7 +455,7 @@ initialiseAppEnv env BasicConnectionInfo {..} serveOptions@ServeOptions {..} liv
|
||||
pure
|
||||
( AppInit
|
||||
{ aiTLSAllowListRef = tlsAllowListRef,
|
||||
aiMetadata = metadata
|
||||
aiMetadataWithResourceVersion = metadataWithVersion
|
||||
},
|
||||
AppEnv
|
||||
{ appEnvPort = soPort,
|
||||
@ -524,7 +525,7 @@ initialiseAppContext env serveOptions@ServeOptions {..} AppInit {..} = do
|
||||
serverConfigCtx
|
||||
(mkPgSourceResolver pgLogger)
|
||||
mkMSSQLSourceResolver
|
||||
aiMetadata
|
||||
aiMetadataWithResourceVersion
|
||||
appEnvManager
|
||||
|
||||
-- Build the RebuildableAppContext.
|
||||
@ -546,7 +547,7 @@ migrateCatalogAndFetchMetadata ::
|
||||
Maybe (SourceConnConfiguration ('Postgres 'Vanilla)) ->
|
||||
MaintenanceMode () ->
|
||||
ExtensionsSchema ->
|
||||
m Metadata
|
||||
m MetadataWithResourceVersion
|
||||
migrateCatalogAndFetchMetadata
|
||||
logger
|
||||
pool
|
||||
@ -575,9 +576,9 @@ migrateCatalogAndFetchMetadata
|
||||
slInfo = A.toJSON err
|
||||
}
|
||||
liftIO (throwErrJExit DatabaseMigrationError err)
|
||||
Right (migrationResult, metadata) -> do
|
||||
Right (migrationResult, metadataWithVersion) -> do
|
||||
unLogger logger migrationResult
|
||||
pure metadata
|
||||
pure metadataWithVersion
|
||||
|
||||
-- | Build the original 'RebuildableSchemaCache'.
|
||||
--
|
||||
@ -591,7 +592,7 @@ buildFirstSchemaCache ::
|
||||
ServerConfigCtx ->
|
||||
SourceResolver ('Postgres 'Vanilla) ->
|
||||
SourceResolver ('MSSQL) ->
|
||||
Metadata ->
|
||||
MetadataWithResourceVersion ->
|
||||
HTTP.Manager ->
|
||||
m RebuildableSchemaCache
|
||||
buildFirstSchemaCache
|
||||
@ -600,14 +601,14 @@ buildFirstSchemaCache
|
||||
serverConfigCtx
|
||||
pgSourceResolver
|
||||
mssqlSourceResolver
|
||||
metadata
|
||||
metadataWithVersion
|
||||
httpManager = do
|
||||
let cacheBuildParams = CacheBuildParams httpManager pgSourceResolver mssqlSourceResolver
|
||||
buildReason = CatalogSync
|
||||
result <-
|
||||
runExceptT $
|
||||
runCacheBuild cacheBuildParams $
|
||||
buildRebuildableSchemaCacheWithReason buildReason logger env metadata serverConfigCtx
|
||||
buildRebuildableSchemaCacheWithReason buildReason logger env metadataWithVersion serverConfigCtx
|
||||
result `onLeft` \err -> do
|
||||
-- TODO: we used to bundle the first schema cache build with the catalog
|
||||
-- migration, using the same error handler for both, meaning that an
|
||||
|
@ -86,7 +86,7 @@ TODO: Reference to open issue or rfc?
|
||||
class Monad m => MonadMetadataStorage m where
|
||||
-- Metadata
|
||||
fetchMetadataResourceVersion :: m (Either QErr MetadataResourceVersion)
|
||||
fetchMetadata :: m (Either QErr (Metadata, MetadataResourceVersion))
|
||||
fetchMetadata :: m (Either QErr MetadataWithResourceVersion)
|
||||
fetchMetadataNotifications :: MetadataResourceVersion -> InstanceId -> m (Either QErr [(MetadataResourceVersion, CacheInvalidations)])
|
||||
setMetadata :: MetadataResourceVersion -> Metadata -> m (Either QErr MetadataResourceVersion)
|
||||
notifySchemaCacheSync :: MetadataResourceVersion -> InstanceId -> CacheInvalidations -> m (Either QErr ())
|
||||
|
@ -141,7 +141,7 @@ action/function.
|
||||
buildRebuildableSchemaCache ::
|
||||
Logger Hasura ->
|
||||
Env.Environment ->
|
||||
Metadata ->
|
||||
MetadataWithResourceVersion ->
|
||||
ServerConfigCtx ->
|
||||
CacheBuild RebuildableSchemaCache
|
||||
buildRebuildableSchemaCache =
|
||||
@ -151,13 +151,13 @@ buildRebuildableSchemaCacheWithReason ::
|
||||
BuildReason ->
|
||||
Logger Hasura ->
|
||||
Env.Environment ->
|
||||
Metadata ->
|
||||
MetadataWithResourceVersion ->
|
||||
ServerConfigCtx ->
|
||||
CacheBuild RebuildableSchemaCache
|
||||
buildRebuildableSchemaCacheWithReason reason logger env metadata serverConfigCtx = do
|
||||
buildRebuildableSchemaCacheWithReason reason logger env metadataWithVersion serverConfigCtx = do
|
||||
result <-
|
||||
flip runReaderT reason $
|
||||
Inc.build (buildSchemaCacheRule logger env) (metadata, serverConfigCtx, initialInvalidationKeys, Nothing)
|
||||
Inc.build (buildSchemaCacheRule logger env) (metadataWithVersion, serverConfigCtx, initialInvalidationKeys, Nothing)
|
||||
|
||||
pure $ RebuildableSchemaCache (Inc.result result) initialInvalidationKeys (Inc.rebuildRule result)
|
||||
|
||||
@ -230,13 +230,13 @@ instance
|
||||
buildSchemaCacheWithOptions buildReason invalidations metadata = CacheRWT do
|
||||
serverConfigCtx <- ask
|
||||
(RebuildableSchemaCache lastBuiltSC invalidationKeys rule, oldInvalidations) <- get
|
||||
let metadataVersion = scMetadataResourceVersion lastBuiltSC
|
||||
let metadataWithVersion = MetadataWithResourceVersion metadata $ scMetadataResourceVersion lastBuiltSC
|
||||
newInvalidationKeys = invalidateKeys invalidations invalidationKeys
|
||||
result <-
|
||||
runCacheBuildM $
|
||||
flip runReaderT buildReason $
|
||||
Inc.build rule (metadata, serverConfigCtx, newInvalidationKeys, Nothing)
|
||||
let schemaCache = (Inc.result result) {scMetadataResourceVersion = metadataVersion}
|
||||
Inc.build rule (metadataWithVersion, serverConfigCtx, newInvalidationKeys, Nothing)
|
||||
let schemaCache = Inc.result result
|
||||
prunedInvalidationKeys = pruneInvalidationKeys schemaCache newInvalidationKeys
|
||||
!newCache = RebuildableSchemaCache schemaCache prunedInvalidationKeys (Inc.rebuildRule result)
|
||||
!newInvalidations = oldInvalidations <> invalidations
|
||||
@ -254,7 +254,7 @@ instance
|
||||
( rebuildableSchemaCache
|
||||
{ lastBuiltSchemaCache =
|
||||
(lastBuiltSchemaCache rebuildableSchemaCache)
|
||||
{ scMetadataResourceVersion = Just resourceVersion
|
||||
{ scMetadataResourceVersion = resourceVersion
|
||||
}
|
||||
},
|
||||
invalidations
|
||||
@ -334,8 +334,8 @@ buildSchemaCacheRule ::
|
||||
) =>
|
||||
Logger Hasura ->
|
||||
Env.Environment ->
|
||||
(Metadata, ServerConfigCtx, InvalidationKeys, Maybe StoredIntrospection) `arr` SchemaCache
|
||||
buildSchemaCacheRule logger env = proc (metadataNoDefaults, serverConfigCtx, invalidationKeys, storedIntrospection) -> do
|
||||
(MetadataWithResourceVersion, ServerConfigCtx, InvalidationKeys, Maybe StoredIntrospection) `arr` SchemaCache
|
||||
buildSchemaCacheRule logger env = proc (MetadataWithResourceVersion metadataNoDefaults resourceVersion, serverConfigCtx, invalidationKeys, storedIntrospection) -> do
|
||||
invalidationKeysDep <- Inc.newDependency -< invalidationKeys
|
||||
let metadataDefaults = _sccMetadataDefaults serverConfigCtx
|
||||
metadata@Metadata {..} = overrideMetadataDefaults metadataNoDefaults metadataDefaults
|
||||
@ -426,7 +426,7 @@ buildSchemaCacheRule logger env = proc (metadataNoDefaults, serverConfigCtx, inv
|
||||
<> inconsistentQueryCollections,
|
||||
scApiLimits = _metaApiLimits,
|
||||
scMetricsConfig = _metaMetricsConfig,
|
||||
scMetadataResourceVersion = Nothing,
|
||||
scMetadataResourceVersion = resourceVersion,
|
||||
scSetGraphqlIntrospectionOptions = _metaSetGraphqlIntrospectionOptions,
|
||||
scTlsAllowlist = networkTlsAllowlist _metaNetwork,
|
||||
scQueryCollections = _metaQueryCollections,
|
||||
|
@ -309,7 +309,7 @@ runCacheBuildM m = do
|
||||
data RebuildableSchemaCache = RebuildableSchemaCache
|
||||
{ lastBuiltSchemaCache :: SchemaCache,
|
||||
_rscInvalidationMap :: InvalidationKeys,
|
||||
_rscRebuild :: Inc.Rule (ReaderT BuildReason CacheBuild) (Metadata, ServerConfigCtx, InvalidationKeys, Maybe StoredIntrospection) SchemaCache
|
||||
_rscRebuild :: Inc.Rule (ReaderT BuildReason CacheBuild) (MetadataWithResourceVersion, ServerConfigCtx, InvalidationKeys, Maybe StoredIntrospection) SchemaCache
|
||||
}
|
||||
|
||||
bindErrorA ::
|
||||
|
@ -20,6 +20,7 @@ import Hasura.Prelude
|
||||
import Hasura.RQL.Types.Metadata
|
||||
import Hasura.RQL.Types.SchemaCache
|
||||
( MetadataResourceVersion (..),
|
||||
MetadataWithResourceVersion (..),
|
||||
initialResourceVersion,
|
||||
)
|
||||
import Hasura.RQL.Types.SchemaCache.Build (CacheInvalidations)
|
||||
@ -40,7 +41,7 @@ fetchMetadataFromCatalog = do
|
||||
[Identity (PG.ViaJSON metadata)] -> pure metadata
|
||||
_ -> throw500 "multiple rows in hdb_metadata table"
|
||||
|
||||
fetchMetadataAndResourceVersionFromCatalog :: PG.TxE QErr (Metadata, MetadataResourceVersion)
|
||||
fetchMetadataAndResourceVersionFromCatalog :: PG.TxE QErr MetadataWithResourceVersion
|
||||
fetchMetadataAndResourceVersionFromCatalog = do
|
||||
rows <-
|
||||
PG.withQE
|
||||
@ -50,7 +51,7 @@ fetchMetadataAndResourceVersionFromCatalog = do
|
||||
|]
|
||||
()
|
||||
True
|
||||
case rows of
|
||||
uncurry MetadataWithResourceVersion <$> case rows of
|
||||
[] -> pure (emptyMetadata, initialResourceVersion)
|
||||
[(PG.ViaJSON metadata, resourceVersion)] -> pure (metadata, MetadataResourceVersion resourceVersion)
|
||||
_ -> throw500 "multiple rows in hdb_metadata table"
|
||||
|
@ -102,6 +102,7 @@ module Hasura.RQL.Types.SchemaCache
|
||||
MetadataResourceVersion (..),
|
||||
showMetadataResourceVersion,
|
||||
initialResourceVersion,
|
||||
MetadataWithResourceVersion (..),
|
||||
getLogicalModelBoolExpDeps,
|
||||
getBoolExpDeps,
|
||||
InlinedAllowlist,
|
||||
@ -177,6 +178,12 @@ initialResourceVersion = MetadataResourceVersion 0
|
||||
showMetadataResourceVersion :: MetadataResourceVersion -> Text
|
||||
showMetadataResourceVersion (MetadataResourceVersion version) = tshow version
|
||||
|
||||
data MetadataWithResourceVersion = MetadataWithResourceVersion
|
||||
{ _mwrvMetadata :: Metadata,
|
||||
_mwrvResourceVersion :: MetadataResourceVersion
|
||||
}
|
||||
deriving (Eq)
|
||||
|
||||
mkParentDep ::
|
||||
forall b.
|
||||
Backend b =>
|
||||
@ -553,7 +560,7 @@ data SchemaCache = SchemaCache
|
||||
scEndpoints :: EndpointTrie GQLQueryWithText,
|
||||
scApiLimits :: ApiLimit,
|
||||
scMetricsConfig :: MetricsConfig,
|
||||
scMetadataResourceVersion :: Maybe MetadataResourceVersion,
|
||||
scMetadataResourceVersion :: MetadataResourceVersion,
|
||||
scSetGraphqlIntrospectionOptions :: SetGraphqlIntrospectionOptions,
|
||||
scTlsAllowlist :: [TlsAllow],
|
||||
scQueryCollections :: QueryCollections,
|
||||
|
@ -397,7 +397,7 @@ runMetadataQuery ::
|
||||
runMetadataQuery appContext schemaCache RQLMetadata {..} = do
|
||||
appEnv@AppEnv {..} <- askAppEnv
|
||||
let logger = _lsLogger appEnvLoggers
|
||||
(metadata, currentResourceVersion) <- Tracing.newSpan "fetchMetadata" $ liftEitherM fetchMetadata
|
||||
MetadataWithResourceVersion metadata currentResourceVersion <- Tracing.newSpan "fetchMetadata" $ liftEitherM fetchMetadata
|
||||
let exportsMetadata = \case
|
||||
RMV1 (RMExportMetadata _) -> True
|
||||
RMV2 (RMV2ExportMetadata _) -> True
|
||||
|
@ -51,6 +51,7 @@ import Hasura.RQL.Types.Metadata
|
||||
import Hasura.RQL.Types.Permission
|
||||
import Hasura.RQL.Types.QueryCollection
|
||||
import Hasura.RQL.Types.ScheduledTrigger
|
||||
import Hasura.RQL.Types.SchemaCache
|
||||
import Hasura.RQL.Types.SchemaCache.Build
|
||||
import Hasura.RQL.Types.Source
|
||||
import Hasura.RemoteSchema.MetadataAPI
|
||||
@ -207,7 +208,7 @@ runQuery appContext sc query = do
|
||||
else acMetadataDefaults appContext
|
||||
serverConfigCtx = buildServerConfigCtx appEnv appContext
|
||||
|
||||
(metadata, currentResourceVersion) <- liftEitherM fetchMetadata
|
||||
MetadataWithResourceVersion metadata currentResourceVersion <- liftEitherM fetchMetadata
|
||||
((result, updatedMetadata), updatedCache, invalidations) <-
|
||||
runQueryM (acEnvironment appContext) query
|
||||
-- TODO: remove this straight runReaderT that provides no actual new info
|
||||
|
@ -42,6 +42,7 @@ import Hasura.RQL.DML.Types
|
||||
)
|
||||
import Hasura.RQL.DML.Update
|
||||
import Hasura.RQL.Types.Metadata
|
||||
import Hasura.RQL.Types.SchemaCache (MetadataWithResourceVersion (MetadataWithResourceVersion))
|
||||
import Hasura.RQL.Types.SchemaCache.Build
|
||||
import Hasura.RQL.Types.Source
|
||||
import Hasura.SQL.Backend
|
||||
@ -123,7 +124,7 @@ runQuery appContext schemaCache rqlQuery = do
|
||||
throw400 NotSupported "Cannot run write queries when read-only mode is enabled"
|
||||
|
||||
let serverConfigCtx = buildServerConfigCtx appEnv appContext
|
||||
(metadata, currentResourceVersion) <- Tracing.newSpan "fetchMetadata" $ liftEitherM fetchMetadata
|
||||
MetadataWithResourceVersion metadata currentResourceVersion <- Tracing.newSpan "fetchMetadata" $ liftEitherM fetchMetadata
|
||||
((result, updatedMetadata), updatedCache, invalidations) <-
|
||||
runQueryM (acEnvironment appContext) rqlQuery
|
||||
-- We can use defaults here unconditionally, since there is no MD export function in V2Query
|
||||
|
@ -203,4 +203,4 @@ logInconsistentMetadata logger objs =
|
||||
updateMetadataVersionGauge :: MonadIO m => Gauge -> RebuildableSchemaCache -> m ()
|
||||
updateMetadataVersionGauge metadataVersionGauge schemaCache = do
|
||||
let metadataVersion = scMetadataResourceVersion . lastBuiltSchemaCache $ schemaCache
|
||||
liftIO $ traverse_ (Gauge.set metadataVersionGauge . getMetadataResourceVersion) metadataVersion
|
||||
liftIO $ Gauge.set metadataVersionGauge $ getMetadataResourceVersion metadataVersion
|
||||
|
@ -50,6 +50,7 @@ import Hasura.RQL.Types.CustomTypes
|
||||
import Hasura.RQL.Types.Metadata
|
||||
import Hasura.RQL.Types.Network
|
||||
import Hasura.RQL.Types.OpenTelemetry (emptyOpenTelemetryConfig)
|
||||
import Hasura.RQL.Types.SchemaCache
|
||||
import Hasura.RQL.Types.SourceCustomization
|
||||
import Hasura.SQL.AnyBackend qualified as AB
|
||||
import Hasura.SQL.Backend
|
||||
@ -112,7 +113,7 @@ migrateCatalog ::
|
||||
ExtensionsSchema ->
|
||||
MaintenanceMode () ->
|
||||
UTCTime ->
|
||||
m (MigrationResult, Metadata)
|
||||
m (MigrationResult, MetadataWithResourceVersion)
|
||||
migrateCatalog maybeDefaultSourceConfig extensionsSchema maintenanceMode migrationTime = do
|
||||
catalogSchemaExists <- doesSchemaExist (SchemaName "hdb_catalog")
|
||||
versionTableExists <- doesTableExist (SchemaName "hdb_catalog") (TableName "hdb_version")
|
||||
@ -135,8 +136,8 @@ migrateCatalog maybeDefaultSourceConfig extensionsSchema maintenanceMode migrati
|
||||
True -> case versionTableExists of
|
||||
False -> initialize False
|
||||
True -> migrateFrom =<< liftTx getCatalogVersion
|
||||
metadata <- liftTx fetchMetadataFromCatalog
|
||||
pure (migrationResult, metadata)
|
||||
metadataWithVersion <- liftTx fetchMetadataAndResourceVersionFromCatalog
|
||||
pure (migrationResult, metadataWithVersion)
|
||||
where
|
||||
-- initializes the catalog, creating the schema if necessary
|
||||
initialize :: Bool -> m MigrationResult
|
||||
|
@ -287,78 +287,65 @@ refreshSchemaCache
|
||||
(msg, cache, _) <-
|
||||
runCacheRWT serverConfigCtx rebuildableCache $ do
|
||||
schemaCache <- askSchemaCache
|
||||
case scMetadataResourceVersion schemaCache of
|
||||
-- While starting up, the metadata resource version is set to nothing, so we want to set the version
|
||||
-- without fetching the database metadata (as we have already fetched it during the startup, so, we
|
||||
-- skip fetching it twice)
|
||||
Nothing -> do
|
||||
setMetadataResourceVersionInSchemaCache resourceVersion
|
||||
logInfo logger threadType $
|
||||
String $
|
||||
T.unwords
|
||||
[ "Received metadata resource version:",
|
||||
showMetadataResourceVersion resourceVersion,
|
||||
"as an initial version. Not updating the schema cache."
|
||||
]
|
||||
Just engineResourceVersion ->
|
||||
unless (engineResourceVersion == resourceVersion) $ do
|
||||
let engineResourceVersion = scMetadataResourceVersion schemaCache
|
||||
unless (engineResourceVersion == resourceVersion) $ do
|
||||
logInfo logger threadType $
|
||||
String $
|
||||
T.unwords
|
||||
[ "Received metadata resource version:",
|
||||
showMetadataResourceVersion resourceVersion <> ",",
|
||||
"different from the current engine resource version:",
|
||||
showMetadataResourceVersion engineResourceVersion <> ".",
|
||||
"Trying to update the schema cache."
|
||||
]
|
||||
|
||||
MetadataWithResourceVersion metadata latestResourceVersion <- liftEitherM fetchMetadata
|
||||
|
||||
logInfo logger threadType $
|
||||
String $
|
||||
T.unwords
|
||||
[ "Fetched metadata with resource version:",
|
||||
showMetadataResourceVersion latestResourceVersion
|
||||
]
|
||||
|
||||
notifications <- liftEitherM $ fetchMetadataNotifications engineResourceVersion appEnvInstanceId
|
||||
|
||||
case notifications of
|
||||
[] -> do
|
||||
logInfo logger threadType $
|
||||
String $
|
||||
T.unwords
|
||||
[ "Received metadata resource version:",
|
||||
showMetadataResourceVersion resourceVersion <> ",",
|
||||
"different from the current engine resource version:",
|
||||
showMetadataResourceVersion engineResourceVersion <> ".",
|
||||
"Trying to update the schema cache."
|
||||
[ "Fetched metadata notifications and received no notifications. Not updating the schema cache.",
|
||||
"Only setting resource version:",
|
||||
showMetadataResourceVersion latestResourceVersion,
|
||||
"in schema cache"
|
||||
]
|
||||
setMetadataResourceVersionInSchemaCache latestResourceVersion
|
||||
_ -> do
|
||||
logInfo logger threadType $
|
||||
String "Fetched metadata notifications and received some notifications. Updating the schema cache."
|
||||
let cacheInvalidations =
|
||||
if any ((== (engineResourceVersion + 1)) . fst) notifications
|
||||
then -- If (engineResourceVersion + 1) is in the list of notifications then
|
||||
-- we know that we haven't missed any.
|
||||
mconcat $ snd <$> notifications
|
||||
else -- Otherwise we may have missed some notifications so we need to invalidate the
|
||||
-- whole cache.
|
||||
|
||||
(metadata, latestResourceVersion) <- liftEitherM fetchMetadata
|
||||
|
||||
CacheInvalidations
|
||||
{ ciMetadata = True,
|
||||
ciRemoteSchemas = HS.fromList $ getAllRemoteSchemas schemaCache,
|
||||
ciSources = HS.fromList $ HM.keys $ scSources schemaCache,
|
||||
ciDataConnectors =
|
||||
maybe mempty (HS.fromList . HM.keys . unBackendInfoWrapper) $
|
||||
BackendMap.lookup @'DataConnector $
|
||||
scBackendCache schemaCache
|
||||
}
|
||||
buildSchemaCacheWithOptions CatalogSync cacheInvalidations metadata
|
||||
setMetadataResourceVersionInSchemaCache latestResourceVersion
|
||||
logInfo logger threadType $
|
||||
String $
|
||||
T.unwords
|
||||
[ "Fetched metadata with resource version:",
|
||||
showMetadataResourceVersion latestResourceVersion
|
||||
]
|
||||
|
||||
notifications <- liftEitherM $ fetchMetadataNotifications engineResourceVersion appEnvInstanceId
|
||||
|
||||
case notifications of
|
||||
[] -> do
|
||||
logInfo logger threadType $
|
||||
String $
|
||||
T.unwords
|
||||
[ "Fetched metadata notifications and received no notifications. Not updating the schema cache.",
|
||||
"Only setting resource version:",
|
||||
showMetadataResourceVersion latestResourceVersion,
|
||||
"in schema cache"
|
||||
]
|
||||
setMetadataResourceVersionInSchemaCache latestResourceVersion
|
||||
_ -> do
|
||||
logInfo logger threadType $
|
||||
String "Fetched metadata notifications and received some notifications. Updating the schema cache."
|
||||
let cacheInvalidations =
|
||||
if any ((== (engineResourceVersion + 1)) . fst) notifications
|
||||
then -- If (engineResourceVersion + 1) is in the list of notifications then
|
||||
-- we know that we haven't missed any.
|
||||
mconcat $ snd <$> notifications
|
||||
else -- Otherwise we may have missed some notifications so we need to invalidate the
|
||||
-- whole cache.
|
||||
|
||||
CacheInvalidations
|
||||
{ ciMetadata = True,
|
||||
ciRemoteSchemas = HS.fromList $ getAllRemoteSchemas schemaCache,
|
||||
ciSources = HS.fromList $ HM.keys $ scSources schemaCache,
|
||||
ciDataConnectors =
|
||||
maybe mempty (HS.fromList . HM.keys . unBackendInfoWrapper) $
|
||||
BackendMap.lookup @'DataConnector $
|
||||
scBackendCache schemaCache
|
||||
}
|
||||
buildSchemaCacheWithOptions CatalogSync cacheInvalidations metadata
|
||||
setMetadataResourceVersionInSchemaCache latestResourceVersion
|
||||
logInfo logger threadType $
|
||||
String $
|
||||
"Schema cache updated with resource version: " <> showMetadataResourceVersion latestResourceVersion
|
||||
"Schema cache updated with resource version: " <> showMetadataResourceVersion latestResourceVersion
|
||||
pure (msg, cache)
|
||||
onLeft respErr (logError logger threadType . TEQueryError)
|
||||
|
||||
|
@ -34,6 +34,7 @@ import Hasura.RQL.DDL.Schema.Cache.Common
|
||||
import Hasura.RQL.Types.Common
|
||||
import Hasura.RQL.Types.Metadata (emptyMetadataDefaults)
|
||||
import Hasura.RQL.Types.ResizePool
|
||||
import Hasura.RQL.Types.SchemaCache
|
||||
import Hasura.RQL.Types.SchemaCache.Build
|
||||
import Hasura.Server.Init
|
||||
import Hasura.Server.Init.FeatureFlag as FF
|
||||
@ -140,12 +141,12 @@ main = do
|
||||
|
||||
-- why are we building the schema cache here? it's already built in initialiseContext
|
||||
(metadata, schemaCache) <- run do
|
||||
metadata <-
|
||||
metadataWithVersion <-
|
||||
snd
|
||||
<$> (liftEitherM . runExceptT . _pecRunTx pgContext (PGExecCtxInfo (Tx PG.ReadWrite Nothing) InternalRawQuery))
|
||||
(migrateCatalog (Just sourceConfig) defaultPostgresExtensionsSchema maintenanceMode =<< liftIO getCurrentTime)
|
||||
schemaCache <- runCacheBuild cacheBuildParams $ buildRebuildableSchemaCache logger envMap metadata serverConfigCtx
|
||||
pure (metadata, schemaCache)
|
||||
schemaCache <- runCacheBuild cacheBuildParams $ buildRebuildableSchemaCache logger envMap metadataWithVersion serverConfigCtx
|
||||
pure (_mwrvMetadata metadataWithVersion, schemaCache)
|
||||
|
||||
cacheRef <- newMVar schemaCache
|
||||
pure $ NT (run . flip MigrateSuite.runCacheRefT (serverConfigCtx, cacheRef) . fmap fst . runMetadataT metadata emptyMetadataDefaults)
|
||||
|
@ -129,8 +129,8 @@ suite srcConfig pgExecCtx pgConnInfo = do
|
||||
|
||||
migrateCatalogAndBuildCache env time = do
|
||||
serverConfigCtx <- askServerConfigCtx
|
||||
(migrationResult, metadata) <- runTx' pgExecCtx $ migrateCatalog (Just srcConfig) (ExtensionsSchema "public") MaintenanceModeDisabled time
|
||||
(,migrationResult) <$> runCacheBuildM (buildRebuildableSchemaCache logger env metadata serverConfigCtx)
|
||||
(migrationResult, metadataWithVersion) <- runTx' pgExecCtx $ migrateCatalog (Just srcConfig) (ExtensionsSchema "public") MaintenanceModeDisabled time
|
||||
(,migrationResult) <$> runCacheBuildM (buildRebuildableSchemaCache logger env metadataWithVersion serverConfigCtx)
|
||||
|
||||
dropAndInit env time = lift do
|
||||
scVar <- asks snd
|
||||
|
Loading…
Reference in New Issue
Block a user