mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-14 17:02:49 +03:00
Explicitly invalidate enum values when metadata is reloaded
This fixes #3759. Also, while we’re at it, also improve the way invalidations are synced across instances so enums and remote schemas are appropriately reloaded by the schema syncing process.
This commit is contained in:
parent
3cdb3841e6
commit
8ef205fba5
@ -46,7 +46,7 @@ runApp (HGEOptionsG rci hgeCmd) =
|
||||
execQuery queryBs
|
||||
& runHasSystemDefinedT (SystemDefined False)
|
||||
& runCacheRWT schemaCache
|
||||
& fmap fst
|
||||
& fmap (\(res, _, _) -> res)
|
||||
either printErrJExit (liftIO . BLC.putStrLn) res
|
||||
|
||||
HCVersion -> liftIO $ putStrLn $ "Hasura GraphQL Engine: " ++ convertText currentVersion
|
||||
|
@ -380,7 +380,7 @@ runExportMetadata _ =
|
||||
|
||||
runReloadMetadata :: (QErrM m, CacheRWM m) => ReloadMetadata -> m EncJSON
|
||||
runReloadMetadata ReloadMetadata = do
|
||||
buildSchemaCache
|
||||
buildSchemaCacheWithOptions CatalogUpdate mempty { ciMetadata = True }
|
||||
return successMsg
|
||||
|
||||
runDumpInternalState
|
||||
|
@ -13,6 +13,7 @@ import Hasura.Prelude
|
||||
|
||||
import qualified Data.Aeson as J
|
||||
import qualified Data.HashMap.Strict as Map
|
||||
import qualified Data.HashSet as S
|
||||
import qualified Database.PG.Query as Q
|
||||
|
||||
import Hasura.GraphQL.RemoteServer
|
||||
@ -87,8 +88,8 @@ runReloadRemoteSchema (RemoteSchemaNameQuery name) = do
|
||||
void $ onNothing (Map.lookup name rmSchemas) $
|
||||
throw400 NotExists $ "remote schema with name " <> name <<> " does not exist"
|
||||
|
||||
invalidateCachedRemoteSchema name
|
||||
withNewInconsistentObjsCheck buildSchemaCache
|
||||
let invalidations = mempty { ciRemoteSchemas = S.singleton name }
|
||||
withNewInconsistentObjsCheck $ buildSchemaCacheWithOptions CatalogUpdate invalidations
|
||||
pure successMsg
|
||||
|
||||
addRemoteSchemaToCatalog
|
||||
|
@ -66,39 +66,45 @@ buildRebuildableSchemaCache = do
|
||||
pure $ RebuildableSchemaCache (Inc.result result) initialInvalidationKeys (Inc.rebuildRule result)
|
||||
|
||||
newtype CacheRWT m a
|
||||
= CacheRWT { unCacheRWT :: StateT (RebuildableSchemaCache m) m a }
|
||||
-- The CacheInvalidations component of the state could actually be collected using WriterT, but
|
||||
-- WriterT implementations prior to transformers-0.5.6.0 (which added
|
||||
-- Control.Monad.Trans.Writer.CPS) are leaky, and we don’t have that yet.
|
||||
= CacheRWT (StateT (RebuildableSchemaCache m, CacheInvalidations) m a)
|
||||
deriving
|
||||
( Functor, Applicative, Monad, MonadIO, MonadReader r, MonadError e, MonadWriter w, MonadTx
|
||||
( Functor, Applicative, Monad, MonadIO, MonadReader r, MonadError e, MonadTx
|
||||
, UserInfoM, HasHttpManager, HasSQLGenCtx, HasSystemDefined )
|
||||
|
||||
runCacheRWT :: RebuildableSchemaCache m -> CacheRWT m a -> m (a, RebuildableSchemaCache m)
|
||||
runCacheRWT cache = flip runStateT cache . unCacheRWT
|
||||
runCacheRWT
|
||||
:: Functor m
|
||||
=> RebuildableSchemaCache m -> CacheRWT m a -> m (a, RebuildableSchemaCache m, CacheInvalidations)
|
||||
runCacheRWT cache (CacheRWT m) =
|
||||
runStateT m (cache, mempty) <&> \(v, (newCache, invalidations)) -> (v, newCache, invalidations)
|
||||
|
||||
instance MonadTrans CacheRWT where
|
||||
lift = CacheRWT . lift
|
||||
|
||||
instance (Monad m) => TableCoreInfoRM (CacheRWT m)
|
||||
instance (Monad m) => CacheRM (CacheRWT m) where
|
||||
askSchemaCache = CacheRWT $ gets lastBuiltSchemaCache
|
||||
askSchemaCache = CacheRWT $ gets (lastBuiltSchemaCache . fst)
|
||||
|
||||
instance (MonadIO m, MonadTx m) => CacheRWM (CacheRWT m) where
|
||||
buildSchemaCacheWithOptions buildReason = CacheRWT do
|
||||
RebuildableSchemaCache _ invalidationKeys rule <- get
|
||||
buildSchemaCacheWithOptions buildReason invalidations = CacheRWT do
|
||||
(RebuildableSchemaCache _ invalidationKeys rule, oldInvalidations) <- get
|
||||
let newInvalidationKeys = invalidateKeys invalidations invalidationKeys
|
||||
catalogMetadata <- liftTx fetchCatalogData
|
||||
result <- lift $ flip runReaderT buildReason $ Inc.build rule (catalogMetadata, invalidationKeys)
|
||||
result <- lift $ flip runReaderT buildReason $
|
||||
Inc.build rule (catalogMetadata, newInvalidationKeys)
|
||||
let schemaCache = Inc.result result
|
||||
prunedInvalidationKeys = pruneInvalidationKeys schemaCache invalidationKeys
|
||||
put $! RebuildableSchemaCache schemaCache prunedInvalidationKeys (Inc.rebuildRule result)
|
||||
prunedInvalidationKeys = pruneInvalidationKeys schemaCache newInvalidationKeys
|
||||
!newCache = RebuildableSchemaCache schemaCache prunedInvalidationKeys (Inc.rebuildRule result)
|
||||
!newInvalidations = oldInvalidations <> invalidations
|
||||
put (newCache, newInvalidations)
|
||||
where
|
||||
-- Prunes invalidation keys that no longer exist in the schema to avoid leaking memory by
|
||||
-- hanging onto unnecessary keys.
|
||||
pruneInvalidationKeys schemaCache = over ikRemoteSchemas $ M.filterWithKey \name _ ->
|
||||
M.member name (scRemoteSchemas schemaCache)
|
||||
|
||||
invalidateCachedRemoteSchema name =
|
||||
CacheRWT $ modifying (rscInvalidationMap . ikRemoteSchemas . at name) $
|
||||
Just . maybe Inc.initialInvalidationKey Inc.invalidate
|
||||
|
||||
buildSchemaCacheRule
|
||||
-- Note: by supplying BuildReason via MonadReader, it does not participate in caching, which is
|
||||
-- what we want!
|
||||
@ -145,7 +151,7 @@ buildSchemaCacheRule = proc (catalogMetadata, invalidationKeys) -> do
|
||||
computedFields = catalogMetadata
|
||||
|
||||
-- tables
|
||||
tableRawInfos <- buildTableCache -< tables
|
||||
tableRawInfos <- buildTableCache -< (tables, Inc.selectD #_ikMetadata invalidationKeys)
|
||||
|
||||
-- relationships and computed fields
|
||||
let relationshipsByTable = M.groupOn _crTable relationships
|
||||
|
@ -1,4 +1,5 @@
|
||||
{-# LANGUAGE Arrows #-}
|
||||
{-# LANGUAGE Arrows #-}
|
||||
{-# LANGUAGE RecordWildCards #-}
|
||||
|
||||
-- | Types/functions shared between modules that implement "Hasura.RQL.DDL.Schema.Cache". Other
|
||||
-- modules should not import this module directly.
|
||||
@ -6,6 +7,7 @@ module Hasura.RQL.DDL.Schema.Cache.Common where
|
||||
|
||||
import Hasura.Prelude
|
||||
|
||||
import qualified Data.HashMap.Strict as M
|
||||
import qualified Data.HashSet as HS
|
||||
import qualified Data.Sequence as Seq
|
||||
|
||||
@ -20,11 +22,10 @@ import Hasura.RQL.Types.QueryCollection
|
||||
import Hasura.RQL.Types.Run
|
||||
import Hasura.SQL.Types
|
||||
|
||||
-- | 'InvalidationKeys' used to apply requested 'CacheInvalidations'.
|
||||
data InvalidationKeys = InvalidationKeys
|
||||
{ _ikMetadata :: !Inc.InvalidationKey
|
||||
-- ^ Invalidated by the @reload_metadata@ API.
|
||||
, _ikRemoteSchemas :: !(HashMap RemoteSchemaName Inc.InvalidationKey)
|
||||
-- ^ Invalidated by the @reload_remote_schema@ API.
|
||||
} deriving (Eq, Generic)
|
||||
instance Inc.Cacheable InvalidationKeys
|
||||
instance Inc.Select InvalidationKeys
|
||||
@ -33,6 +34,13 @@ $(makeLenses ''InvalidationKeys)
|
||||
initialInvalidationKeys :: InvalidationKeys
|
||||
initialInvalidationKeys = InvalidationKeys Inc.initialInvalidationKey mempty
|
||||
|
||||
invalidateKeys :: CacheInvalidations -> InvalidationKeys -> InvalidationKeys
|
||||
invalidateKeys CacheInvalidations{..} InvalidationKeys{..} = InvalidationKeys
|
||||
{ _ikMetadata = if ciMetadata then Inc.invalidate _ikMetadata else _ikMetadata
|
||||
, _ikRemoteSchemas = foldl' (flip invalidateRemoteSchema) _ikRemoteSchemas ciRemoteSchemas }
|
||||
where
|
||||
invalidateRemoteSchema = M.alter $ Just . maybe Inc.initialInvalidationKey Inc.invalidate
|
||||
|
||||
data BuildInputs
|
||||
= BuildInputs
|
||||
{ _biReason :: !BuildReason
|
||||
|
@ -292,10 +292,14 @@ buildTableCache
|
||||
:: forall arr m
|
||||
. ( ArrowChoice arr, Inc.ArrowDistribute arr, ArrowWriter (Seq CollectedInfo) arr
|
||||
, Inc.ArrowCache m arr, MonadTx m )
|
||||
=> [CatalogTable] `arr` M.HashMap QualifiedTable TableRawInfo
|
||||
buildTableCache = Inc.cache proc catalogTables -> do
|
||||
=> ( [CatalogTable]
|
||||
, Inc.Dependency Inc.InvalidationKey
|
||||
) `arr` M.HashMap QualifiedTable TableRawInfo
|
||||
buildTableCache = Inc.cache proc (catalogTables, reloadMetadataInvalidationKey) -> do
|
||||
rawTableInfos <-
|
||||
(| Inc.keyed (| withTable (\tables -> buildRawTableInfo <<< noDuplicateTables -< tables) |)
|
||||
(| Inc.keyed (| withTable (\tables
|
||||
-> (tables, reloadMetadataInvalidationKey)
|
||||
>- first noDuplicateTables >>> buildRawTableInfo) |)
|
||||
|) (M.groupOnNE _ctName catalogTables)
|
||||
let rawTableCache = M.catMaybes rawTableInfos
|
||||
enumTables = flip M.mapMaybe rawTableCache \rawTableInfo ->
|
||||
@ -314,8 +318,13 @@ buildTableCache = Inc.cache proc catalogTables -> do
|
||||
_ -> throwA -< err400 AlreadyExists "duplication definition for table"
|
||||
|
||||
-- Step 1: Build the raw table cache from metadata information.
|
||||
buildRawTableInfo :: ErrorA QErr arr CatalogTable (TableCoreInfoG PGRawColumnInfo PGCol)
|
||||
buildRawTableInfo = Inc.cache proc (CatalogTable name systemDefined isEnum config maybeInfo) -> do
|
||||
buildRawTableInfo
|
||||
:: ErrorA QErr arr
|
||||
( CatalogTable
|
||||
, Inc.Dependency Inc.InvalidationKey
|
||||
) (TableCoreInfoG PGRawColumnInfo PGCol)
|
||||
buildRawTableInfo = Inc.cache proc (catalogTable, reloadMetadataInvalidationKey) -> do
|
||||
let CatalogTable name systemDefined isEnum config maybeInfo = catalogTable
|
||||
catalogInfo <-
|
||||
(| onNothingA (throwA -<
|
||||
err400 NotExists $ "no such table/view exists in postgres: " <>> name)
|
||||
@ -326,7 +335,11 @@ buildTableCache = Inc.cache proc catalogTables -> do
|
||||
primaryKey = _ctiPrimaryKey catalogInfo
|
||||
rawPrimaryKey <- liftEitherA -< traverse (resolvePrimaryKeyColumns columnMap) primaryKey
|
||||
enumValues <- if isEnum
|
||||
then bindErrorA -< Just <$> fetchAndValidateEnumValues name rawPrimaryKey columns
|
||||
then do
|
||||
-- We want to make sure we reload enum values whenever someone explicitly calls
|
||||
-- `reload_metadata`.
|
||||
Inc.dependOn -< reloadMetadataInvalidationKey
|
||||
bindErrorA -< Just <$> fetchAndValidateEnumValues name rawPrimaryKey columns
|
||||
else returnA -< Nothing
|
||||
|
||||
returnA -< TableCoreInfo
|
||||
|
@ -13,6 +13,7 @@ module Hasura.RQL.Types.SchemaCache.Build
|
||||
|
||||
, CacheRWM(..)
|
||||
, BuildReason(..)
|
||||
, CacheInvalidations(..)
|
||||
, buildSchemaCache
|
||||
, buildSchemaCacheFor
|
||||
, buildSchemaCacheStrict
|
||||
@ -28,6 +29,8 @@ import qualified Data.Text as T
|
||||
import Control.Arrow.Extended
|
||||
import Control.Lens
|
||||
import Data.Aeson (toJSON)
|
||||
import Data.Aeson.Casing
|
||||
import Data.Aeson.TH
|
||||
import Data.List (nub)
|
||||
|
||||
import Hasura.RQL.Types.Error
|
||||
@ -96,8 +99,7 @@ withRecordInconsistency f = proc (e, (metadataObject, s)) -> do
|
||||
-- operations for triggering a schema cache rebuild
|
||||
|
||||
class (CacheRM m) => CacheRWM m where
|
||||
buildSchemaCacheWithOptions :: BuildReason -> m ()
|
||||
invalidateCachedRemoteSchema :: RemoteSchemaName -> m ()
|
||||
buildSchemaCacheWithOptions :: BuildReason -> CacheInvalidations -> m ()
|
||||
|
||||
data BuildReason
|
||||
-- | The build was triggered by an update this instance made to the catalog (in the
|
||||
@ -110,12 +112,26 @@ data BuildReason
|
||||
| CatalogSync
|
||||
deriving (Show, Eq)
|
||||
|
||||
data CacheInvalidations = CacheInvalidations
|
||||
{ ciMetadata :: !Bool
|
||||
-- ^ Force reloading of all database information, including information not technically stored in
|
||||
-- metadata (currently just enum values). Set by the @reload_metadata@ API.
|
||||
, ciRemoteSchemas :: !(HashSet RemoteSchemaName)
|
||||
-- ^ Force refetching of the given remote schemas, even if their definition has not changed. Set
|
||||
-- by the @reload_remote_schema@ API.
|
||||
}
|
||||
$(deriveJSON (aesonDrop 2 snakeCase) ''CacheInvalidations)
|
||||
|
||||
instance Semigroup CacheInvalidations where
|
||||
CacheInvalidations a1 b1 <> CacheInvalidations a2 b2 = CacheInvalidations (a1 || a2) (b1 <> b2)
|
||||
instance Monoid CacheInvalidations where
|
||||
mempty = CacheInvalidations False mempty
|
||||
|
||||
instance (CacheRWM m) => CacheRWM (ReaderT r m) where
|
||||
buildSchemaCacheWithOptions = lift . buildSchemaCacheWithOptions
|
||||
invalidateCachedRemoteSchema = lift . invalidateCachedRemoteSchema
|
||||
buildSchemaCacheWithOptions a b = lift $ buildSchemaCacheWithOptions a b
|
||||
|
||||
buildSchemaCache :: (CacheRWM m) => m ()
|
||||
buildSchemaCache = buildSchemaCacheWithOptions CatalogUpdate
|
||||
buildSchemaCache = buildSchemaCacheWithOptions CatalogUpdate mempty
|
||||
|
||||
-- | Rebuilds the schema cache. If an object with the given object id became newly inconsistent,
|
||||
-- raises an error about it specifically. Otherwise, raises a generic metadata inconsistency error.
|
||||
|
@ -5,6 +5,7 @@ module Hasura.Server.App where
|
||||
|
||||
import Control.Concurrent.MVar
|
||||
import Control.Exception (IOException, try)
|
||||
import Control.Lens (view, _2)
|
||||
import Control.Monad.Stateless
|
||||
import Data.Aeson hiding (json)
|
||||
import Data.Either (isRight)
|
||||
@ -461,9 +462,9 @@ mkWaiApp isoLevel logger sqlGenCtx enableAL pool ci httpManager mode corsCfg ena
|
||||
(cacheRef, cacheBuiltTime) <- do
|
||||
pgResp <- runExceptT $ peelRun runCtx pgExecCtxSer Q.ReadWrite $
|
||||
(,) <$> buildRebuildableSchemaCache <*> liftTx fetchLastUpdate
|
||||
(schemaCache, time) <- liftIO $ either initErrExit return pgResp
|
||||
(schemaCache, event) <- liftIO $ either initErrExit return pgResp
|
||||
scRef <- liftIO $ newIORef (schemaCache, initSchemaCacheVer)
|
||||
return (scRef, snd <$> time)
|
||||
return (scRef, view _2 <$> event)
|
||||
|
||||
cacheLock <- liftIO $ newMVar ()
|
||||
planCache <- liftIO $ E.initPlanCache planCacheOptions
|
||||
|
@ -16,9 +16,6 @@ module Hasura.Server.Migrate
|
||||
, dropCatalog
|
||||
) where
|
||||
|
||||
import Control.Monad.Unique
|
||||
import Data.Time.Clock (UTCTime)
|
||||
|
||||
import Hasura.Prelude
|
||||
|
||||
import qualified Data.Aeson as A
|
||||
@ -29,6 +26,10 @@ import qualified Database.PG.Query.Connection as Q
|
||||
import qualified Language.Haskell.TH.Lib as TH
|
||||
import qualified Language.Haskell.TH.Syntax as TH
|
||||
|
||||
import Control.Lens (view, _2)
|
||||
import Control.Monad.Unique
|
||||
import Data.Time.Clock (UTCTime)
|
||||
|
||||
import Hasura.Logging (Hasura, LogLevel (..), ToEngineLog (..))
|
||||
import Hasura.RQL.DDL.Relationship
|
||||
import Hasura.RQL.DDL.Schema
|
||||
@ -163,7 +164,7 @@ migrateCatalog migrationTime = do
|
||||
buildCacheAndRecreateSystemMetadata :: m (RebuildableSchemaCache m)
|
||||
buildCacheAndRecreateSystemMetadata = do
|
||||
schemaCache <- buildRebuildableSchemaCache
|
||||
snd <$> runCacheRWT schemaCache recreateSystemMetadata
|
||||
view _2 <$> runCacheRWT schemaCache recreateSystemMetadata
|
||||
|
||||
-- the old 0.8 catalog version is non-integral, so we store it in the database as a string
|
||||
getCatalogVersion = liftTx $ runIdentity . Q.getRow <$> Q.withQE defaultTxErrorHandler
|
||||
|
@ -12,7 +12,7 @@ import Hasura.Prelude
|
||||
import qualified Data.Text as T
|
||||
|
||||
latestCatalogVersion :: Integer
|
||||
latestCatalogVersion = 30
|
||||
latestCatalogVersion = 31
|
||||
|
||||
latestCatalogVersionString :: T.Text
|
||||
latestCatalogVersionString = T.pack $ show latestCatalogVersion
|
||||
|
@ -150,23 +150,21 @@ $(deriveJSON
|
||||
''RQLQueryV2
|
||||
)
|
||||
|
||||
fetchLastUpdate :: Q.TxE QErr (Maybe (InstanceId, UTCTime))
|
||||
fetchLastUpdate = do
|
||||
Q.withQE defaultTxErrorHandler
|
||||
[Q.sql|
|
||||
SELECT instance_id::text, occurred_at
|
||||
FROM hdb_catalog.hdb_schema_update_event
|
||||
ORDER BY occurred_at DESC LIMIT 1
|
||||
|] () True
|
||||
fetchLastUpdate :: Q.TxE QErr (Maybe (InstanceId, UTCTime, CacheInvalidations))
|
||||
fetchLastUpdate = over (_Just._3) Q.getAltJ <$> Q.withQE defaultTxErrorHandler [Q.sql|
|
||||
SELECT instance_id::text, occurred_at, invalidations
|
||||
FROM hdb_catalog.hdb_schema_update_event
|
||||
ORDER BY occurred_at DESC LIMIT 1
|
||||
|] () True
|
||||
|
||||
recordSchemaUpdate :: InstanceId -> Q.TxE QErr ()
|
||||
recordSchemaUpdate instanceId =
|
||||
recordSchemaUpdate :: InstanceId -> CacheInvalidations -> Q.TxE QErr ()
|
||||
recordSchemaUpdate instanceId invalidations =
|
||||
liftTx $ Q.unitQE defaultTxErrorHandler [Q.sql|
|
||||
INSERT INTO hdb_catalog.hdb_schema_update_event
|
||||
(instance_id, occurred_at) VALUES ($1::uuid, DEFAULT)
|
||||
(instance_id, occurred_at, invalidations) VALUES ($1::uuid, DEFAULT, $2::json)
|
||||
ON CONFLICT ((occurred_at IS NOT NULL))
|
||||
DO UPDATE SET instance_id = $1::uuid, occurred_at = DEFAULT
|
||||
|] (Identity instanceId) True
|
||||
DO UPDATE SET instance_id = $1::uuid, occurred_at = DEFAULT, invalidations = $2::json
|
||||
|] (instanceId, Q.AltJ invalidations) True
|
||||
|
||||
runQuery
|
||||
:: (HasVersion, MonadIO m, MonadError QErr m)
|
||||
@ -184,12 +182,12 @@ runQuery pgExecCtx instanceId userInfo sc hMgr sqlGenCtx systemDefined query = d
|
||||
either throwError withReload resE
|
||||
where
|
||||
runCtx = RunCtx userInfo hMgr sqlGenCtx
|
||||
withReload r = do
|
||||
withReload (result, updatedCache, invalidations) = do
|
||||
when (queryModifiesSchemaCache query) $ do
|
||||
e <- liftIO $ runExceptT $ runLazyTx pgExecCtx Q.ReadWrite
|
||||
$ liftTx $ recordSchemaUpdate instanceId
|
||||
e <- liftIO $ runExceptT $ runLazyTx pgExecCtx Q.ReadWrite $ liftTx $
|
||||
recordSchemaUpdate instanceId invalidations
|
||||
liftEither e
|
||||
return r
|
||||
return (result, updatedCache)
|
||||
|
||||
-- | A predicate that determines whether the given query might modify/rebuild the schema cache. If
|
||||
-- so, it needs to acquire the global lock on the schema cache so that other queries do not modify
|
||||
|
@ -58,9 +58,10 @@ instance ToEngineLog SchemaSyncThreadLog Hasura where
|
||||
|
||||
data EventPayload
|
||||
= EventPayload
|
||||
{ _epInstanceId :: !InstanceId
|
||||
, _epOccurredAt :: !UTC.UTCTime
|
||||
} deriving (Show, Eq)
|
||||
{ _epInstanceId :: !InstanceId
|
||||
, _epOccurredAt :: !UTC.UTCTime
|
||||
, _epInvalidations :: !CacheInvalidations
|
||||
}
|
||||
$(deriveJSON (aesonDrop 3 snakeCase) ''EventPayload)
|
||||
|
||||
data ThreadError
|
||||
@ -136,9 +137,9 @@ listener sqlGenCtx pool logger httpMgr updateEventRef
|
||||
Just time -> (dbInstId /= instanceId) && accrdAt > time
|
||||
|
||||
refreshCache Nothing = return ()
|
||||
refreshCache (Just (dbInstId, accrdAt)) =
|
||||
refreshCache (Just (dbInstId, accrdAt, invalidations)) =
|
||||
when (shouldRefresh dbInstId accrdAt) $
|
||||
refreshSchemaCache sqlGenCtx pool logger httpMgr cacheRef
|
||||
refreshSchemaCache sqlGenCtx pool logger httpMgr cacheRef invalidations
|
||||
threadType "schema cache reloaded after postgres listen init"
|
||||
|
||||
notifyHandler = \case
|
||||
@ -179,7 +180,7 @@ processor sqlGenCtx pool logger httpMgr updateEventRef
|
||||
event <- STM.atomically getLatestEvent
|
||||
logInfo logger threadType $ object ["processed_event" .= event]
|
||||
when (shouldReload event) $
|
||||
refreshSchemaCache sqlGenCtx pool logger httpMgr cacheRef
|
||||
refreshSchemaCache sqlGenCtx pool logger httpMgr cacheRef (_epInvalidations event)
|
||||
threadType "schema cache reloaded"
|
||||
where
|
||||
-- checks if there is an event
|
||||
@ -202,15 +203,17 @@ refreshSchemaCache
|
||||
-> Logger Hasura
|
||||
-> HTTP.Manager
|
||||
-> SchemaCacheRef
|
||||
-> CacheInvalidations
|
||||
-> ThreadType
|
||||
-> T.Text -> IO ()
|
||||
refreshSchemaCache sqlGenCtx pool logger httpManager cacheRef threadType msg = do
|
||||
refreshSchemaCache sqlGenCtx pool logger httpManager cacheRef invalidations threadType msg = do
|
||||
-- Reload schema cache from catalog
|
||||
resE <- liftIO $ runExceptT $ withSCUpdate cacheRef logger do
|
||||
rebuildableCache <- fst <$> liftIO (readIORef $ _scrCache cacheRef)
|
||||
buildSchemaCacheWithOptions CatalogSync
|
||||
((), cache, _) <- buildSchemaCacheWithOptions CatalogSync invalidations
|
||||
& runCacheRWT rebuildableCache
|
||||
& peelRun runCtx pgCtx PG.ReadWrite
|
||||
pure ((), cache)
|
||||
case resE of
|
||||
Left e -> logError logger threadType $ TEQueryError e
|
||||
Right () -> logInfo logger threadType $ object ["message" .= msg]
|
||||
|
@ -411,7 +411,8 @@ CREATE TABLE hdb_catalog.remote_schemas (
|
||||
|
||||
CREATE TABLE hdb_catalog.hdb_schema_update_event (
|
||||
instance_id uuid NOT NULL,
|
||||
occurred_at timestamptz NOT NULL DEFAULT NOW()
|
||||
occurred_at timestamptz NOT NULL DEFAULT NOW(),
|
||||
invalidations json NOT NULL
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX hdb_schema_update_event_one_row
|
||||
@ -422,13 +423,16 @@ $function$
|
||||
DECLARE
|
||||
instance_id uuid;
|
||||
occurred_at timestamptz;
|
||||
invalidations json;
|
||||
curr_rec record;
|
||||
BEGIN
|
||||
instance_id = NEW.instance_id;
|
||||
occurred_at = NEW.occurred_at;
|
||||
invalidations = NEW.invalidations;
|
||||
PERFORM pg_notify('hasura_schema_update', json_build_object(
|
||||
'instance_id', instance_id,
|
||||
'occurred_at', occurred_at
|
||||
'occurred_at', occurred_at,
|
||||
'invalidations', invalidations
|
||||
)::text);
|
||||
RETURN curr_rec;
|
||||
END;
|
||||
@ -657,9 +661,9 @@ CREATE VIEW hdb_catalog.hdb_computed_field_function AS
|
||||
FROM hdb_catalog.hdb_computed_field
|
||||
);
|
||||
|
||||
CREATE OR REPLACE FUNCTION hdb_catalog.check_violation(msg text) RETURNS bool AS
|
||||
CREATE OR REPLACE FUNCTION hdb_catalog.check_violation(msg text) RETURNS bool AS
|
||||
$$
|
||||
BEGIN
|
||||
RAISE check_violation USING message=msg;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
22
server/src-rsr/migrations/30_to_31.sql
Normal file
22
server/src-rsr/migrations/30_to_31.sql
Normal file
@ -0,0 +1,22 @@
|
||||
TRUNCATE hdb_catalog.hdb_schema_update_event;
|
||||
ALTER TABLE hdb_catalog.hdb_schema_update_event ADD COLUMN invalidations json NOT NULL;
|
||||
CREATE OR REPLACE FUNCTION hdb_catalog.hdb_schema_update_event_notifier() RETURNS trigger AS
|
||||
$function$
|
||||
DECLARE
|
||||
instance_id uuid;
|
||||
occurred_at timestamptz;
|
||||
invalidations json;
|
||||
curr_rec record;
|
||||
BEGIN
|
||||
instance_id = NEW.instance_id;
|
||||
occurred_at = NEW.occurred_at;
|
||||
invalidations = NEW.invalidations;
|
||||
PERFORM pg_notify('hasura_schema_update', json_build_object(
|
||||
'instance_id', instance_id,
|
||||
'occurred_at', occurred_at,
|
||||
'invalidations', invalidations
|
||||
)::text);
|
||||
RETURN curr_rec;
|
||||
END;
|
||||
$function$
|
||||
LANGUAGE plpgsql;
|
19
server/src-rsr/migrations/31_to_30.sql
Normal file
19
server/src-rsr/migrations/31_to_30.sql
Normal file
@ -0,0 +1,19 @@
|
||||
TRUNCATE hdb_catalog.hdb_schema_update_event;
|
||||
CREATE OR REPLACE FUNCTION hdb_catalog.hdb_schema_update_event_notifier() RETURNS trigger AS
|
||||
$function$
|
||||
DECLARE
|
||||
instance_id uuid;
|
||||
occurred_at timestamptz;
|
||||
curr_rec record;
|
||||
BEGIN
|
||||
instance_id = NEW.instance_id;
|
||||
occurred_at = NEW.occurred_at;
|
||||
PERFORM pg_notify('hasura_schema_update', json_build_object(
|
||||
'instance_id', instance_id,
|
||||
'occurred_at', occurred_at
|
||||
)::text);
|
||||
RETURN curr_rec;
|
||||
END;
|
||||
$function$
|
||||
LANGUAGE plpgsql;
|
||||
ALTER TABLE hdb_catalog.hdb_schema_update_event DROP COLUMN invalidations;
|
@ -36,11 +36,10 @@ instance (MonadBase IO m) => TableCoreInfoRM (CacheRefT m)
|
||||
instance (MonadBase IO m) => CacheRM (CacheRefT m) where
|
||||
askSchemaCache = CacheRefT (fmap lastBuiltSchemaCache . readMVar)
|
||||
|
||||
instance (MonadIO m, MonadBaseControl IO m, MonadTx m, MonadUnique m) => CacheRWM (CacheRefT m) where
|
||||
buildSchemaCacheWithOptions options = CacheRefT $ flip modifyMVar \schemaCache ->
|
||||
swap <$> runCacheRWT schemaCache (buildSchemaCacheWithOptions options)
|
||||
invalidateCachedRemoteSchema name = CacheRefT $ flip modifyMVar \schemaCache ->
|
||||
swap <$> runCacheRWT schemaCache (invalidateCachedRemoteSchema name)
|
||||
instance (MonadIO m, MonadBaseControl IO m, MonadTx m) => CacheRWM (CacheRefT m) where
|
||||
buildSchemaCacheWithOptions reason invalidations = CacheRefT $ flip modifyMVar \schemaCache -> do
|
||||
((), cache, _) <- runCacheRWT schemaCache (buildSchemaCacheWithOptions reason invalidations)
|
||||
pure (cache, ())
|
||||
|
||||
instance Example (CacheRefT m ()) where
|
||||
type Arg (CacheRefT m ()) = CacheRefT m :~> IO
|
||||
|
@ -0,0 +1,55 @@
|
||||
# This is a regression test for #3791.
|
||||
- description: Setup enum table, create relationship, and insert invalid enum value
|
||||
url: /v1/query
|
||||
status: 200
|
||||
query:
|
||||
type: bulk
|
||||
args:
|
||||
- type: set_table_is_enum
|
||||
args:
|
||||
table: weekdays
|
||||
is_enum: true
|
||||
- type: create_object_relationship
|
||||
args:
|
||||
table: employees
|
||||
name: favorite_color_object
|
||||
using:
|
||||
foreign_key_constraint_on: favorite_color
|
||||
- type: run_sql
|
||||
args:
|
||||
sql: INSERT INTO colors (value, comment) VALUES ('illegal+graphql+identifier', '')
|
||||
- type: reload_metadata
|
||||
args: {}
|
||||
|
||||
- description: Query inconsistent objects
|
||||
url: /v1/query
|
||||
status: 200
|
||||
response:
|
||||
is_consistent: false
|
||||
inconsistent_objects:
|
||||
- definition:
|
||||
schema: public
|
||||
name: colors
|
||||
reason: the table "colors" cannot be used as an enum because the value
|
||||
"illegal+graphql+identifier" is not a valid GraphQL enum value name
|
||||
type: table
|
||||
- definition:
|
||||
using:
|
||||
foreign_key_constraint_on: favorite_color
|
||||
name: favorite_color_object
|
||||
comment:
|
||||
table:
|
||||
schema: public
|
||||
name: employees
|
||||
reason: table "colors" is not tracked
|
||||
type: object_relation
|
||||
query:
|
||||
type: get_inconsistent_metadata
|
||||
args: {}
|
||||
|
||||
- description: Drop inconsistent objects
|
||||
url: /v1/query
|
||||
status: 200
|
||||
query:
|
||||
type: drop_inconsistent_metadata
|
||||
args: {}
|
@ -679,6 +679,9 @@ class TestSetTableIsEnum(DefaultTestQueries):
|
||||
def test_add_test_schema_enum_table(self, hge_ctx):
|
||||
check_query_f(hge_ctx, self.dir() + '/add_test_schema_enum_table.yaml')
|
||||
|
||||
def test_relationship_with_inconsistent_enum_table(self, hge_ctx):
|
||||
check_query_f(hge_ctx, self.dir() + '/relationship_with_inconsistent_enum_table.yaml')
|
||||
|
||||
class TestSetTableCustomFields(DefaultTestQueries):
|
||||
@classmethod
|
||||
def dir(cls):
|
||||
@ -735,5 +738,3 @@ class TestBulkQuery(DefaultTestQueries):
|
||||
|
||||
def test_run_bulk_with_select_and_reads(self, hge_ctx):
|
||||
check_query_f(hge_ctx, self.dir() + '/select_with_reads.yaml')
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user