mirror of
https://github.com/hasura/graphql-engine.git
synced 2025-01-05 14:27:59 +03:00
59c01786fe
Add optimistic concurrency control to the ‘replace_metadata’ call. Prevents users from submitting out-of-date metadata to metadata-mutating APIs. See https://github.com/hasura/graphql-engine-mono/issues/472 for details. GitOrigin-RevId: 5f220f347a3eba288a9098b01e9913ffd7e38166
338 lines
12 KiB
Haskell
338 lines
12 KiB
Haskell
{-# LANGUAGE CPP #-}
|
|
module Hasura.Server.SchemaUpdate
|
|
( startSchemaSyncListenerThread
|
|
, startSchemaSyncProcessorThread
|
|
, EventPayload(..)
|
|
, SchemaSyncCtx(..)
|
|
, SchemaSyncEvent(..)
|
|
, SchemaSyncEventRef
|
|
)
|
|
where
|
|
|
|
import Hasura.Logging
|
|
import Hasura.Metadata.Class
|
|
import Hasura.Prelude
|
|
import Hasura.RQL.DDL.Schema (runCacheRWT)
|
|
import Hasura.RQL.Types
|
|
import Hasura.RQL.Types.Run
|
|
import Hasura.Server.App (SchemaCacheRef (..), withSCUpdate)
|
|
import Hasura.Server.Logging
|
|
import Hasura.Server.Types (InstanceId (..))
|
|
import Hasura.Session
|
|
|
|
import Control.Monad.Trans.Control (MonadBaseControl)
|
|
import Control.Monad.Trans.Managed (ManagedT)
|
|
import Data.Aeson
|
|
import Data.Aeson.Casing
|
|
import Data.Aeson.TH
|
|
import Data.IORef
|
|
#ifndef PROFILING
|
|
import GHC.AssertNF
|
|
#endif
|
|
|
|
import qualified Control.Concurrent.Extended as C
|
|
import qualified Control.Concurrent.STM as STM
|
|
import qualified Control.Immortal as Immortal
|
|
import qualified Data.Text as T
|
|
import qualified Data.Time as UTC
|
|
import qualified Database.PG.Query as PG
|
|
import qualified Database.PostgreSQL.LibPQ as PQ
|
|
import qualified Network.HTTP.Client as HTTP
|
|
|
|
-- | The Postgres notification channel on which schema-update events are
-- exchanged. See Note [Schema Cache Sync].
pgChannel :: PG.PGChannel
pgChannel = "hasura_schema_update"
|
|
|
|
-- | The two threads involved in schema syncing; used to tag log entries
-- with the thread that produced them.
data ThreadType
  = TTListener
  | TTProcessor
  deriving (Eq)

-- | Renders the lower-case thread name used in log output.
instance Show ThreadType where
  show threadType = case threadType of
    TTListener  -> "listener"
    TTProcessor -> "processor"
|
|
|
|
|
|
-- | A log entry emitted by one of the schema-sync threads.
data SchemaSyncThreadLog
  = SchemaSyncThreadLog
  { suelLogLevel   :: !LogLevel   -- ^ level handed to the engine logger
  , suelThreadType :: !ThreadType -- ^ thread that produced the entry
  , suelInfo       :: !Value      -- ^ JSON details of the entry
  } deriving (Show, Eq)
|
|
|
|
-- | Serialises the thread type and the info value. The log level is not part
-- of the JSON body.
instance ToJSON SchemaSyncThreadLog where
  toJSON SchemaSyncThreadLog{suelThreadType = threadType, suelInfo = info} =
    object
      [ "thread_type" .= show threadType
      , "info" .= info
      ]
|
|
|
|
-- | Reports the entry as an internal schema-sync-thread log at the level
-- recorded in the entry itself.
instance ToEngineLog SchemaSyncThreadLog Hasura where
  toEngineLog threadLog = (level, engineLogType, body)
    where
      level         = suelLogLevel threadLog
      engineLogType = ELTInternal ILTSchemaSyncThread
      body          = toJSON threadLog
|
|
|
|
-- | Payload of a schema-update notification.
-- See Note [Schema Cache Sync].
data EventPayload
  = EventPayload
  { _epInstanceId    :: !InstanceId
  -- ^ id of a graphql-engine instance (presumably the publisher — confirm at the notify site)
  , _epOccurredAt    :: !UTC.UTCTime
  -- ^ timestamp attached to the event
  , _epInvalidations :: !CacheInvalidations
  -- ^ cache invalidations carried along with the event
  }
-- JSON instances are derived from the field names above, so renaming a field
-- changes the wire format.
$(deriveJSON hasuraJSON ''EventPayload)
|
|
|
|
-- | An event handed from the listener thread to the processor thread.
data SchemaSyncEvent
  = SSEListenStart !UTC.UTCTime
  -- ^ listening on the channel started at the given time
  | SSEPayload !Value
  -- ^ decoded JSON payload of a received notification
|
|
|
|
-- | A listen-start event becomes a descriptive string; a payload event is
-- emitted as the JSON value it wraps.
instance ToJSON SchemaSyncEvent where
  toJSON (SSEListenStart startedAt) =
    String ("event listening started at " <> tshow startedAt)
  toJSON (SSEPayload payload) =
    toJSON payload
|
|
|
|
-- | Errors that the schema-sync threads report through the logger.
data ThreadError
  = TEPayloadParse !Text
  -- ^ an event payload could not be parsed
  | TEQueryError !QErr
  -- ^ a query error was returned
-- The JSON tag is the constructor name with its two-character prefix dropped
-- and snake-cased; it is stored under "type" and the contents under "info".
$(deriveToJSON
    defaultOptions
      { constructorTagModifier = snakeCase . drop 2
      , sumEncoding = TaggedObject "type" "info"
      }
    ''ThreadError)
|
|
|
|
-- | Shared slot through which the listener hands events to the processor.
-- It holds at most one event: writing a new event replaces whatever was there.
type SchemaSyncEventRef = STM.TVar (Maybe SchemaSyncEvent)
|
|
|
|
-- | Everything schema syncing needs: the listener thread, the shared event
-- reference and the time at which schema cache initialisation started.
data SchemaSyncCtx
  = SchemaSyncCtx
  { _sscListenerThreadId   :: !Immortal.Thread
  -- ^ handle of the listener thread
  , _sscSyncEventRef       :: !SchemaSyncEventRef
  -- ^ slot holding the latest unprocessed event
  , _sscCacheInitStartTime :: !UTC.UTCTime
  -- ^ when schema cache initialisation started
  }
|
|
|
|
-- | Writes an info-level startup log saying that the given schema-sync thread
-- has started, including the instance id and the thread id.
logThreadStarted
  :: (MonadIO m)
  => Logger Hasura -> InstanceId -> ThreadType -> Immortal.Thread -> m ()
logThreadStarted logger instanceId threadType thread =
  unLogger logger $ StartupLog LevelInfo "schema-sync" startupInfo
  where
    startupInfo =
      object
        [ "instance_id" .= getInstanceId instanceId
        , "thread_id"   .= show (Immortal.threadId thread)
        , "message"     .= (tshow threadType <> " thread started")
        ]
|
|
|
|
{- Note [Schema Cache Sync]
|
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
|
|
|
When multiple graphql-engine instances are serving on same metadata storage,
|
|
each instance should have schema cache in sync with latest metadata. Somehow
|
|
all instances should communicate with each other when any request has modified metadata.
|
|
We make use of Postgres listen-notify to notify metadata updates and listen to those
|
|
events. Each instance is identified by an @'InstanceId'. We choose 'hasura_schema_update'
|
|
as our notify channel. Following steps take place when an API request made to update
|
|
metadata:
|
|
|
|
1. After handling the request we publish an event via a postgres channel with payload
|
|
containing instance id and other information to build schema cache.
|
|
See @'notifySchemaCacheSync'.
|
|
|
|
2. On start up, before initialising schema cache, an async thread is invoked to
|
|
continuously listen to Postgres events on 'hasura_schema_update' channel. The payload
|
|
present in the event is decoded and pushed to a shared @'SchemaSyncEventRef' reference.
|
|
See @'startSchemaSyncListenerThread'.
|
|
|
|
3. Before starting API server, another async thread is invoked to process events pushed
|
|
by the listener thread via @'SchemaSyncEventRef' reference. Based on the event instance id
|
|
or listen start time we decide to reload schema cache or not.
|
|
See @'startSchemaSyncProcessorThread'.
|
|
|
|
Why do we need two threads when we could capture events and reload the schema cache in a single thread?
|
|
|
|
If we want to implement schema sync in a single async thread we have to invoke the same
|
|
after initialising schema cache. We may lose events that are published after schema cache
|
|
init and before invoking the thread. In such case, schema cache is not in sync with metadata.
|
|
So we choose two threads in which one will start listening before schema cache init and the
|
|
other after it.
|
|
|
|
What happens if listen connection to Postgres is lost?
|
|
|
|
The listener thread keeps retrying to establish a connection to Postgres once every second.
|
|
Once connection established, it pushes @'SSEListenStart' event with time. We aren't sure
|
|
about any metadata modify requests made in meanwhile. So we reload schema cache unconditionally
|
|
if listen started after schema cache init start time.
|
|
|
|
-}
|
|
|
|
-- | Forks the thread that listens to Postgres notifications for schema
-- syncing, and returns its handle together with the reference it writes
-- events to.
-- See Note [Schema Cache Sync]
startSchemaSyncListenerThread
  :: C.ForkableMonadIO m
  => PG.PGPool
  -> Logger Hasura
  -> InstanceId
  -> ManagedT m (Immortal.Thread, SchemaSyncEventRef)
startSchemaSyncListenerThread pool logger instanceId = do
  -- A single slot, initially empty: only the latest event is kept, we do not
  -- store and process every event.
  eventRef <- liftIO (STM.newTVarIO Nothing)

  -- NOTE(review): the label reads "SchemeUpdate" (sic); it is a runtime
  -- string and is deliberately left untouched here.
  thread <- C.forkManagedT "SchemeUpdate.listener" logger $
    liftIO (listener pool logger eventRef)
  logThreadStarted logger instanceId TTListener thread
  pure (thread, eventRef)
|
|
|
|
-- | Forks the thread that processes the schema sync events found in the
-- given reference, and returns its handle.
-- See Note [Schema Cache Sync]
startSchemaSyncProcessorThread
  :: ( C.ForkableMonadIO m
     , MonadMetadataStorage (MetadataStorageT m)
     , MonadResolveSource m
     )
  => Logger Hasura
  -> HTTP.Manager
  -> SchemaSyncEventRef
  -> SchemaCacheRef
  -> InstanceId
  -> UTC.UTCTime
  -> ServerConfigCtx
  -> ManagedT m Immortal.Thread
startSchemaSyncProcessorThread logger httpMgr
  schemaSyncEventRef cacheRef instanceId cacheInitStartTime serverConfigCtx = do
  -- NOTE(review): the label reads "SchemeUpdate" (sic); it is a runtime
  -- string and is deliberately left untouched here.
  thread <- C.forkManagedT "SchemeUpdate.processor" logger runProcessor
  logThreadStarted logger instanceId TTProcessor thread
  pure thread
  where
    runProcessor =
      processor logger httpMgr schemaSyncEventRef
        cacheRef instanceId cacheInitStartTime serverConfigCtx
|
|
|
|
-- | An IO action that listens to Postgres on 'pgChannel' and writes each
-- event into the shared reference, in a loop forever. The reference is a
-- single slot, so a new event replaces one that has not been picked up yet.
listener
  :: PG.PGPool
  -> Logger Hasura
  -> SchemaSyncEventRef
  -> IO void
listener pool logger updateEventRef =
  -- Never exits
  forever $ do
    listenResult <- runExceptT $ PG.listen pool pgChannel handleNotifyEvent
    onLeft listenResult reportQueryError
    warnAboutRetry
    -- Trying to start listening after one second.
    -- See Note [Schema Cache Sync].
    C.sleep $ seconds 1
  where
    threadType = TTListener

    -- Overwrite the shared slot with the given event.
    publish = STM.atomically . STM.writeTVar updateEventRef . Just

    handleNotifyEvent notifyEvent = case notifyEvent of
      PG.PNEOnStart -> do
        startedAt <- UTC.getCurrentTime
        publish $ SSEListenStart startedAt
      PG.PNEPQNotify notification ->
        case eitherDecodeStrict (PQ.notifyExtra notification) of
          Left parseErr ->
            logError logger threadType $ TEPayloadParse $ T.pack parseErr
          Right payload -> do
            logInfo logger threadType $ object ["received_event" .= payload]
#ifndef PROFILING
            $assertNFHere payload  -- so we don't write thunks to mutable vars
#endif
            publish $ SSEPayload payload

    reportQueryError = logError logger threadType . TEQueryError

    -- NOTE: we handle expected error conditions here, while unexpected exceptions will result in
    -- a restart and log from 'forkImmortal'
    warnAboutRetry = unLogger logger $
      SchemaSyncThreadLog LevelWarn threadType $ String
        "error occurred, retrying postgres listen after 1 second"
|
|
|
|
|
|
-- | An action that takes events out of the shared reference and reloads the
-- schema cache when an event calls for it, in a loop forever.
processor
  :: forall m void.
     ( C.ForkableMonadIO m
     , MonadMetadataStorage (MetadataStorageT m)
     , MonadResolveSource m
     )
  => Logger Hasura
  -> HTTP.Manager
  -> SchemaSyncEventRef
  -> SchemaCacheRef
  -> InstanceId
  -> UTC.UTCTime
  -> ServerConfigCtx
  -> m void
processor logger httpMgr updateEventRef
  cacheRef instanceId cacheInitStartTime serverConfigCtx =
  -- Never exits
  forever $ do
    event <- liftIO $ STM.atomically takeLatestEvent
    logInfo logger threadType $ object ["processed_event" .= event]
    (shouldReload, cacheInvalidations) <- case event of
      -- If listening started after cache initialization, just refresh the
      -- schema cache unconditionally. See Note [Schema Cache Sync]
      SSEListenStart listenStartedAt ->
        pure (listenStartedAt > cacheInitStartTime, mempty)
      SSEPayload payload ->
        runMetadataStorageT (processSchemaSyncEventPayload instanceId payload) >>= \case
          Left err -> do
            -- A failure here is logged and the event is skipped without a reload.
            logError logger threadType $ TEPayloadParse $ qeError err
            pure (False, mempty)
          Right SchemaSyncEventProcessResult
                  { _sseprShouldReload       = reload
                  , _sseprCacheInvalidations = invalidations
                  } ->
            pure (reload, invalidations)

    when shouldReload $
      refreshSchemaCache logger httpMgr cacheRef cacheInvalidations
        threadType serverConfigCtx "schema cache reloaded"
  where
    threadType = TTProcessor

    -- Blocks (via 'STM.retry') until the slot holds an event, then empties
    -- the slot and returns that event.
    takeLatestEvent =
      STM.readTVar updateEventRef >>= \case
        Nothing          -> STM.retry
        Just latestEvent -> latestEvent <$ STM.writeTVar updateEventRef Nothing
|
|
|
|
-- | Fetches the metadata, rebuilds the schema cache from it with the given
-- invalidations, and stores the result through 'withSCUpdate'. A failure is
-- logged as an error; on success the supplied message is logged at info level.
refreshSchemaCache
  :: ( MonadIO m
     , MonadBaseControl IO m
     , MonadMetadataStorage (MetadataStorageT m)
     , MonadResolveSource m
     )
  => Logger Hasura
  -> HTTP.Manager
  -> SchemaCacheRef
  -> CacheInvalidations
  -> ThreadType
  -> ServerConfigCtx
  -> Text
  -> m ()
refreshSchemaCache logger httpManager
  cacheRef invalidations threadType serverConfigCtx msg = do
  -- Reload schema cache from catalog
  fetchedMetadata <- runMetadataStorageT fetchMetadata
  rebuildResult <- runExceptT $ do
    -- Only the first component of the fetched pair is used.
    (metadata, _) <- liftEither fetchedMetadata
    withSCUpdate cacheRef logger $ do
      currentCache <- fst <$> liftIO (readIORef $ _scrCache cacheRef)
      ((), rebuiltCache, _) <-
        peelRun runCtx $
          runCacheRWT currentCache $
            buildSchemaCacheWithOptions CatalogSync invalidations metadata
      pure ((), rebuiltCache)
  case rebuildResult of
    Left err -> logError logger threadType $ TEQueryError err
    Right () -> logInfo logger threadType $ object ["message" .= msg]
  where
    -- The rebuild runs with the admin user info.
    runCtx = RunCtx adminUserInfo httpManager serverConfigCtx
|
|
|
|
-- | Logs the given JSON value at info level on behalf of a schema-sync thread.
logInfo :: (MonadIO m) => Logger Hasura -> ThreadType -> Value -> m ()
logInfo logger threadType =
  unLogger logger . SchemaSyncThreadLog LevelInfo threadType
|
|
|
|
-- | Logs the given error at error level on behalf of a schema-sync thread.
-- The error's JSON encoding is placed under the "error" key.
logError :: (MonadIO m, ToJSON a) => Logger Hasura -> ThreadType -> a -> m ()
logError logger threadType err = unLogger logger errorLog
  where
    errorLog  = SchemaSyncThreadLog LevelError threadType errorInfo
    errorInfo = object ["error" .= toJSON err]
|