server: add metric for the metadata resource version
PR-URL: https://github.com/hasura/graphql-engine-mono/pull/3687
Co-authored-by: awjchen <13142944+awjchen@users.noreply.github.com>
GitOrigin-RevId: ae217690ee0371f6fc696fc2e18c72e087dcaff2

Parent: aa19f1e0d0
Commit: f481507595
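Overview: the change threads a ServerMetrics value into initialiseServeCtx, moves the schema cache reference into a new Hasura.Server.SchemaCacheRef module, and adds a schema_cache_metadata_resource_version gauge that is kept in sync with the metadata resource version of the last-built schema cache. In sketch form (a restatement of the new module's internal helper, using only names that appear in the hunks below; the standalone wiring here is assumed, not part of the commit):

    import Data.Foldable (traverse_)
    import System.Metrics.Gauge qualified as Gauge -- ekg-core
    -- ServerMetrics, RebuildableSchemaCache etc. come from the Hasura modules in this diff.

    -- Sketch only: mirrors updateMetadataVersionGauge from the new module. If the
    -- last-built schema cache carries no resource version yet, the gauge is untouched.
    syncMetadataVersionGauge :: ServerMetrics -> RebuildableSchemaCache -> IO ()
    syncMetadataVersionGauge metrics cache =
      traverse_
        (Gauge.set (smSchemaCacheMetadataResourceVersion metrics) . getMetadataResourceVersion)
        (scMetadataResourceVersion (lastBuiltSchemaCache cache))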
@@ -536,6 +536,7 @@ library
 , Hasura.Server.Middleware
 , Hasura.Server.Cors
 , Hasura.Server.CheckUpdates
+, Hasura.Server.SchemaCacheRef
 , Hasura.Server.SchemaUpdate
 , Hasura.Server.Migrate.Version
 , Hasura.Server.Migrate.Internal
@@ -62,7 +62,7 @@ runApp env (HGEOptionsG rci metadataDbUrl hgeCmd) = do
 -- It'd be nice if we didn't have to call runManagedT twice here, but
 -- there is a data dependency problem since the call to runPGMetadataStorageApp
 -- below depends on serveCtx.
-runManagedT (initialiseServeCtx env globalCtx serveOptions) $ \serveCtx -> do
+runManagedT (initialiseServeCtx env globalCtx serveOptions serverMetrics) $ \serveCtx -> do
   -- Catches the SIGTERM signal and initiates a graceful shutdown.
   -- Graceful shutdown for regular HTTP requests is already implemented in
   -- Warp, and is triggered by invoking the 'closeSocket' callback.
@@ -59,7 +59,6 @@ import Data.ByteString.Lazy.Char8 qualified as BLC
 import Data.Environment qualified as Env
 import Data.FileEmbed (makeRelativeToProject)
 import Data.HashMap.Strict qualified as HM
-import Data.IORef (readIORef)
 import Data.Text qualified as T
 import Data.Time.Clock (UTCTime)
 import Data.Time.Clock qualified as Clock
@@ -107,6 +106,12 @@ import Hasura.Server.Limits
 import Hasura.Server.Logging
 import Hasura.Server.Metrics (ServerMetrics (..))
 import Hasura.Server.Migrate (migrateCatalog)
+import Hasura.Server.SchemaCacheRef
+  ( SchemaCacheRef,
+    getSchemaCache,
+    initialiseSchemaCacheRef,
+    logInconsistentMetadata,
+  )
 import Hasura.Server.SchemaUpdate
 import Hasura.Server.Telemetry
 import Hasura.Server.Types
@@ -231,9 +236,7 @@ data GlobalCtx = GlobalCtx
 }

 readTlsAllowlist :: SchemaCacheRef -> IO [TlsAllow]
-readTlsAllowlist scRef = do
-  (rbsc, _) <- readIORef (_scrCache scRef)
-  pure $ scTlsAllowlist $ lastBuiltSchemaCache rbsc
+readTlsAllowlist scRef = scTlsAllowlist <$> getSchemaCache scRef

 initGlobalCtx ::
   (MonadIO m) =>
@@ -341,8 +344,9 @@ initialiseServeCtx ::
 Env.Environment ->
 GlobalCtx ->
 ServeOptions Hasura ->
+ServerMetrics ->
 ManagedT m ServeCtx
-initialiseServeCtx env GlobalCtx {..} so@ServeOptions {..} = do
+initialiseServeCtx env GlobalCtx {..} so@ServeOptions {..} serverMetrics = do
 instanceId <- liftIO generateInstanceId
 latch <- liftIO newShutdownLatch
 loggers@(Loggers loggerCtx logger pgLogger) <- mkLoggers soEnabledLogTypes soLogLevel
@@ -402,7 +406,7 @@ initialiseServeCtx env GlobalCtx {..} so@ServeOptions {..} = do
 unLogger logger $ mkGenericStrLog LevelInfo "schema-sync" ("Schema sync enabled. Polling at " <> show i)
 void $ startSchemaSyncListenerThread logger metadataDbPool instanceId i metaVersionRef

-schemaCacheRef <- initialiseCache rebuildableSchemaCache
+schemaCacheRef <- initialiseSchemaCacheRef serverMetrics rebuildableSchemaCache

 srvMgr <- liftIO $ mkHttpManager (readTlsAllowlist schemaCacheRef) mempty

@@ -718,12 +722,12 @@ mkHGEServer setupHook env ServeOptions {..} ServeCtx {..} initTime postPollHook
 soReadOnlyMode

 -- Log Warning if deprecated environment variables are used
-sources <- scSources <$> liftIO (getSCFromRef cacheRef)
+sources <- scSources <$> liftIO (getSchemaCache cacheRef)
 liftIO $ logDeprecatedEnvVars logger env sources

 -- log inconsistent schema objects
-inconsObjs <- scInconsistentObjs <$> liftIO (getSCFromRef cacheRef)
-liftIO $ logInconsObjs logger inconsObjs
+inconsObjs <- scInconsistentObjs <$> liftIO (getSchemaCache cacheRef)
+liftIO $ logInconsistentMetadata logger inconsObjs

 -- NOTE: `newLogTVar` is being used to make sure that the metadata logger runs only once
 -- while logging errors or any `inconsistent_metadata` logs.
@@ -762,7 +766,7 @@ mkHGEServer setupHook env ServeOptions {..} ServeCtx {..} initTime postPollHook
 -- start a background thread to create new cron events
 _cronEventsThread <-
   C.forkManagedT "runCronEventsGenerator" logger $
-    runCronEventsGenerator logger (getSCFromRef cacheRef)
+    runCronEventsGenerator logger (getSchemaCache cacheRef)

 startScheduledEventsPollerThread logger eventLogBehavior lockedEventsCtx cacheRef
 EventingDisabled ->
@@ -787,7 +791,7 @@ mkHGEServer setupHook env ServeOptions {..} ServeCtx {..} initTime postPollHook

 telemetryThread <-
   C.forkManagedT "runTelemetry" logger $
-    liftIO $ runTelemetry logger _scHttpManager (getSCFromRef cacheRef) dbId _scInstanceId pgVersion
+    liftIO $ runTelemetry logger _scHttpManager (getSchemaCache cacheRef) dbId _scInstanceId pgVersion
 return $ Just telemetryThread
 else return Nothing

@@ -915,7 +919,7 @@ mkHGEServer setupHook env ServeOptions {..} ServeCtx {..} initTime postPollHook
 logger
 eventLogBehavior
 _scHttpManager
-(getSCFromRef cacheRef)
+(getSchemaCache cacheRef)
 eventEngineCtx
 lockedEventsCtx
 serverMetrics
@@ -946,7 +950,7 @@ mkHGEServer setupHook env ServeOptions {..} ServeCtx {..} initTime postPollHook
 $ asyncActionsProcessor
   env
   logger
-  (_scrCache cacheRef)
+  (getSchemaCache cacheRef)
   (leActionEvents lockedEventsCtx)
   _scHttpManager
   sleepTime
@@ -984,7 +988,7 @@ mkHGEServer setupHook env ServeOptions {..} ServeCtx {..} initTime postPollHook
 logger
 eventLogBehavior
 _scHttpManager
-(getSCFromRef cacheRef)
+(getSchemaCache cacheRef)
 lockedEventsCtx

 instance (Monad m) => Tracing.HasReporter (PGMetadataStorageAppT m)
@@ -29,7 +29,6 @@ import Data.CaseInsensitive qualified as CI
 import Data.Environment qualified as Env
 import Data.Has
 import Data.HashMap.Strict qualified as Map
-import Data.IORef
 import Data.Set (Set)
 import Data.TByteString qualified as TBS
 import Data.Text.Extended
@@ -52,7 +51,6 @@ import Hasura.Logging qualified as L
 import Hasura.Metadata.Class
 import Hasura.Prelude
 import Hasura.RQL.DDL.Headers
-import Hasura.RQL.DDL.Schema.Cache
 import Hasura.RQL.DDL.Webhook.Transform
 import Hasura.RQL.DDL.Webhook.Transform.Class (mkReqTransformCtx)
 import Hasura.RQL.IR.Action qualified as RA
@@ -331,17 +329,17 @@ asyncActionsProcessor ::
 ) =>
 Env.Environment ->
 L.Logger L.Hasura ->
-IORef (RebuildableSchemaCache, SchemaCacheVer) ->
+IO SchemaCache ->
 STM.TVar (Set LockedActionEventId) ->
 HTTP.Manager ->
 Milliseconds ->
 Maybe GH.GQLQueryText ->
 m (Forever m)
-asyncActionsProcessor env logger cacheRef lockedActionEvents httpManager sleepTime gqlQueryText =
+asyncActionsProcessor env logger getSCFromRef' lockedActionEvents httpManager sleepTime gqlQueryText =
 return $
   Forever () $
     const $ do
-      actionCache <- scActions . lastBuiltSchemaCache . fst <$> liftIO (readIORef cacheRef)
+      actionCache <- scActions <$> liftIO getSCFromRef'
       let asyncActions =
             Map.filter ((== ActionMutation ActionAsynchronous) . (^. aiDefinition . adType)) actionCache
       unless (Map.null asyncActions) $ do
@@ -8,26 +8,19 @@ module Hasura.Server.App
 HasuraApp (HasuraApp),
 MonadConfigApiHandler (..),
 MonadMetadataApiAuthorization (..),
-SchemaCacheRef (..),
 ServerCtx (scManager),
 boolToText,
 configApiGetHandler,
-getSCFromRef,
-initialiseCache,
 isAdminSecretSet,
-logInconsObjs,
 mkGetHandler,
 mkSpockAction,
 mkWaiApp,
 onlyAdmin,
 renderHtmlTemplate,
-withSCUpdate,
 )
 where

 import Control.Concurrent.Async.Lifted.Safe qualified as LA
 import Control.Concurrent.MVar.Lifted
 import Control.Concurrent.STM qualified as STM
 import Control.Exception (IOException, try)
 import Control.Monad.Stateless
 import Control.Monad.Trans.Control (MonadBaseControl)
@@ -40,7 +33,6 @@ import Data.CaseInsensitive qualified as CI
 import Data.Environment qualified as Env
 import Data.HashMap.Strict qualified as M
 import Data.HashSet qualified as S
-import Data.IORef
 import Data.String (fromString)
 import Data.Text qualified as T
 import Data.Text.Conversions (convertText)
@@ -84,6 +76,12 @@ import Hasura.Server.Metrics (ServerMetrics)
 import Hasura.Server.Middleware (corsMiddleware)
 import Hasura.Server.OpenAPI (serveJSON)
 import Hasura.Server.Rest
+import Hasura.Server.SchemaCacheRef
+  ( SchemaCacheRef,
+    getSchemaCache,
+    readSchemaCacheRef,
+    withSchemaCacheUpdate,
+  )
 import Hasura.Server.Types
 import Hasura.Server.Utils
 import Hasura.Server.Version
@@ -103,25 +101,6 @@ import Text.Mustache qualified as M
 import Web.Spock.Core ((<//>))
 import Web.Spock.Core qualified as Spock

-data SchemaCacheRef = SchemaCacheRef
-  { -- | The idea behind explicit locking here is to
-    --
-    -- 1. Allow maximum throughput for serving requests (/v1/graphql) (as each
-    --    request reads the current schemacache)
-    -- 2. We don't want to process more than one request at any point of time
-    --    which would modify the schema cache as such queries are expensive.
-    --
-    -- Another option is to consider removing this lock in place of `_scrCache ::
-    -- MVar ...` if it's okay or in fact correct to block during schema update in
-    -- e.g. _wseGCtxMap. Vamshi says: It is theoretically possible to have a
-    -- situation (in between building new schemacache and before writing it to
-    -- the IORef) where we serve a request with a stale schemacache but I guess
-    -- it is an okay trade-off to pay for a higher throughput (I remember doing a
-    -- bunch of benchmarks to test this hypothesis).
-    _scrLock :: MVar (),
-    _scrCache :: IORef (RebuildableSchemaCache, SchemaCacheVer)
-  }
-
 data ServerCtx = ServerCtx
   { scLogger :: !(L.Logger L.Hasura),
     scCacheRef :: !SchemaCacheRef,
@@ -177,49 +156,6 @@ isAdminSecretSet :: AuthMode -> Text
 isAdminSecretSet AMNoAuth = boolToText False
 isAdminSecretSet _ = boolToText True

-getSCFromRef :: (MonadIO m) => SchemaCacheRef -> m SchemaCache
-getSCFromRef scRef = lastBuiltSchemaCache . fst <$> liftIO (readIORef $ _scrCache scRef)
-
-logInconsObjs :: L.Logger L.Hasura -> [InconsistentMetadata] -> IO ()
-logInconsObjs logger objs =
-  unless (null objs) $
-    L.unLogger logger $ mkInconsMetadataLog objs
-
-withSCUpdate ::
-  (MonadIO m, MonadBaseControl IO m) =>
-  SchemaCacheRef ->
-  L.Logger L.Hasura ->
-  Maybe (STM.TVar Bool) ->
-  m (a, RebuildableSchemaCache) ->
-  m a
-withSCUpdate scr logger mLogCheckerTVar action =
-  withMVarMasked lk $ \() -> do
-    (!res, !newSC) <- action
-    liftIO $ do
-      -- update schemacache in IO reference
-      modifyIORef' cacheRef $ \(_, prevVer) ->
-        let !newVer = incSchemaCacheVer prevVer
-         in (newSC, newVer)
-
-      let inconsistentObjectsList = scInconsistentObjs $ lastBuiltSchemaCache newSC
-          logInconsistentMetadata = logInconsObjs logger inconsistentObjectsList
-      -- log any inconsistent objects only once and not everytime this method is called
-      case mLogCheckerTVar of
-        Nothing -> do logInconsistentMetadata
-        Just logCheckerTVar -> do
-          logCheck <- liftIO $ STM.readTVarIO logCheckerTVar
-          if null inconsistentObjectsList && logCheck
-            then do
-              STM.atomically $ STM.writeTVar logCheckerTVar False
-            else do
-              when (not logCheck && not (null inconsistentObjectsList)) $ do
-                STM.atomically $ STM.writeTVar logCheckerTVar True
-                logInconsistentMetadata
-
-    return res
-  where
-    SchemaCacheRef lk cacheRef = scr
-
 mkGetHandler :: Handler m (HttpLogMetadata m, APIResp) -> APIHandler m ()
 mkGetHandler = AHGet

@@ -451,13 +387,13 @@ v1QueryHandler query = do
 (liftEitherM . authorizeV1QueryApi query) =<< ask
 scRef <- asks (scCacheRef . hcServerCtx)
 logger <- asks (scLogger . hcServerCtx)
-res <- bool (fst <$> (action logger)) (withSCUpdate scRef logger Nothing (action logger)) $ queryModifiesSchemaCache query
+res <- bool (fst <$> (action logger)) (withSchemaCacheUpdate scRef logger Nothing (action logger)) $ queryModifiesSchemaCache query
 return $ HttpResponse res []
 where
   action logger = do
     userInfo <- asks hcUser
     scRef <- asks (scCacheRef . hcServerCtx)
-    schemaCache <- fmap fst $ liftIO $ readIORef $ _scrCache scRef
+    schemaCache <- liftIO $ fst <$> readSchemaCacheRef scRef
     httpMgr <- asks (scManager . hcServerCtx)
     sqlGenCtx <- asks (scSQLGenCtx . hcServerCtx)
     instanceId <- asks (scInstanceId . hcServerCtx)
@@ -494,7 +430,7 @@ v1MetadataHandler query = do
 (liftEitherM . authorizeV1MetadataApi query) =<< ask
 userInfo <- asks hcUser
 scRef <- asks (scCacheRef . hcServerCtx)
-schemaCache <- fmap fst $ liftIO $ readIORef $ _scrCache scRef
+schemaCache <- liftIO $ fst <$> readSchemaCacheRef scRef
 httpMgr <- asks (scManager . hcServerCtx)
 _sccSQLGenCtx <- asks (scSQLGenCtx . hcServerCtx)
 env <- asks (scEnvironment . hcServerCtx)
@@ -508,7 +444,7 @@ v1MetadataHandler query = do
 _sccReadOnlyMode <- asks (scEnableReadOnlyMode . hcServerCtx)
 let serverConfigCtx = ServerConfigCtx {..}
 r <-
-  withSCUpdate
+  withSchemaCacheUpdate
     scRef
     logger
     Nothing
@@ -540,7 +476,7 @@ v2QueryHandler query = do
 scRef <- asks (scCacheRef . hcServerCtx)
 logger <- asks (scLogger . hcServerCtx)
 res <-
-  bool (fst <$> dbAction) (withSCUpdate scRef logger Nothing dbAction) $
+  bool (fst <$> dbAction) (withSchemaCacheUpdate scRef logger Nothing dbAction) $
     V2Q.queryModifiesSchema query
 return $ HttpResponse res []
 where
@@ -548,7 +484,7 @@ v2QueryHandler query = do
 dbAction = do
   userInfo <- asks hcUser
   scRef <- asks (scCacheRef . hcServerCtx)
-  schemaCache <- fmap fst $ liftIO $ readIORef $ _scrCache scRef
+  schemaCache <- liftIO $ fst <$> readSchemaCacheRef scRef
   httpMgr <- asks (scManager . hcServerCtx)
   sqlGenCtx <- asks (scSQLGenCtx . hcServerCtx)
   instanceId <- asks (scInstanceId . hcServerCtx)
@@ -601,7 +537,7 @@ mkExecutionContext ::
 mkExecutionContext = do
   manager <- asks (scManager . hcServerCtx)
   scRef <- asks (scCacheRef . hcServerCtx)
-  (sc, scVer) <- liftIO $ readIORef $ _scrCache scRef
+  (sc, scVer) <- liftIO $ readSchemaCacheRef scRef
   sqlGenCtx <- asks (scSQLGenCtx . hcServerCtx)
   enableAL <- asks (scEnableAllowlist . hcServerCtx)
   logger <- asks (scLogger . hcServerCtx)
@@ -658,7 +594,7 @@ gqlExplainHandler ::
 gqlExplainHandler query = do
   onlyAdmin
   scRef <- asks (scCacheRef . hcServerCtx)
-  sc <- getSCFromRef scRef
+  sc <- liftIO $ getSchemaCache scRef
   res <- GE.explainGQLQuery sc query
   return $ HttpResponse res []

@@ -666,7 +602,7 @@ v1Alpha1PGDumpHandler :: (MonadIO m, MonadError QErr m, MonadReader HandlerCtx m
 v1Alpha1PGDumpHandler b = do
   onlyAdmin
   scRef <- asks (scCacheRef . hcServerCtx)
-  sc <- getSCFromRef scRef
+  sc <- liftIO $ getSchemaCache scRef
   let sources = scSources sc
       sourceName = PGD.prbSource b
       sourceConfig = unsafeSourceConfiguration @('Postgres 'Vanilla) =<< M.lookup sourceName sources
@@ -846,7 +782,7 @@ mkWaiApp
 experimentalFeatures
 enabledLogTypes
 wsConnInitTimeout = do
-  let getSchemaCache = first lastBuiltSchemaCache <$> readIORef (_scrCache schemaCacheRef)
+  let getSchemaCache' = first lastBuiltSchemaCache <$> readSchemaCacheRef schemaCacheRef

  let corsPolicy = mkDefaultCorsPolicy corsCfg
      postPollHook = fromMaybe (EL.defaultLiveQueryPostPollHook logger) liveQueryHook
@@ -856,7 +792,7 @@ mkWaiApp
 WS.createWSServerEnv
   logger
   lqState
-  getSchemaCache
+  getSchemaCache'
   httpManager
   corsPolicy
   sqlGenCtx
@@ -901,13 +837,6 @@ mkWaiApp

 return $ HasuraApp waiApp schemaCacheRef (EL._lqsAsyncActions lqState) stopWSServer

-initialiseCache :: MonadIO m => RebuildableSchemaCache -> m SchemaCacheRef
-initialiseCache schemaCache = do
-  cacheLock <- liftIO $ newMVar ()
-  cacheCell <- liftIO $ newIORef (schemaCache, initSchemaCacheVer)
-  let cacheRef = SchemaCacheRef cacheLock cacheCell
-  pure cacheRef
-
 httpApp ::
   forall m.
   ( MonadIO m,
@@ -960,7 +889,7 @@ httpApp setupHook corsCfg serverCtx enableConsole consoleAssetsDir enableTelemet
 Spock.setStatus HTTP.status500 >> Spock.text errorMsg
 Right True -> do
   -- healthy
-  sc <- getSCFromRef $ scCacheRef serverCtx
+  sc <- liftIO $ getSchemaCache $ scCacheRef serverCtx
   let responseText =
         if null (scInconsistentObjs sc)
           then "OK"
@@ -994,7 +923,7 @@ httpApp setupHook corsCfg serverCtx enableConsole consoleAssetsDir enableTelemet
 Handler (Tracing.TraceT n) (HttpLogMetadata n, APIResp)
 customEndpointHandler restReq = do
   scRef <- asks (scCacheRef . hcServerCtx)
-  endpoints <- scEndpoints <$> getSCFromRef scRef
+  endpoints <- liftIO $ scEndpoints <$> getSchemaCache scRef
   execCtx <- mkExecutionContext
   env <- asks (scEnvironment . hcServerCtx)
   requestId <- asks hcRequestId
@@ -1106,7 +1035,7 @@ httpApp setupHook corsCfg serverCtx enableConsole consoleAssetsDir enableTelemet
 spockAction encodeQErr id $
   mkGetHandler $ do
     onlyAdmin
-    sc <- getSCFromRef $ scCacheRef serverCtx
+    sc <- liftIO $ getSchemaCache $ scCacheRef serverCtx
     let json = serveJSON sc
     return (emptyHttpLogMetadata @m, JSONResp $ HttpResponse (encJFromJValue json) [])

@@ -61,6 +61,12 @@ data
       "event_queue_time"
       'DistributionType
       ()
+  -- | The current schema cache metadata resource version
+  SchemaCacheMetadataResourceVersion ::
+    ServerMetricsSpec
+      "schema_cache_metadata_resource_version"
+      'GaugeType
+      ()

 -- | Mutable references for the server metrics. See `ServerMetricsSpec` for a
 -- description of each metric.
@@ -70,7 +76,8 @@ data ServerMetrics = ServerMetrics
 smActiveSubscriptions :: !Gauge,
 smNumEventsFetchedPerBatch :: !Distribution,
 smNumEventHTTPWorkers :: !Gauge,
-smEventQueueTime :: !Distribution
+smEventQueueTime :: !Distribution,
+smSchemaCacheMetadataResourceVersion :: !Gauge
 }

 createServerMetrics :: Store ServerMetricsSpec -> IO ServerMetrics
@@ -81,4 +88,5 @@ createServerMetrics store = do
 smNumEventsFetchedPerBatch <- createDistribution NumEventsFetchedPerBatch () store
 smNumEventHTTPWorkers <- createGauge NumEventHTTPWorkers () store
 smEventQueueTime <- createDistribution EventQueueTime () store
+smSchemaCacheMetadataResourceVersion <- createGauge SchemaCacheMetadataResourceVersion () store
 pure ServerMetrics {..}
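Note: the new gauge is a plain ekg-core gauge, so its current value can be read back directly, for example in a test. A minimal sketch, assuming a ServerMetrics value built by createServerMetrics as above:

    import Data.Int (Int64)
    import System.Metrics.Gauge qualified as Gauge -- ekg-core

    -- Read the metadata resource version last pushed to the gauge
    -- (an ekg-core gauge starts at 0 until it is first set).
    readMetadataVersionMetric :: ServerMetrics -> IO Int64
    readMetadataVersionMetric serverMetrics =
      Gauge.read (smSchemaCacheMetadataResourceVersion serverMetrics)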
server/src-lib/Hasura/Server/SchemaCacheRef.hs (new file, 131 lines)
@@ -0,0 +1,131 @@
+{-# LANGUAGE CPP #-}
+
+module Hasura.Server.SchemaCacheRef
+  ( SchemaCacheRef,
+    initialiseSchemaCacheRef,
+    withSchemaCacheUpdate,
+    readSchemaCacheRef,
+    getSchemaCache,
+
+    -- * Utility
+    logInconsistentMetadata,
+  )
+where
+
+import Control.Concurrent.MVar.Lifted
+import Control.Concurrent.STM qualified as STM
+import Control.Monad.Trans.Control (MonadBaseControl)
+import Data.IORef
+import Hasura.Logging qualified as L
+import Hasura.Prelude hiding (get, put)
+import Hasura.RQL.DDL.Schema
+import Hasura.RQL.Types
+import Hasura.Server.Logging
+import Hasura.Server.Metrics
+  ( ServerMetrics (smSchemaCacheMetadataResourceVersion),
+  )
+import System.Metrics.Gauge (Gauge)
+import System.Metrics.Gauge qualified as Gauge
+
+-- | A mutable reference to a 'RebuildableSchemaCache', plus
+--
+-- * a write lock,
+-- * update version tracking, and
+-- * a gauge metric that tracks the metadata version of the 'SchemaCache'.
+data SchemaCacheRef = SchemaCacheRef
+  { -- | The idea behind explicit locking here is to
+    --
+    -- 1. Allow maximum throughput for serving requests (/v1/graphql) (as each
+    --    request reads the current schemacache)
+    -- 2. We don't want to process more than one request at any point of time
+    --    which would modify the schema cache as such queries are expensive.
+    --
+    -- Another option is to consider removing this lock in place of `_scrCache ::
+    -- MVar ...` if it's okay or in fact correct to block during schema update in
+    -- e.g. _wseGCtxMap. Vamshi says: It is theoretically possible to have a
+    -- situation (in between building new schemacache and before writing it to
+    -- the IORef) where we serve a request with a stale schemacache but I guess
+    -- it is an okay trade-off to pay for a higher throughput (I remember doing a
+    -- bunch of benchmarks to test this hypothesis).
+    _scrLock :: MVar (),
+    _scrCache :: IORef (RebuildableSchemaCache, SchemaCacheVer),
+    -- | The gauge metric that tracks the current metadata version.
+    --
+    -- Invariant: This gauge must be updated via 'updateMetadataVersionGauge'
+    -- whenever the _scrCache IORef is updated.
+    _scrMetadataVersionGauge :: Gauge
+  }
+
+-- | Build a new 'SchemaCacheRef'
+initialiseSchemaCacheRef ::
+  MonadIO m => ServerMetrics -> RebuildableSchemaCache -> m SchemaCacheRef
+initialiseSchemaCacheRef serverMetrics schemaCache = liftIO $ do
+  cacheLock <- newMVar ()
+  cacheCell <- newIORef (schemaCache, initSchemaCacheVer)
+  let metadataVersionGauge = smSchemaCacheMetadataResourceVersion serverMetrics
+  updateMetadataVersionGauge metadataVersionGauge schemaCache
+  pure $ SchemaCacheRef cacheLock cacheCell metadataVersionGauge
+
+-- | Set the 'SchemaCacheRef' to the 'RebuildableSchemaCache' produced by the
+-- given action.
+--
+-- An internal lock ensures that at most one update to the 'SchemaCacheRef' may
+-- proceed at a time.
+withSchemaCacheUpdate ::
+  (MonadIO m, MonadBaseControl IO m) =>
+  SchemaCacheRef ->
+  L.Logger L.Hasura ->
+  Maybe (STM.TVar Bool) ->
+  m (a, RebuildableSchemaCache) ->
+  m a
+withSchemaCacheUpdate (SchemaCacheRef lock cacheRef metadataVersionGauge) logger mLogCheckerTVar action =
+  withMVarMasked lock $ \() -> do
+    (!res, !newSC) <- action
+    liftIO $ do
+      -- update schemacache in IO reference
+      modifyIORef' cacheRef $ \(_, prevVer) ->
+        let !newVer = incSchemaCacheVer prevVer
+         in (newSC, newVer)
+
+      -- update metric with new metadata version
+      updateMetadataVersionGauge metadataVersionGauge newSC
+
+      let inconsistentObjectsList = scInconsistentObjs $ lastBuiltSchemaCache newSC
+          logInconsistentMetadata' = logInconsistentMetadata logger inconsistentObjectsList
+      -- log any inconsistent objects only once and not everytime this method is called
+      case mLogCheckerTVar of
+        Nothing -> do logInconsistentMetadata'
+        Just logCheckerTVar -> do
+          logCheck <- liftIO $ STM.readTVarIO logCheckerTVar
+          if null inconsistentObjectsList && logCheck
+            then do
+              STM.atomically $ STM.writeTVar logCheckerTVar False
+            else do
+              when (not logCheck && not (null inconsistentObjectsList)) $ do
+                STM.atomically $ STM.writeTVar logCheckerTVar True
+                logInconsistentMetadata'
+
+    return res
+
+-- | Read the contents of the 'SchemaCacheRef'
+readSchemaCacheRef :: SchemaCacheRef -> IO (RebuildableSchemaCache, SchemaCacheVer)
+readSchemaCacheRef scRef = readIORef $ _scrCache scRef
+
+-- | Utility function. Read the latest 'SchemaCache' from the 'SchemaCacheRef'.
+--
+-- > getSchemaCache == fmap (lastBuiltSchemaCache . fst) . readSchemaCacheRef
+getSchemaCache :: SchemaCacheRef -> IO SchemaCache
+getSchemaCache scRef = lastBuiltSchemaCache . fst <$> readSchemaCacheRef scRef
+
+-- | Utility function
+logInconsistentMetadata :: L.Logger L.Hasura -> [InconsistentMetadata] -> IO ()
+logInconsistentMetadata logger objs =
+  unless (null objs) $
+    L.unLogger logger $ mkInconsMetadataLog objs
+
+-- Internal helper. Set the gague metric to the metadata version of the schema
+-- cache, if it exists.
+updateMetadataVersionGauge :: MonadIO m => Gauge -> RebuildableSchemaCache -> m ()
+updateMetadataVersionGauge metadataVersionGauge schemaCache = do
+  let metadataVersion = scMetadataResourceVersion . lastBuiltSchemaCache $ schemaCache
+  liftIO $ traverse_ (Gauge.set metadataVersionGauge . getMetadataResourceVersion) metadataVersion
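Note: a typical call pattern for the new module, assembled from the call sites changed elsewhere in this diff. This is a sketch; serverMetrics, logger and buildNewCache (an action producing a rebuilt cache) are assumed to be in scope, and the imports mirror those of the new module:

    -- Initialise the reference once at startup, then funnel every cache update
    -- through withSchemaCacheUpdate so the lock, the SchemaCacheVer counter and
    -- the metadata-version gauge stay consistent; plain readers use getSchemaCache.
    example :: ServerMetrics -> L.Logger L.Hasura -> RebuildableSchemaCache -> IO ()
    example serverMetrics logger initialCache = do
      scRef <- initialiseSchemaCacheRef serverMetrics initialCache
      -- Writer side: the action returns a result paired with the rebuilt cache.
      () <- withSchemaCacheUpdate scRef logger Nothing $ do
        newCache <- buildNewCache -- assumed helper, not part of the diff
        pure ((), newCache)
      -- Reader side: cheap read of the last-built SchemaCache, no lock taken.
      sc <- getSchemaCache scRef
      print (length (scInconsistentObjs sc))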
@@ -19,7 +19,6 @@ import Data.Aeson.Casing
 import Data.Aeson.TH
 import Data.HashMap.Strict qualified as HM
 import Data.HashSet qualified as HS
-import Data.IORef
 import Database.PG.Query qualified as Q
 import Hasura.Base.Error
 import Hasura.Logging
@@ -29,8 +28,12 @@ import Hasura.RQL.DDL.Schema (runCacheRWT)
 import Hasura.RQL.DDL.Schema.Catalog
 import Hasura.RQL.Types
 import Hasura.RQL.Types.Run
-import Hasura.Server.App (SchemaCacheRef (..), withSCUpdate)
 import Hasura.Server.Logging
+import Hasura.Server.SchemaCacheRef
+  ( SchemaCacheRef,
+    readSchemaCacheRef,
+    withSchemaCacheUpdate,
+  )
 import Hasura.Server.Types (InstanceId (..))
 import Hasura.Session
 import Network.HTTP.Client qualified as HTTP
@@ -318,8 +321,8 @@ refreshSchemaCache
 serverConfigCtx
 logTVar = do
   respErr <- runExceptT $
-    withSCUpdate cacheRef logger (Just logTVar) $ do
-      rebuildableCache <- fst <$> liftIO (readIORef $ _scrCache cacheRef)
+    withSchemaCacheUpdate cacheRef logger (Just logTVar) $ do
+      rebuildableCache <- liftIO $ fst <$> readSchemaCacheRef cacheRef
       (msg, cache, _) <- peelRun runCtx $
         runCacheRWT rebuildableCache $ do
           schemaCache <- askSchemaCache
@@ -217,7 +217,7 @@ runApp serveOptions = do
 serverMetrics <-
   liftIO $ createServerMetrics $ EKG.subset ServerSubset store
 pure (EKG.subset EKG.emptyOf store, serverMetrics)
-runManagedT (App.initialiseServeCtx env globalCtx serveOptions) $ \serveCtx ->
+runManagedT (App.initialiseServeCtx env globalCtx serveOptions serverMetrics) $ \serveCtx ->
 do
   let Loggers _ _logger pgLogger = _scLoggers serveCtx
   flip App.runPGMetadataStorageAppT (_scMetadataDbPool serveCtx, pgLogger)
@@ -264,7 +264,7 @@ withHasuraTestApp metadataDbUrl action = do

 flip runTestM testConfig $
   lowerManagedT $ do
-    serveCtx <- initialiseServeCtx env globalCtx serveOptions
+    serveCtx <- initialiseServeCtx env globalCtx serveOptions serverMetrics
     waiApp <-
       mkHGEServer
         setupHook