mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-17 12:31:52 +03:00
d52bfcda4e
* move user info related code to Hasura.User module
* the RFC #4120 implementation; insert permissions with admin secret
* revert back to old RoleName based schema maps
An attempt made to avoid duplication of schema contexts in types
if any role doesn't possess any admin secret specific schema
* fix compile errors in haskell test
* keep 'user_vars' for session variables in http-logs
* no-op refactor
* tests for admin only inserts
* update docs for admin only inserts
* updated CHANGELOG.md
* default behaviour when admin secret is not set
* fix x-hasura-role to X-Hasura-Role in pytests
* introduce effective timeout in actions async tests
* update docs for admin-secret not configured case
* Update docs/graphql/manual/api-reference/schema-metadata-api/permission.rst
Co-Authored-By: Marion Schleifer <marion@hasura.io>
* Apply suggestions from code review
Co-Authored-By: Marion Schleifer <marion@hasura.io>
* a complete iteration
backend insert permissions accessible via 'x-hasura-backend-privilege'
session variable
* console changes for backend-only permissions
* provide tooltip id; update labels and tooltips;
* requested changes
* requested changes
- remove className from Toggle component
- use appropriate function name (capitalizeFirstChar -> capitalize)
* use toggle props from definitelyTyped
* fix accidental commit
* Revert "introduce effective timeout in actions async tests"
This reverts commit b7a59c19d6
.
* generate complete schema for both 'default' and 'backend' sessions
* Apply suggestions from code review
Co-Authored-By: Marion Schleifer <marion@hasura.io>
* remove unnecessary import, export Toggle as is
* update session variable in tooltip
* 'x-hasura-use-backend-only-permissions' variable to switch
* update help texts
* update docs
* update docs
* update console help text
* regenerate package-lock
* serve no backend schema when backend_only: false and header set to true
- Few type name refactor as suggested by @0x777
* update CHANGELOG.md
* Update CHANGELOG.md
* Update CHANGELOG.md
* fix a merge bug where a certain entity didn't get removed
Co-authored-by: Marion Schleifer <marion@hasura.io>
Co-authored-by: Rishichandra Wawhal <rishi@hasura.io>
Co-authored-by: rikinsk <rikin.kachhia@gmail.com>
Co-authored-by: Tirumarai Selvan <tiru@hasura.io>
241 lines
7.9 KiB
Haskell
241 lines
7.9 KiB
Haskell
module Hasura.Server.SchemaUpdate
|
|
(startSchemaSyncThreads)
|
|
where
|
|
|
|
import Hasura.Prelude
|
|
import Hasura.Session
|
|
|
|
import Hasura.Logging
|
|
import Hasura.RQL.DDL.Schema (runCacheRWT)
|
|
import Hasura.RQL.Types
|
|
import Hasura.RQL.Types.Run
|
|
import Hasura.Server.API.Query
|
|
import Hasura.Server.App (SchemaCacheRef (..), withSCUpdate)
|
|
import Hasura.Server.Init (InstanceId (..))
|
|
import Hasura.Server.Logging
|
|
|
|
import Data.Aeson
|
|
import Data.Aeson.Casing
|
|
import Data.Aeson.TH
|
|
import Data.IORef
|
|
import GHC.AssertNF
|
|
|
|
import qualified Control.Concurrent.Extended as C
|
|
import qualified Control.Concurrent.STM as STM
|
|
import qualified Control.Immortal as Immortal
|
|
import qualified Data.Text as T
|
|
import qualified Data.Time as UTC
|
|
import qualified Database.PG.Query as PG
|
|
import qualified Database.PostgreSQL.LibPQ as PQ
|
|
import qualified Network.HTTP.Client as HTTP
|
|
|
|
-- | Name of the Postgres notification channel that the 'listener' thread
-- subscribes to via 'PG.listen'.
pgChannel :: PG.PGChannel
pgChannel = "hasura_schema_update"
|
|
|
|
-- | Identifies which of the two schema-sync threads a log entry or message
-- originates from.
data ThreadType
  = TTListener   -- ^ rendered as @"listener"@
  | TTProcessor  -- ^ rendered as @"processor"@
  deriving (Eq)

-- | Renders the lowercase thread name; this text is what ends up in the
-- @thread_type@ log field and in the "thread started" startup message.
instance Show ThreadType where
  show threadType = case threadType of
    TTListener  -> "listener"
    TTProcessor -> "processor"
|
|
|
|
|
|
-- | A log entry emitted by one of the schema-sync threads.
data SchemaSyncThreadLog
  = SchemaSyncThreadLog
  { suelLogLevel   :: !LogLevel
    -- ^ severity; reported through 'toEngineLog', not part of the JSON body
  , suelThreadType :: !ThreadType
    -- ^ the thread that produced the entry
  , suelInfo       :: !Value
    -- ^ arbitrary JSON payload describing what happened
  } deriving (Show, Eq)
|
|
|
|
-- | Serialises the thread name and the payload. The log level is
-- deliberately left out of the JSON body.
instance ToJSON SchemaSyncThreadLog where
  toJSON entry =
    object
      [ "thread_type" .= show (suelThreadType entry)
      , "info" .= suelInfo entry
      ]
|
|
|
|
-- | Schema-sync entries are reported as internal engine logs of type
-- 'ILTSchemaSyncThread', at the level carried by the entry itself.
instance ToEngineLog SchemaSyncThreadLog Hasura where
  toEngineLog entry = (level, logType, body)
    where
      level   = suelLogLevel entry
      logType = ELTInternal ILTSchemaSyncThread
      body    = toJSON entry
|
|
|
|
-- | The JSON payload carried by a notification on 'pgChannel'.
--
-- The JSON instances are derived with the first three characters of each
-- field name dropped and the remainder snake-cased (e.g. @_epInstanceId@
-- becomes @instance_id@).
data EventPayload
  = EventPayload
  { _epInstanceId    :: !InstanceId
    -- ^ instance recorded in the event; compared against our own 'InstanceId'
  , _epOccurredAt    :: !UTC.UTCTime
    -- ^ timestamp recorded in the event
  , _epInvalidations :: !CacheInvalidations
    -- ^ invalidations handed to the schema cache rebuild
  }

$(deriveJSON (aesonDrop 3 snakeCase) ''EventPayload)
|
|
|
|
-- | Expected error conditions that the schema-sync threads log.
--
-- Serialised as a tagged object: the constructor name, minus its two-letter
-- prefix and snake-cased, goes under @"type"@ and the contents under @"info"@.
data ThreadError
  = TEJsonParse !T.Text
    -- ^ a notification payload could not be decoded as JSON
  | TEQueryError !QErr
    -- ^ a database operation or schema cache rebuild failed

$(deriveToJSON
    defaultOptions
      { constructorTagModifier = snakeCase . drop 2
      , sumEncoding = TaggedObject "type" "info"
      }
    ''ThreadError)
|
|
|
|
-- | Fork the two immortal threads that keep the schema cache in sync: a
-- 'listener' and a 'processor'. A startup log line is emitted for each thread
-- once it has been forked.
startSchemaSyncThreads
  :: (MonadIO m)
  => SQLGenCtx
  -> PG.PGPool
  -> Logger Hasura
  -> HTTP.Manager
  -> SchemaCacheRef
  -> InstanceId
  -> Maybe UTC.UTCTime
  -> m (Immortal.Thread, Immortal.Thread)
  -- ^ Returns: (listener handle, processor handle)
startSchemaSyncThreads sqlGenCtx pool logger httpMgr cacheRef instanceId cacheInitTime = do
  -- A single slot shared by both threads. Every new event overwrites the
  -- previous one, so only the most recent event is ever kept for processing.
  latestEventVar <- liftIO $ STM.newTVarIO Nothing

  -- Listener: receives notifications and writes them into the slot.
  listenerThread <- liftIO $ C.forkImmortal "SchemeUpdate.listener" logger $
    listener sqlGenCtx pool logger httpMgr latestEventVar cacheRef instanceId cacheInitTime
  logThreadStarted TTListener listenerThread

  -- Processor: takes events out of the slot and acts on them.
  processorThread <- liftIO $ C.forkImmortal "SchemeUpdate.processor" logger $
    processor sqlGenCtx pool logger httpMgr latestEventVar cacheRef instanceId
  logThreadStarted TTProcessor processorThread

  pure (listenerThread, processorThread)
  where
    -- Emit a startup log entry carrying our instance id and the thread id.
    logThreadStarted threadType thread =
      unLogger logger $ StartupLog LevelInfo "schema-sync" details
      where
        startedMsg = T.pack (show threadType) <> " thread started"
        details =
          object
            [ "instance_id" .= getInstanceId instanceId
            , "thread_id" .= show (Immortal.threadId thread)
            , "message" .= startedMsg
            ]
|
|
|
|
-- | Listens on 'pgChannel' in an endless loop. Every successfully decoded
-- notification is written into the shared 'STM.TVar', replacing whatever
-- event was stored there before.
listener
  :: SQLGenCtx
  -> PG.PGPool
  -> Logger Hasura
  -> HTTP.Manager
  -> STM.TVar (Maybe EventPayload)
  -> SchemaCacheRef
  -> InstanceId
  -> Maybe UTC.UTCTime
  -> IO void
listener sqlGenCtx pool logger httpMgr updateEventRef cacheRef instanceId cacheInitTime =
  -- Never exits: whenever 'PG.listen' returns we log and start over.
  forever $ do
    listenOutcome <- runExceptT $ PG.listen pool pgChannel notifyHandler
    case listenOutcome of
      Left queryErr -> onError queryErr
      Right done    -> pure done
    logWarn
    C.sleep (seconds 1)
  where
    threadType = TTListener

    -- NOTE: we handle expected error conditions here, while unexpected
    -- exceptions will result in a restart and log from 'forkImmortal'
    onError queryErr = logError logger threadType (TEQueryError queryErr)

    logWarn =
      unLogger logger $
        SchemaSyncThreadLog LevelWarn threadType $
          String "error occurred, retrying postgres listen after 1 second"

    -- Without a cache init time we always refresh; otherwise only when the
    -- last update came from a different instance and is newer than that time.
    shouldRefresh dbInstId occurredAt =
      maybe
        True
        (\initTime -> dbInstId /= instanceId && occurredAt > initTime)
        cacheInitTime

    -- Rebuild the cache for the last recorded update, if there is one and it
    -- passes 'shouldRefresh'.
    refreshCache mLastUpdate = case mLastUpdate of
      Nothing -> pure ()
      Just (dbInstId, occurredAt, invalidations) ->
        when (shouldRefresh dbInstId occurredAt) $
          refreshSchemaCache sqlGenCtx pool logger httpMgr cacheRef invalidations
            threadType "schema cache reloaded after postgres listen init"

    notifyHandler notifyEvent = case notifyEvent of
      -- On (re)start of the listen: look up the last recorded update and
      -- refresh the cache if needed.
      PG.PNEOnStart -> do
        lastUpdateRes <-
          runExceptT $
            PG.runTx pool (PG.Serializable, Nothing) fetchLastUpdate
        either onError refreshCache lastUpdateRes

      -- A notification arrived: decode its payload and store it.
      PG.PNEPQNotify notif ->
        case eitherDecodeStrict (PQ.notifyExtra notif) of
          Left parseErr ->
            logError logger threadType $ TEJsonParse (T.pack parseErr)
          Right payload -> do
            logInfo logger threadType $ object ["received_event" .= payload]
            $assertNFHere payload -- so we don't write thunks to mutable vars
            -- Overwrite the slot with the newest event
            STM.atomically $ STM.writeTVar updateEventRef (Just payload)
|
|
|
|
|
|
-- | Waits, in an endless loop, for an event to appear in the shared
-- 'STM.TVar', takes it out, logs it, and rebuilds the schema cache when the
-- event was produced by a different instance.
processor
  :: SQLGenCtx
  -> PG.PGPool
  -> Logger Hasura
  -> HTTP.Manager
  -> STM.TVar (Maybe EventPayload)
  -> SchemaCacheRef
  -> InstanceId
  -> IO void
processor sqlGenCtx pool logger httpMgr updateEventRef cacheRef instanceId =
  -- Never exits
  forever $ do
    payload <- STM.atomically takeLatestEvent
    logInfo logger threadType $ object ["processed_event" .= payload]
    when (fromOtherInstance payload) $
      refreshSchemaCache sqlGenCtx pool logger httpMgr cacheRef (_epInvalidations payload)
        threadType "schema cache reloaded"
  where
    threadType = TTProcessor

    -- Retries the transaction while the slot is empty; once an event is
    -- present, clears the slot and hands the event back.
    takeLatestEvent =
      STM.readTVar updateEventRef
        >>= maybe STM.retry (\found -> found <$ STM.writeTVar updateEventRef Nothing)

    -- Events carrying our own instance id do not trigger a reload
    fromOtherInstance event = _epInstanceId event /= instanceId
|
|
|
|
-- | Rebuild the schema cache with the given invalidations and store the
-- result through 'withSCUpdate'. Failure is logged as a 'TEQueryError';
-- success is logged with the supplied message. Both are attributed to the
-- given thread.
refreshSchemaCache
  :: SQLGenCtx
  -> PG.PGPool
  -> Logger Hasura
  -> HTTP.Manager
  -> SchemaCacheRef
  -> CacheInvalidations
  -> ThreadType
  -> T.Text
  -> IO ()
refreshSchemaCache sqlGenCtx pool logger httpManager cacheRef invalidations threadType msg = do
  -- Reload schema cache from catalog
  rebuildRes <- runExceptT $ withSCUpdate cacheRef logger do
    (currentCache, _) <- liftIO $ readIORef (_scrCache cacheRef)
    ((), newCache, _) <-
      peelRun runCtx pgCtx PG.ReadWrite $
        runCacheRWT currentCache $
          buildSchemaCacheWithOptions CatalogSync invalidations
    pure ((), newCache)
  case rebuildRes of
    Right () -> logInfo logger threadType $ object ["message" .= msg]
    Left err -> logError logger threadType $ TEQueryError err
  where
    -- The rebuild runs with admin user info ...
    runCtx = RunCtx adminUserInfo httpManager sqlGenCtx
    -- ... against the given pool at serializable isolation.
    pgCtx = PGExecCtx pool PG.Serializable
|
|
|
|
-- | Emit an info-level 'SchemaSyncThreadLog' for the given thread with the
-- given JSON payload.
logInfo :: Logger Hasura -> ThreadType -> Value -> IO ()
logInfo logger threadType payload =
  unLogger logger (SchemaSyncThreadLog LevelInfo threadType payload)
|
|
|
|
-- | Emit an error-level 'SchemaSyncThreadLog' for the given thread. The error
-- is serialised and placed under the @"error"@ key of the payload.
logError :: ToJSON a => Logger Hasura -> ThreadType -> a -> IO ()
logError logger threadType err =
  unLogger logger (SchemaSyncThreadLog LevelError threadType payload)
  where
    payload = object ["error" .= err]
|