mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-18 13:02:11 +03:00
7aa341944b
## Description This PR is an incremental step towards achieving the goal of #8344. It is a less ambitious version of #8484. This PR removes all references to `HasServerConfigCtx` from the cache build and removes `ServerConfigCtx` from `CacheBuildParams`, making `ServerConfigCtx` an argument being passed around manually instead. This has several benefits: by making it an arrow argument, we now properly integrate the fields that change over time in the dependency framework, as they should be, and we can clean up some of the top-level app code. ## Implementation In practice, this PR introduces a `HasServerConfigCtx` instance for `CacheRWT`, the monad we use to build the cache, so we can retrieve the `ServerConfigCtx` in the implementation of `CacheRWM`. This contributes to reducing the amount of `HasServerConfigCtx` in the code: we can remove `SchemaUpdateT` altogether, and we can remove the `HasServerConfigCtx` instance of `Handler`. This makes `HasServerConfigCtx` almost **an implementation detail of the Metadata API**. This first step is enough to achieve the goal of #8344: we can now build the schema cache in the app monad, since we no longer rely on `HasServerConfigCtx` to build it. ## Drawbacks This PR does not attempt to remove the use of `ServerConfigCtx` itself in the schema cache build: doing so would make this PR much, much bigger. Ideally, to avoid having all the static fields given as arrow-ish arguments to the cache, we could depend on `HasAppEnv` in the cache build, and use `AppContext` as an arrow argument. But making the cache build depend on the full `AppEnv` and `AppContext` creates a lot of circular imports; and since removing `ServerConfigCtx` itself isn't required to achieve #8344, this PR keeps it wholesale and defers cleaning it to a future PR. A negative consequence of this is that we need an `Eq` instance on `ServerConfigCtx`, and that instance is inelegant.
## Future work There are several further steps we can take in parallel after this is merged. First, again, we can make a new version of #8344, removing `CacheBuild`, FINALLY. As for `ServerConfigCtx`, we can split it / rename it to make ad-hoc structures. If it turns out that `ServerConfigCtx` is only ever used for the schema cache build, we could split it between `CacheBuildEnv` and `CacheBuildContext`, which will be subsets of `AppEnv` and `AppContext`, avoiding import loops. PR-URL: https://github.com/hasura/graphql-engine-mono/pull/8509 GitOrigin-RevId: 01b37cc3fd3490d6b117701e22fc4ac88b62b6b5
375 lines
14 KiB
Haskell
375 lines
14 KiB
Haskell
{-# LANGUAGE CPP #-}
|
|
{-# LANGUAGE TemplateHaskell #-}
|
|
|
|
module Hasura.Server.SchemaUpdate
|
|
( startSchemaSyncListenerThread,
|
|
startSchemaSyncProcessorThread,
|
|
SchemaSyncThreadType (..),
|
|
)
|
|
where
|
|
|
|
import Control.Concurrent.Extended qualified as C
|
|
import Control.Concurrent.STM qualified as STM
|
|
import Control.Immortal qualified as Immortal
|
|
import Control.Monad.Loops qualified as L
|
|
import Control.Monad.Trans.Control (MonadBaseControl)
|
|
import Control.Monad.Trans.Managed (ManagedT)
|
|
import Data.Aeson
|
|
import Data.Aeson.Casing
|
|
import Data.Aeson.TH
|
|
import Data.HashMap.Strict qualified as HM
|
|
import Data.HashSet qualified as HS
|
|
import Data.Text qualified as T
|
|
import Database.PG.Query qualified as PG
|
|
import Hasura.App.State
|
|
import Hasura.Base.Error
|
|
import Hasura.Logging
|
|
import Hasura.Metadata.Class
|
|
import Hasura.Prelude
|
|
import Hasura.RQL.DDL.Schema (runCacheRWT)
|
|
import Hasura.RQL.DDL.Schema.Catalog
|
|
import Hasura.RQL.Types.SchemaCache
|
|
import Hasura.RQL.Types.SchemaCache.Build
|
|
import Hasura.RQL.Types.Source
|
|
import Hasura.SQL.Backend (BackendType (..))
|
|
import Hasura.SQL.BackendMap qualified as BackendMap
|
|
import Hasura.Server.AppStateRef
|
|
( AppStateRef,
|
|
getAppContext,
|
|
readSchemaCacheRef,
|
|
withSchemaCacheUpdate,
|
|
)
|
|
import Hasura.Server.Logging
|
|
import Hasura.Server.Types
|
|
import Hasura.Services
|
|
import Refined (NonNegative, Refined, unrefine)
|
|
|
|
-- | Errors that the schema-sync threads report through 'logError'.
data ThreadError
  = -- | Carries a textual description of the failure.
    TEPayloadParse !Text
  | -- | Wraps a 'QErr'.
    TEQueryError !QErr

-- 'ToJSON' instance for 'ThreadError'. Constructor names lose their first two
-- characters and are then passed through 'snakeCase'; values are encoded as a
-- tagged object whose tag field is "type" and whose contents field is "info".
$( let threadErrorOptions =
         defaultOptions
           { constructorTagModifier = snakeCase . drop 2,
             sumEncoding = TaggedObject "type" "info"
           }
    in deriveToJSON threadErrorOptions ''ThreadError
 )
|
|
|
|
-- | Write a 'StartupLog' entry (level info, kind @"schema-sync"@) announcing
-- that a schema-sync thread has been started.
--
-- The JSON payload contains the instance id, the thread id rendered with
-- 'show', and a message built from 'tshow' of the thread type followed by
-- @" thread started"@.
logThreadStarted ::
  (MonadIO m) =>
  Logger Hasura ->
  InstanceId ->
  SchemaSyncThreadType ->
  Immortal.Thread ->
  m ()
logThreadStarted logger instanceId threadType thread =
  unLogger logger (StartupLog LevelInfo "schema-sync" payload)
  where
    payload =
      object
        [ "instance_id" .= getInstanceId instanceId,
          "thread_id" .= show (Immortal.threadId thread),
          "message" .= (tshow threadType <> " thread started")
        ]
|
|
|
|
{- Note [Schema Cache Sync]
~~~~~~~~~~~~~~~~~~~~~~~~~~~

When multiple graphql-engine instances are served from the same metadata storage,
each instance should keep its schema cache in sync with the latest metadata. The
instances therefore need some way to communicate with each other whenever a request
has modified the metadata.

We track the metadata schema version in postgres and poll for this
value in a thread. When the schema version has changed, the instance
will update its local metadata schema and remove any invalidated schema cache data.

The following steps take place when an API request is made to update metadata:

  1. After handling the request we insert the new metadata schema json
     into a postgres table along with a schema version.

  2. On start up, before initialising the schema cache, an async thread is
     invoked to continuously poll the Postgres notifications table for
     the latest metadata schema version. The schema version is pushed to
     a shared `TMVar`.

  3. Before starting the API server, another async thread is invoked to
     process events pushed by the listener thread via the `TMVar`. If
     the instance's schema version is not current with the freshly
     updated TMVar version then we update the local metadata.

Why do we need two threads if we can capture and reload the schema cache in a single thread?

If we wanted to implement schema sync in a single async thread, we would have to start that
thread after initialising the schema cache. We might then lose events that are published after
schema cache init and before the thread is started. In such a case, the schema cache would not
be in sync with the metadata. So we use two threads: one starts listening before schema cache
init and the other starts after it.

What happens if the listen connection to Postgres is lost?

The listener thread will keep trying to establish a connection to Postgres every second.
Once the connection is established, it pushes an @'SSEListenStart' event with the time. We can't
be sure about any metadata-modifying requests made in the meantime. So we reload the schema cache
unconditionally if listening started after the schema cache init start time.

-}
|
|
|
|
-- | Fork the schema-sync listener thread, i.e. the thread running 'listener'.
-- See Note [Schema Cache Sync]
--
-- The thread is forked with 'C.forkManagedT' under the label
-- @"SchemeUpdate.listener"@; once forked, its start is reported through
-- 'logThreadStarted' and the thread handle is returned.
startSchemaSyncListenerThread ::
  C.ForkableMonadIO m =>
  Logger Hasura ->
  PG.PGPool ->
  InstanceId ->
  Refined NonNegative Milliseconds ->
  STM.TMVar MetadataResourceVersion ->
  ManagedT m Immortal.Thread
startSchemaSyncListenerThread logger pool instanceId interval metaVersionRef = do
  listenerThread <- C.forkManagedT "SchemeUpdate.listener" logger pollForever
  listenerThread <$ logThreadStarted logger instanceId TTListener listenerThread
  where
    -- The refined interval is unwrapped to plain 'Milliseconds' for 'listener'.
    pollForever = listener logger pool metaVersionRef (unrefine interval)
|
|
|
|
-- | Fork the schema-sync processor thread, i.e. the thread running 'processor'.
-- See Note [Schema Cache Sync]
--
-- The logger, the shared metadata-version 'STM.TMVar' and the instance id are
-- all read from the 'AppEnv'. The thread is forked with 'C.forkManagedT' under
-- the label @"SchemeUpdate.processor"@; its start is reported through
-- 'logThreadStarted' and the thread handle is returned.
startSchemaSyncProcessorThread ::
  ( C.ForkableMonadIO m,
    HasAppEnv m,
    MonadMetadataStorage m,
    MonadResolveSource m,
    ProvidesNetwork m
  ) =>
  AppStateRef impl ->
  STM.TVar Bool ->
  ManagedT m Immortal.Thread
startSchemaSyncProcessorThread appStateRef logTVar = do
  AppEnv {..} <- lift askAppEnv
  let logger = _lsLogger appEnvLoggers
      runProcessor = processor appEnvMetadataVersionRef appStateRef logTVar
  processorThread <- C.forkManagedT "SchemeUpdate.processor" logger runProcessor
  processorThread <$ logThreadStarted logger appEnvInstanceId TTProcessor processorThread
|
|
|
|
-- TODO: This is also defined in multitenant, consider putting it in a library somewhere

-- | Store a value in a 'STM.TMVar' whether or not it is currently full: any
-- existing content is taken out and discarded, then the new value is put in.
-- Both steps happen in a single STM transaction.
forcePut :: STM.TMVar a -> a -> IO ()
forcePut var newValue = STM.atomically $ do
  _discarded <- STM.tryTakeTMVar var
  STM.putTMVar var newValue
|
|
|
|
-- | Fetch the metadata resource version from the catalog in a
-- repeatable-read transaction on the given pool.
--
-- On success the version is written to the 'STM.TMVar' with 'forcePut' and
-- @Right ()@ is returned; on failure the 'QErr' is returned in 'Left' and the
-- 'STM.TMVar' is not written.
schemaVersionCheckHandler ::
  PG.PGPool -> STM.TMVar MetadataResourceVersion -> IO (Either QErr ())
schemaVersionCheckHandler pool metaVersionRef = do
  fetched <-
    runExceptT $
      PG.runTx pool (PG.RepeatableRead, Nothing) fetchMetadataResourceVersionFromCatalog
  -- 'traverse' over 'Either' runs the action only for a 'Right' value and
  -- passes a 'Left' through unchanged.
  traverse (forcePut metaVersionRef) fetched
|
|
|
|
-- | State threaded through the 'listener' loop, recording the last error that
-- was logged together with the metadata resource version it was logged for.
--
-- NOTE: The ErrorState type is to be used mainly for the 'listener' function.
-- It helps prevent logging the same error with the same
-- MetadataResourceVersion multiple times consecutively. When the 'listener' is
-- in an error state we don't log the next error until the resource version has
-- changed/updated.
data ErrorState = ErrorState
  { -- | The last error recorded, if any.
    _esLastErrorSeen :: !(Maybe QErr),
    -- | The metadata resource version recorded with that error, if any.
    _esLastMetadataVersion :: !(Maybe MetadataResourceVersion)
  }
  deriving (Eq)
|
|
|
|
-- | The state with no error and no metadata resource version recorded.
defaultErrorState :: ErrorState
defaultErrorState =
  ErrorState
    { _esLastErrorSeen = Nothing,
      _esLastMetadataVersion = Nothing
    }
|
|
|
|
-- | Record an error and the metadata resource version it was seen with.
--
-- Both fields of the state are overwritten, so nothing of the previous state
-- survives; the empty record pattern only forces it, as a record update would.
--
-- NOTE: this can be updated to use lenses
updateErrorInState :: ErrorState -> QErr -> MetadataResourceVersion -> ErrorState
updateErrorInState ErrorState {} qerr mrv =
  ErrorState
    { _esLastErrorSeen = Just qerr,
      _esLastMetadataVersion = Just mrv
    }
|
|
|
|
-- | 'True' when the state holds both an error and a metadata resource version.
isInErrorState :: ErrorState -> Bool
isInErrorState (ErrorState lastError lastVersion) =
  isJust lastError && isJust lastVersion
|
|
|
|
-- | Decide whether an error should be logged, given the current state.
--
-- The answer is 'True' only when the error differs from the last recorded
-- error AND the version differs from the last recorded version. A field that
-- holds 'Nothing' counts as different.
toLogError :: ErrorState -> QErr -> MetadataResourceVersion -> Bool
toLogError es qerr mrv = errorIsNew && versionIsNew
  where
    errorIsNew = _esLastErrorSeen es /= Just qerr
    versionIsNew = _esLastMetadataVersion es /= Just mrv
|
|
|
|
-- | Poll the catalog for the metadata resource version, in a loop forever.
--
-- Each iteration:
--
--   1. takes whatever version is currently in the 'STM.TMVar' (leaving it
--      empty), falling back to 'initialResourceVersion' if there was none;
--   2. runs 'schemaVersionCheckHandler', which refills the 'STM.TMVar' when the
--      fetch succeeds;
--   3. logs the failure if 'toLogError' allows it, or logs that sync has been
--      restored if the previous iteration left the loop in an error state;
--   4. sleeps for the given interval.
--
-- The 'ErrorState' is threaded from one iteration to the next by
-- 'L.iterateM_'.
listener ::
  MonadIO m =>
  Logger Hasura ->
  PG.PGPool ->
  STM.TMVar MetadataResourceVersion ->
  Milliseconds ->
  m void
listener logger pool metaVersionRef interval =
  L.iterateM_ pollOnce defaultErrorState
  where
    -- One polling round; returns the error state for the next round.
    pollOnce prevState = do
      pending <- liftIO $ STM.atomically $ STM.tryTakeTMVar metaVersionRef
      outcome <- liftIO $ schemaVersionCheckHandler pool metaVersionRef
      let knownVersion = fromMaybe initialResourceVersion pending
      either (onFailure prevState knownVersion) (const (onSuccess prevState)) outcome
        <* liftIO (C.sleep (milliseconds interval))

    -- The fetch failed: log and remember the error, unless 'toLogError' says
    -- it should be suppressed, in which case the state is left untouched.
    onFailure prevState knownVersion pollErr
      | toLogError prevState pollErr knownVersion = do
          logError logger TTListener $ TEQueryError pollErr
          logInfo logger TTListener $ object ["metadataResourceVersion" .= toJSON knownVersion]
          pure $ updateErrorInState prevState pollErr knownVersion
      | otherwise = pure prevState

    -- The fetch succeeded: announce recovery if needed and reset the state.
    onSuccess prevState = do
      when (isInErrorState prevState) $
        logInfo logger TTListener $
          object ["message" .= ("SchemaSync Restored..." :: Text)]
      pure defaultErrorState
|
|
|
|
-- | Consume metadata resource versions from the shared 'STM.TMVar', in a loop
-- forever.
--
-- Each iteration blocks until a version is available, takes it out of the
-- 'STM.TMVar', and hands it to 'refreshSchemaCache' tagged as 'TTProcessor'.
processor ::
  forall m void impl.
  ( C.ForkableMonadIO m,
    HasAppEnv m,
    MonadMetadataStorage m,
    MonadResolveSource m,
    ProvidesNetwork m
  ) =>
  STM.TMVar MetadataResourceVersion ->
  AppStateRef impl ->
  STM.TVar Bool ->
  m void
processor metaVersionRef appStateRef logTVar =
  forever $
    liftIO (STM.atomically (STM.takeTMVar metaVersionRef))
      >>= \metaVersion -> refreshSchemaCache metaVersion appStateRef TTProcessor logTVar
|
|
|
|
-- | Bring the local schema cache in line with the given metadata resource
-- version.
--
-- The work runs inside 'withSchemaCacheUpdate' and 'runCacheRWT', and depends
-- on the resource version currently stored in the schema cache:
--
--   * no version stored: only the given version is recorded in the cache;
--   * stored version equal to the given one: nothing is done;
--   * otherwise: the metadata and the metadata notifications are fetched. With
--     no notifications, only the fetched resource version is recorded. With
--     notifications, the cache is rebuilt through 'buildSchemaCacheWithOptions'
--     and then the fetched resource version is recorded.
--
-- A 'QErr' raised along the way is not rethrown; it is logged as a
-- 'TEQueryError'.
refreshSchemaCache ::
  ( MonadIO m,
    MonadBaseControl IO m,
    HasAppEnv m,
    MonadMetadataStorage m,
    MonadResolveSource m,
    ProvidesNetwork m
  ) =>
  MetadataResourceVersion ->
  AppStateRef impl ->
  SchemaSyncThreadType ->
  STM.TVar Bool ->
  m ()
refreshSchemaCache resourceVersion appStateRef threadType logTVar = do
  appEnv@AppEnv {..} <- askAppEnv
  let logger = _lsLogger appEnvLoggers
      -- Info-level log of a plain text message for this thread type.
      say txt = logInfo logger threadType (String txt)
  outcome <- runExceptT $
    withSchemaCacheUpdate appStateRef logger (Just logTVar) $ do
      currentCache <- liftIO $ fst <$> readSchemaCacheRef appStateRef
      appContext <- liftIO $ getAppContext appStateRef
      let serverConfigCtx = buildServerConfigCtx appEnv appContext
      (result, updatedCache, _) <-
        runCacheRWT serverConfigCtx currentCache $ do
          schemaCache <- askSchemaCache
          case scMetadataResourceVersion schemaCache of
            -- While starting up, the metadata resource version is set to nothing, so we want to
            -- set the version without fetching the database metadata (it has already been
            -- fetched during startup, so we skip fetching it twice)
            Nothing -> do
              setMetadataResourceVersionInSchemaCache resourceVersion
              say $
                T.unwords
                  [ "Received metadata resource version:",
                    showMetadataResourceVersion resourceVersion,
                    "as an initial version. Not updating the schema cache."
                  ]
            Just engineResourceVersion ->
              when (engineResourceVersion /= resourceVersion) $ do
                say $
                  T.unwords
                    [ "Received metadata resource version:",
                      showMetadataResourceVersion resourceVersion <> ",",
                      "different from the current engine resource version:",
                      showMetadataResourceVersion engineResourceVersion <> ".",
                      "Trying to update the schema cache."
                    ]

                (metadata, latestResourceVersion) <- liftEitherM fetchMetadata

                say $
                  T.unwords
                    [ "Fetched metadata with resource version:",
                      showMetadataResourceVersion latestResourceVersion
                    ]

                notifications <- liftEitherM $ fetchMetadataNotifications engineResourceVersion appEnvInstanceId

                if null notifications
                  then do
                    say $
                      T.unwords
                        [ "Fetched metadata notifications and received no notifications. Not updating the schema cache.",
                          "Only setting resource version:",
                          showMetadataResourceVersion latestResourceVersion,
                          "in schema cache"
                        ]
                    setMetadataResourceVersionInSchemaCache latestResourceVersion
                  else do
                    say "Fetched metadata notifications and received some notifications. Updating the schema cache."
                    let nextVersion = engineResourceVersion + 1
                        cacheInvalidations
                          -- If (engineResourceVersion + 1) is in the list of notifications then
                          -- we know that we haven't missed any.
                          | nextVersion `elem` map fst notifications = mconcat (map snd notifications)
                          -- Otherwise we may have missed some notifications so we need to
                          -- invalidate the whole cache.
                          | otherwise = invalidateEverything schemaCache
                    buildSchemaCacheWithOptions CatalogSync cacheInvalidations metadata
                    setMetadataResourceVersionInSchemaCache latestResourceVersion
                    say $
                      "Schema cache updated with resource version: " <> showMetadataResourceVersion latestResourceVersion
      pure (result, updatedCache)
  onLeft outcome (logError logger threadType . TEQueryError)
  where
    -- Invalidations covering the metadata plus every remote schema, source and
    -- data connector listed in the given schema cache.
    invalidateEverything sc =
      CacheInvalidations
        { ciMetadata = True,
          ciRemoteSchemas = HS.fromList $ getAllRemoteSchemas sc,
          ciSources = HS.fromList $ HM.keys $ scSources sc,
          ciDataConnectors =
            maybe mempty (HS.fromList . HM.keys . unBackendInfoWrapper) $
              BackendMap.lookup @'DataConnector $
                scBackendCache sc
        }
|
|
|
|
-- | Emit a 'SchemaSyncLog' at info level for the given thread type, carrying
-- the supplied JSON value.
logInfo :: (MonadIO m) => Logger Hasura -> SchemaSyncThreadType -> Value -> m ()
logInfo logger threadType val = unLogger logger entry
  where
    entry = SchemaSyncLog LevelInfo threadType val
|
|
|
|
-- | Emit a 'SchemaSyncLog' at error level for the given thread type. The
-- error is serialised with 'toJSON' and placed under the @"error"@ key of the
-- logged object.
logError :: (MonadIO m, ToJSON a) => Logger Hasura -> SchemaSyncThreadType -> a -> m ()
logError logger threadType err =
  unLogger logger (SchemaSyncLog LevelError threadType payload)
  where
    payload = object ["error" .= toJSON err]
|