mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-10-04 22:07:40 +03:00
server: fix schema registry server bugs
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/9644 Co-authored-by: Auke Booij <164426+abooij@users.noreply.github.com> GitOrigin-RevId: dc7be1dd63c2e3a7af8582cc6109992517386faf
This commit is contained in:
parent
3a81007de7
commit
bd55c39d3a
@ -604,11 +604,10 @@ buildFirstSchemaCache
|
||||
httpManager
|
||||
mSchemaRegistryContext = do
|
||||
let cacheBuildParams = CacheBuildParams httpManager pgSourceResolver mssqlSourceResolver cacheStaticConfig
|
||||
buildReason = CatalogSync
|
||||
result <-
|
||||
runExceptT
|
||||
$ runCacheBuild cacheBuildParams
|
||||
$ buildRebuildableSchemaCacheWithReason buildReason logger env metadataWithVersion cacheDynamicConfig mSchemaRegistryContext
|
||||
$ buildRebuildableSchemaCache logger env metadataWithVersion cacheDynamicConfig mSchemaRegistryContext
|
||||
result `onLeft` \err -> do
|
||||
-- TODO: we used to bundle the first schema cache build with the catalog
|
||||
-- migration, using the same error handler for both, meaning that an
|
||||
|
@ -14,7 +14,6 @@ module Hasura.RQL.DDL.Schema.Cache
|
||||
( RebuildableSchemaCache,
|
||||
lastBuiltSchemaCache,
|
||||
buildRebuildableSchemaCache,
|
||||
buildRebuildableSchemaCacheWithReason,
|
||||
CacheRWT,
|
||||
runCacheRWT,
|
||||
mkBooleanPermissionMap,
|
||||
@ -160,20 +159,9 @@ buildRebuildableSchemaCache ::
|
||||
CacheDynamicConfig ->
|
||||
Maybe SchemaRegistryContext ->
|
||||
CacheBuild RebuildableSchemaCache
|
||||
buildRebuildableSchemaCache =
|
||||
buildRebuildableSchemaCacheWithReason CatalogSync
|
||||
|
||||
buildRebuildableSchemaCacheWithReason ::
|
||||
BuildReason ->
|
||||
Logger Hasura ->
|
||||
Env.Environment ->
|
||||
MetadataWithResourceVersion ->
|
||||
CacheDynamicConfig ->
|
||||
Maybe SchemaRegistryContext ->
|
||||
CacheBuild RebuildableSchemaCache
|
||||
buildRebuildableSchemaCacheWithReason reason logger env metadataWithVersion dynamicConfig mSchemaRegistryContext = do
|
||||
buildRebuildableSchemaCache logger env metadataWithVersion dynamicConfig mSchemaRegistryContext = do
|
||||
result <-
|
||||
flip runReaderT reason
|
||||
flip runReaderT CatalogSync
|
||||
$ Inc.build (buildSchemaCacheRule logger env mSchemaRegistryContext) (metadataWithVersion, dynamicConfig, initialInvalidationKeys, Nothing)
|
||||
|
||||
pure $ RebuildableSchemaCache (fst $ Inc.result result) initialInvalidationKeys (Inc.rebuildRule result)
|
||||
@ -188,7 +176,7 @@ newtype CacheRWT m a
|
||||
-- passing the 'CacheDynamicConfig' to every function that builds the cache. It
|
||||
-- should ultimately be reduced to 'AppContext', or even better a relevant
|
||||
-- subset thereof.
|
||||
CacheRWT (ReaderT CacheDynamicConfig (StateT (RebuildableSchemaCache, CacheInvalidations, SourcesIntrospectionStatus) m) a)
|
||||
CacheRWT (ReaderT CacheDynamicConfig (StateT (RebuildableSchemaCache, CacheInvalidations, SourcesIntrospectionStatus, SchemaRegistryAction) m) a)
|
||||
deriving newtype
|
||||
( Functor,
|
||||
Applicative,
|
||||
@ -223,11 +211,11 @@ runCacheRWT ::
|
||||
CacheDynamicConfig ->
|
||||
RebuildableSchemaCache ->
|
||||
CacheRWT m a ->
|
||||
m (a, RebuildableSchemaCache, CacheInvalidations, SourcesIntrospectionStatus)
|
||||
m (a, RebuildableSchemaCache, CacheInvalidations, SourcesIntrospectionStatus, SchemaRegistryAction)
|
||||
runCacheRWT config cache (CacheRWT m) = do
|
||||
(v, (newCache, invalidations, introspection)) <-
|
||||
runStateT (runReaderT m config) (cache, mempty, SourcesIntrospectionUnchanged)
|
||||
pure (v, newCache, invalidations, introspection)
|
||||
(v, (newCache, invalidations, introspection, schemaRegistryAction)) <-
|
||||
runStateT (runReaderT m config) (cache, mempty, SourcesIntrospectionUnchanged, Nothing)
|
||||
pure (v, newCache, invalidations, introspection, schemaRegistryAction)
|
||||
|
||||
instance MonadTrans CacheRWT where
|
||||
lift = CacheRWT . lift . lift
|
||||
@ -279,26 +267,26 @@ instance
|
||||
) =>
|
||||
CacheRWM (CacheRWT m)
|
||||
where
|
||||
tryBuildSchemaCacheWithOptions buildReason invalidations metadata validateNewSchemaCache = CacheRWT do
|
||||
tryBuildSchemaCacheWithOptions buildReason invalidations newMetadata validateNewSchemaCache = CacheRWT do
|
||||
dynamicConfig <- ask
|
||||
staticConfig <- askCacheStaticConfig
|
||||
(RebuildableSchemaCache lastBuiltSC invalidationKeys rule, oldInvalidations, _) <- get
|
||||
let metadataVersion = scMetadataResourceVersion lastBuiltSC
|
||||
metadataWithVersion = MetadataWithResourceVersion metadata metadataVersion
|
||||
(RebuildableSchemaCache lastBuiltSC invalidationKeys rule, oldInvalidations, _, _) <- get
|
||||
let oldMetadataVersion = scMetadataResourceVersion lastBuiltSC
|
||||
metadataWithVersion = MetadataWithResourceVersion newMetadata $ MetadataResourceVersion (-1)
|
||||
newInvalidationKeys = invalidateKeys invalidations invalidationKeys
|
||||
storedIntrospection <- loadStoredIntrospection (_cscLogger staticConfig) metadataVersion
|
||||
storedIntrospection <- loadStoredIntrospection (_cscLogger staticConfig) oldMetadataVersion
|
||||
result <-
|
||||
runCacheBuildM
|
||||
$ flip runReaderT buildReason
|
||||
$ Inc.build rule (metadataWithVersion, dynamicConfig, newInvalidationKeys, storedIntrospection)
|
||||
|
||||
let (schemaCache, storedIntrospectionStatus) = Inc.result result
|
||||
let (schemaCache, (storedIntrospectionStatus, schemaRegistryAction)) = Inc.result result
|
||||
prunedInvalidationKeys = pruneInvalidationKeys schemaCache newInvalidationKeys
|
||||
!newCache = RebuildableSchemaCache schemaCache prunedInvalidationKeys (Inc.rebuildRule result)
|
||||
!newInvalidations = oldInvalidations <> invalidations
|
||||
|
||||
case validateNewSchemaCache lastBuiltSC schemaCache of
|
||||
(KeepNewSchemaCache, valueToReturn) -> put (newCache, newInvalidations, storedIntrospectionStatus) >> pure valueToReturn
|
||||
(KeepNewSchemaCache, valueToReturn) -> put (newCache, newInvalidations, storedIntrospectionStatus, schemaRegistryAction) >> pure valueToReturn
|
||||
(DiscardNewSchemaCache, valueToReturn) -> pure valueToReturn
|
||||
where
|
||||
-- Prunes invalidation keys that no longer exist in the schema to avoid leaking memory by
|
||||
@ -308,7 +296,7 @@ instance
|
||||
name `elem` getAllRemoteSchemas schemaCache
|
||||
|
||||
setMetadataResourceVersionInSchemaCache resourceVersion = CacheRWT $ do
|
||||
(rebuildableSchemaCache, invalidations, introspection) <- get
|
||||
(rebuildableSchemaCache, invalidations, introspection, schemaRegistryAction) <- get
|
||||
put
|
||||
( rebuildableSchemaCache
|
||||
{ lastBuiltSchemaCache =
|
||||
@ -317,7 +305,8 @@ instance
|
||||
}
|
||||
},
|
||||
invalidations,
|
||||
introspection
|
||||
introspection,
|
||||
schemaRegistryAction
|
||||
)
|
||||
|
||||
-- | Generate health checks related cache from sources metadata
|
||||
@ -439,8 +428,8 @@ buildSchemaCacheRule ::
|
||||
Logger Hasura ->
|
||||
Env.Environment ->
|
||||
Maybe SchemaRegistryContext ->
|
||||
(MetadataWithResourceVersion, CacheDynamicConfig, InvalidationKeys, Maybe StoredIntrospection) `arr` (SchemaCache, SourcesIntrospectionStatus)
|
||||
buildSchemaCacheRule logger env mSchemaRegistryContext = proc (MetadataWithResourceVersion metadataNoDefaults metadataResourceVersion, dynamicConfig, invalidationKeys, storedIntrospection) -> do
|
||||
(MetadataWithResourceVersion, CacheDynamicConfig, InvalidationKeys, Maybe StoredIntrospection) `arr` (SchemaCache, (SourcesIntrospectionStatus, SchemaRegistryAction))
|
||||
buildSchemaCacheRule logger env mSchemaRegistryContext = proc (MetadataWithResourceVersion metadataNoDefaults lastKnownMetadataResourceVersion, dynamicConfig, invalidationKeys, storedIntrospection) -> do
|
||||
invalidationKeysDep <- Inc.newDependency -< invalidationKeys
|
||||
let metadataDefaults = _cdcMetadataDefaults dynamicConfig
|
||||
metadata@Metadata {..} = overrideMetadataDefaults metadataNoDefaults metadataDefaults
|
||||
@ -503,8 +492,17 @@ buildSchemaCacheRule logger env mSchemaRegistryContext = proc (MetadataWithResou
|
||||
_ <-
|
||||
bindA
|
||||
-< do
|
||||
for_ schemaRegistryAction $ \action -> do
|
||||
liftIO $ action metadataResourceVersion
|
||||
buildReason <- ask
|
||||
case buildReason of
|
||||
-- If this is a catalog sync then we know for sure that the schema has more chances of being committed as some
|
||||
-- other instance of Hasura has already committed the schema. So we can safely write the schema to the registry
|
||||
-- service.
|
||||
CatalogSync ->
|
||||
for_ schemaRegistryAction $ \action -> do
|
||||
liftIO $ action lastKnownMetadataResourceVersion
|
||||
-- If this is a metadata event then we cannot be sure that the schema will be committed. So we write the schema
|
||||
-- to the registry service only after the schema is committed.
|
||||
CatalogUpdate _ -> pure ()
|
||||
|
||||
let schemaCache =
|
||||
SchemaCache
|
||||
@ -538,7 +536,12 @@ buildSchemaCacheRule logger env mSchemaRegistryContext = proc (MetadataWithResou
|
||||
<> inconsistentQueryCollections,
|
||||
scApiLimits = _metaApiLimits,
|
||||
scMetricsConfig = _metaMetricsConfig,
|
||||
scMetadataResourceVersion = metadataResourceVersion,
|
||||
-- Please note that we are setting the metadata resource version to the last known metadata resource
|
||||
-- version. This might not be the final value of the metadata resource version. This is because, if we are
|
||||
-- in the middle of a metadata operation, we might push the metadata to the database and thus this version
|
||||
-- will be of the older metadata. As a fix, we do update the metadata resource version to the latest value
|
||||
-- after the metadata operation is complete (see the usage of `setMetadataResourceVersionInSchemaCache`).
|
||||
scMetadataResourceVersion = lastKnownMetadataResourceVersion,
|
||||
scSetGraphqlIntrospectionOptions = _metaSetGraphqlIntrospectionOptions,
|
||||
scTlsAllowlist = networkTlsAllowlist _metaNetwork,
|
||||
scQueryCollections = _metaQueryCollections,
|
||||
@ -547,7 +550,7 @@ buildSchemaCacheRule logger env mSchemaRegistryContext = proc (MetadataWithResou
|
||||
scSourcePingConfig = buildSourcePingCache _metaSources,
|
||||
scOpenTelemetryConfig = openTelemetryInfo
|
||||
}
|
||||
returnA -< (schemaCache, storedIntrospectionStatus)
|
||||
returnA -< (schemaCache, (storedIntrospectionStatus, schemaRegistryAction))
|
||||
where
|
||||
-- See Note [Avoiding GraphQL schema rebuilds when changing irrelevant Metadata]
|
||||
buildOutputsAndSchema = proc (metadataDep, dynamicConfig, invalidationKeysDep, storedIntrospection) -> do
|
||||
|
@ -56,6 +56,7 @@ import Hasura.Incremental qualified as Inc
|
||||
import Hasura.LogicalModel.Types (LogicalModelName)
|
||||
import Hasura.Prelude
|
||||
import Hasura.RQL.DDL.Schema.Cache.Config
|
||||
import Hasura.RQL.DDL.SchemaRegistry (SchemaRegistryAction)
|
||||
import Hasura.RQL.Types.Backend
|
||||
import Hasura.RQL.Types.BackendType
|
||||
import Hasura.RQL.Types.Common
|
||||
@ -274,7 +275,7 @@ data SourcesIntrospectionStatus
|
||||
data RebuildableSchemaCache = RebuildableSchemaCache
|
||||
{ lastBuiltSchemaCache :: SchemaCache,
|
||||
_rscInvalidationMap :: InvalidationKeys,
|
||||
_rscRebuild :: Inc.Rule (ReaderT BuildReason CacheBuild) (MetadataWithResourceVersion, CacheDynamicConfig, InvalidationKeys, Maybe StoredIntrospection) (SchemaCache, SourcesIntrospectionStatus)
|
||||
_rscRebuild :: Inc.Rule (ReaderT BuildReason CacheBuild) (MetadataWithResourceVersion, CacheDynamicConfig, InvalidationKeys, Maybe StoredIntrospection) (SchemaCache, (SourcesIntrospectionStatus, SchemaRegistryAction))
|
||||
}
|
||||
|
||||
withRecordDependencies ::
|
||||
|
@ -192,7 +192,7 @@ withRecordInconsistencies = recordInconsistenciesWith recordInconsistencies
|
||||
-- operations for triggering a schema cache rebuild
|
||||
|
||||
class (CacheRM m) => CacheRWM m where
|
||||
tryBuildSchemaCacheWithOptions :: BuildReason -> CacheInvalidations -> Metadata -> (ValidateNewSchemaCache a) -> m a
|
||||
tryBuildSchemaCacheWithOptions :: BuildReason -> CacheInvalidations -> Metadata -> ValidateNewSchemaCache a -> m a
|
||||
setMetadataResourceVersionInSchemaCache :: MetadataResourceVersion -> m ()
|
||||
|
||||
buildSchemaCacheWithOptions :: (CacheRWM m) => BuildReason -> CacheInvalidations -> Metadata -> m ()
|
||||
@ -310,7 +310,10 @@ buildSchemaCacheWithInvalidations :: (MetadataM m, CacheRWM m) => CacheInvalidat
|
||||
buildSchemaCacheWithInvalidations cacheInvalidations MetadataModifier {..} = do
|
||||
metadata <- getMetadata
|
||||
let modifiedMetadata = runMetadataModifier metadata
|
||||
buildSchemaCacheWithOptions (CatalogUpdate mempty) cacheInvalidations modifiedMetadata
|
||||
buildSchemaCacheWithOptions
|
||||
(CatalogUpdate mempty)
|
||||
cacheInvalidations
|
||||
modifiedMetadata
|
||||
putMetadata modifiedMetadata
|
||||
|
||||
buildSchemaCache :: (MetadataM m, CacheRWM m) => MetadataModifier -> m ()
|
||||
@ -338,7 +341,12 @@ tryBuildSchemaCacheWithModifiers modifiers = do
|
||||
metadata <- getMetadata
|
||||
foldM (flip ($)) metadata modifiers
|
||||
|
||||
newInconsistentObjects <- tryBuildSchemaCacheWithOptions (CatalogUpdate mempty) mempty modifiedMetadata validateNewSchemaCache
|
||||
newInconsistentObjects <-
|
||||
tryBuildSchemaCacheWithOptions
|
||||
(CatalogUpdate mempty)
|
||||
mempty
|
||||
modifiedMetadata
|
||||
validateNewSchemaCache
|
||||
when (newInconsistentObjects == mempty)
|
||||
$ putMetadata modifiedMetadata
|
||||
pure $ newInconsistentObjects
|
||||
|
@ -138,7 +138,7 @@ runMetadataQuery appContext schemaCache closeWebsocketsOnMetadataChange RQLMetad
|
||||
then emptyMetadataDefaults
|
||||
else acMetadataDefaults appContext
|
||||
let dynamicConfig = buildCacheDynamicConfig appContext
|
||||
((r, modMetadata), modSchemaCache, cacheInvalidations, sourcesIntrospection) <-
|
||||
((r, modMetadata), modSchemaCache, cacheInvalidations, sourcesIntrospection, schemaRegistryAction) <-
|
||||
runMetadataQueryM
|
||||
(acEnvironment appContext)
|
||||
appEnvCheckFeatureFlag
|
||||
@ -172,6 +172,12 @@ runMetadataQuery appContext schemaCache closeWebsocketsOnMetadataChange RQLMetad
|
||||
Tracing.newSpan "storeSourcesIntrospection"
|
||||
$ saveSourcesIntrospection logger sourcesIntrospection newResourceVersion
|
||||
|
||||
-- run the schema registry action
|
||||
Tracing.newSpan "runSchemaRegistryAction"
|
||||
$ for_ schemaRegistryAction
|
||||
$ \action -> do
|
||||
liftIO $ action newResourceVersion
|
||||
|
||||
-- notify schema cache sync
|
||||
Tracing.newSpan "notifySchemaCacheSync"
|
||||
$ liftEitherM
|
||||
@ -182,7 +188,7 @@ runMetadataQuery appContext schemaCache closeWebsocketsOnMetadataChange RQLMetad
|
||||
$ "Inserted schema cache sync notification at resource version:"
|
||||
<> showMetadataResourceVersion newResourceVersion
|
||||
|
||||
(_, modSchemaCache', _, _) <-
|
||||
(_, modSchemaCache', _, _, _) <-
|
||||
Tracing.newSpan "setMetadataResourceVersionInSchemaCache"
|
||||
$ setMetadataResourceVersionInSchemaCache newResourceVersion
|
||||
& runCacheRWT dynamicConfig modSchemaCache
|
||||
|
@ -211,24 +211,35 @@ runQuery appContext sc query = do
|
||||
let dynamicConfig = buildCacheDynamicConfig appContext
|
||||
|
||||
MetadataWithResourceVersion metadata currentResourceVersion <- liftEitherM fetchMetadata
|
||||
((result, updatedMetadata), updatedCache, invalidations, sourcesIntrospection) <-
|
||||
((result, updatedMetadata), modSchemaCache, invalidations, sourcesIntrospection, schemaRegistryAction) <-
|
||||
runQueryM (acEnvironment appContext) (acSQLGenCtx appContext) query
|
||||
-- TODO: remove this straight runReaderT that provides no actual new info
|
||||
& flip runReaderT logger
|
||||
& runMetadataT metadata metadataDefaults
|
||||
& runCacheRWT dynamicConfig sc
|
||||
when (queryModifiesSchemaCache query) $ do
|
||||
case appEnvEnableMaintenanceMode of
|
||||
if queryModifiesSchemaCache query
|
||||
then case appEnvEnableMaintenanceMode of
|
||||
MaintenanceModeDisabled -> do
|
||||
-- set modified metadata in storage
|
||||
newResourceVersion <- liftEitherM $ setMetadata currentResourceVersion updatedMetadata
|
||||
|
||||
(_, modSchemaCache', _, _, _) <-
|
||||
Tracing.newSpan "setMetadataResourceVersionInSchemaCache"
|
||||
$ setMetadataResourceVersionInSchemaCache newResourceVersion
|
||||
& runCacheRWT dynamicConfig modSchemaCache
|
||||
|
||||
-- save sources introspection to stored-introspection DB
|
||||
saveSourcesIntrospection logger sourcesIntrospection newResourceVersion
|
||||
-- run schema registry action
|
||||
for_ schemaRegistryAction $ \action -> do
|
||||
liftIO $ action newResourceVersion
|
||||
-- notify schema cache sync
|
||||
liftEitherM $ notifySchemaCacheSync newResourceVersion appEnvInstanceId invalidations
|
||||
|
||||
pure (result, modSchemaCache')
|
||||
MaintenanceModeEnabled () ->
|
||||
throw500 "metadata cannot be modified in maintenance mode"
|
||||
pure (result, updatedCache)
|
||||
else pure (result, modSchemaCache)
|
||||
|
||||
-- | A predicate that determines whether the given query might modify/rebuild the schema cache. If
|
||||
-- so, it needs to acquire the global lock on the schema cache so that other queries do not modify
|
||||
|
@ -124,13 +124,13 @@ runQuery appContext schemaCache rqlQuery = do
|
||||
|
||||
let dynamicConfig = buildCacheDynamicConfig appContext
|
||||
MetadataWithResourceVersion metadata currentResourceVersion <- Tracing.newSpan "fetchMetadata" $ liftEitherM fetchMetadata
|
||||
((result, updatedMetadata), updatedCache, invalidations, sourcesIntrospection) <-
|
||||
((result, updatedMetadata), modSchemaCache, invalidations, sourcesIntrospection, schemaRegistryAction) <-
|
||||
runQueryM (acSQLGenCtx appContext) rqlQuery
|
||||
-- We can use defaults here unconditionally, since there is no MD export function in V2Query
|
||||
& runMetadataT metadata (acMetadataDefaults appContext)
|
||||
& runCacheRWT dynamicConfig schemaCache
|
||||
when (queryModifiesSchema rqlQuery) $ do
|
||||
case appEnvEnableMaintenanceMode of
|
||||
if queryModifiesSchema rqlQuery
|
||||
then case appEnvEnableMaintenanceMode of
|
||||
MaintenanceModeDisabled -> do
|
||||
-- set modified metadata in storage
|
||||
newResourceVersion <-
|
||||
@ -138,17 +138,30 @@ runQuery appContext schemaCache rqlQuery = do
|
||||
$ liftEitherM
|
||||
$ setMetadata currentResourceVersion updatedMetadata
|
||||
|
||||
(_, modSchemaCache', _, _, _) <-
|
||||
Tracing.newSpan "setMetadataResourceVersionInSchemaCache"
|
||||
$ setMetadataResourceVersionInSchemaCache newResourceVersion
|
||||
& runCacheRWT dynamicConfig modSchemaCache
|
||||
|
||||
-- save sources introspection to stored-introspection DB
|
||||
Tracing.newSpan "storeSourcesIntrospection"
|
||||
$ saveSourcesIntrospection (_lsLogger appEnvLoggers) sourcesIntrospection newResourceVersion
|
||||
|
||||
-- run schema registry action
|
||||
Tracing.newSpan "runSchemaRegistryAction"
|
||||
$ for_ schemaRegistryAction
|
||||
$ \action -> do
|
||||
liftIO $ action newResourceVersion
|
||||
|
||||
-- notify schema cache sync
|
||||
Tracing.newSpan "notifySchemaCacheSync"
|
||||
$ liftEitherM
|
||||
$ notifySchemaCacheSync newResourceVersion appEnvInstanceId invalidations
|
||||
|
||||
pure (result, modSchemaCache')
|
||||
MaintenanceModeEnabled () ->
|
||||
throw500 "metadata cannot be modified in maintenance mode"
|
||||
pure (result, updatedCache)
|
||||
else pure (result, modSchemaCache)
|
||||
|
||||
queryModifiesSchema :: RQLQuery -> Bool
|
||||
queryModifiesSchema = \case
|
||||
|
@ -32,6 +32,7 @@ import Control.Concurrent.STM qualified as STM
|
||||
import Control.Monad.Trans.Control (MonadBaseControl)
|
||||
import Data.IORef
|
||||
import Hasura.App.State
|
||||
import Hasura.Base.Error
|
||||
import Hasura.Logging qualified as L
|
||||
import Hasura.Prelude hiding (get, put)
|
||||
import Hasura.RQL.DDL.Schema
|
||||
@ -106,49 +107,25 @@ initialiseAppStateRef (TLSAllowListRef tlsAllowListRef) metricsConfigRefM server
|
||||
liftIO $ writeIORef metricsConfigRef (scMetricsConfig <$> getSchemaCache ref)
|
||||
pure ref
|
||||
|
||||
-- | Set the 'AppStateRef' to the 'RebuildableSchemaCache' produced by the
|
||||
-- given action.
|
||||
--
|
||||
-- An internal lock ensures that at most one update to the 'AppStateRef' may
|
||||
-- proceed at a time.
|
||||
-- TODO: This function might not be needed at all. This function is used only in `refreshSchemaCache` and we
|
||||
-- can use `withSchemaCacheReadUpdate` there.
|
||||
withSchemaCacheUpdate ::
|
||||
(MonadIO m, MonadBaseControl IO m) =>
|
||||
(MonadIO m, MonadBaseControl IO m, MonadError QErr m) =>
|
||||
(AppStateRef impl) ->
|
||||
L.Logger L.Hasura ->
|
||||
Maybe (STM.TVar Bool) ->
|
||||
m (a, RebuildableSchemaCache) ->
|
||||
m a
|
||||
withSchemaCacheUpdate (AppStateRef lock cacheRef metadataVersionGauge) logger mLogCheckerTVar action =
|
||||
withMVarMasked lock $ const do
|
||||
(!res, !newSC) <- action
|
||||
liftIO do
|
||||
-- update schemacache in IO reference
|
||||
modifyIORef' cacheRef $ \appState ->
|
||||
let !newVer = incSchemaCacheVer (snd $ asSchemaCache appState)
|
||||
in appState {asSchemaCache = (newSC, newVer)}
|
||||
|
||||
-- update metric with new metadata version
|
||||
updateMetadataVersionGauge metadataVersionGauge newSC
|
||||
|
||||
let inconsistentObjectsList = scInconsistentObjs $ lastBuiltSchemaCache newSC
|
||||
logInconsistentMetadata' = logInconsistentMetadata logger inconsistentObjectsList
|
||||
-- log any inconsistent objects only once and not everytime this method is called
|
||||
case mLogCheckerTVar of
|
||||
Nothing -> logInconsistentMetadata'
|
||||
Just logCheckerTVar -> do
|
||||
logCheck <- STM.readTVarIO logCheckerTVar
|
||||
if null inconsistentObjectsList && logCheck
|
||||
then do
|
||||
STM.atomically $ STM.writeTVar logCheckerTVar False
|
||||
else do
|
||||
unless (logCheck || null inconsistentObjectsList) $ do
|
||||
STM.atomically $ STM.writeTVar logCheckerTVar True
|
||||
logInconsistentMetadata'
|
||||
|
||||
pure res
|
||||
withSchemaCacheUpdate asr logger mLogCheckerTVar action =
|
||||
withSchemaCacheReadUpdate asr logger mLogCheckerTVar (const action)
|
||||
|
||||
-- | Set the 'AppStateRef' to the 'RebuildableSchemaCache' produced by the
|
||||
-- given action.
|
||||
--
|
||||
-- An internal lock ensures that at most one update to the 'AppStateRef' may
|
||||
-- proceed at a time.
|
||||
withSchemaCacheReadUpdate ::
|
||||
(MonadIO m, MonadBaseControl IO m) =>
|
||||
(MonadIO m, MonadBaseControl IO m, MonadError QErr m) =>
|
||||
(AppStateRef impl) ->
|
||||
L.Logger L.Hasura ->
|
||||
Maybe (STM.TVar Bool) ->
|
||||
@ -158,6 +135,8 @@ withSchemaCacheReadUpdate (AppStateRef lock cacheRef metadataVersionGauge) logge
|
||||
withMVarMasked lock $ const do
|
||||
(rebuildableSchemaCache, _) <- asSchemaCache <$> liftIO (readIORef cacheRef)
|
||||
(!res, !newSC) <- action rebuildableSchemaCache
|
||||
when (scMetadataResourceVersion (lastBuiltSchemaCache newSC) == MetadataResourceVersion (-1))
|
||||
$ throw500 "Programming error: attempting to save Schema Cache with incorrect mrv. Please report this to Hasura."
|
||||
liftIO do
|
||||
-- update schemacache in IO reference
|
||||
modifyIORef' cacheRef $ \appState ->
|
||||
|
@ -291,7 +291,7 @@ refreshSchemaCache
|
||||
let dynamicConfig = buildCacheDynamicConfig appContext
|
||||
-- the instance which triggered the schema sync event would have stored
|
||||
-- the source introspection, hence we can ignore it here
|
||||
(msg, cache, _, _sourcesIntrospection) <-
|
||||
(msg, cache, _, _sourcesIntrospection, _schemaRegistryAction) <-
|
||||
runCacheRWT dynamicConfig rebuildableCache $ do
|
||||
schemaCache <- askSchemaCache
|
||||
let engineResourceVersion = scMetadataResourceVersion schemaCache
|
||||
|
@ -87,13 +87,13 @@ instance
|
||||
tryBuildSchemaCacheWithOptions reason invalidations metadata validateNewSchemaCache = do
|
||||
(dynamicConfig, scVar) <- ask
|
||||
modifyMVar scVar \schemaCache -> do
|
||||
(valueToReturn, cache, _, _) <- runCacheRWT dynamicConfig schemaCache (tryBuildSchemaCacheWithOptions reason invalidations metadata validateNewSchemaCache)
|
||||
(valueToReturn, cache, _, _, _) <- runCacheRWT dynamicConfig schemaCache (tryBuildSchemaCacheWithOptions reason invalidations metadata validateNewSchemaCache)
|
||||
pure (cache, valueToReturn)
|
||||
|
||||
setMetadataResourceVersionInSchemaCache resourceVersion = do
|
||||
(dynamicConfig, scVar) <- ask
|
||||
modifyMVar scVar \schemaCache -> do
|
||||
((), cache, _, _) <- runCacheRWT dynamicConfig schemaCache (setMetadataResourceVersionInSchemaCache resourceVersion)
|
||||
((), cache, _, _, _) <- runCacheRWT dynamicConfig schemaCache (setMetadataResourceVersionInSchemaCache resourceVersion)
|
||||
pure (cache, ())
|
||||
|
||||
instance Example (MetadataT (CacheRefT m) ()) where
|
||||
|
Loading…
Reference in New Issue
Block a user