server: core changes for zero-downtime env vars update on cloud

[GS-232]: https://hasurahq.atlassian.net/browse/GS-232?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ

PR-URL: https://github.com/hasura/graphql-engine-mono/pull/7207
Co-authored-by: pranshi06 <85474619+pranshi06@users.noreply.github.com>
Co-authored-by: Rakesh Emmadi <12475069+rakeshkky@users.noreply.github.com>
Co-authored-by: Puru Gupta <32328846+purugupta99@users.noreply.github.com>
Co-authored-by: Naveen Naidu <30195193+Naveenaidu@users.noreply.github.com>
GitOrigin-RevId: 90a771036da5275cd277f3daaf410381955c69de
This commit is contained in:
Anon Ray 2023-03-30 22:01:50 +05:30 committed by hasura-bot
parent bd9f93eaef
commit 5a81eaa9b6
16 changed files with 459 additions and 284 deletions

View File

@ -18,6 +18,9 @@ import GHC.Debug.Stub
import GHC.TypeLits (Symbol)
import Hasura.App
import Hasura.App.State
( AppEnv (..),
Loggers (..),
)
import Hasura.Backends.Postgres.Connection.MonadTx
import Hasura.Backends.Postgres.Connection.Settings
import Hasura.GC qualified as GC

View File

@ -113,6 +113,7 @@ import Hasura.GraphQL.Transport.HTTP
import Hasura.GraphQL.Transport.HTTP.Protocol (toParsed)
import Hasura.GraphQL.Transport.WSServerApp qualified as WS
import Hasura.GraphQL.Transport.WebSocket.Server qualified as WS
import Hasura.GraphQL.Transport.WebSocket.Types (WSServerEnv (..))
import Hasura.Logging
import Hasura.Metadata.Class
import Hasura.PingSources
@ -1006,17 +1007,7 @@ mkHGEServer setupHook appStateRef ekgStore = do
AppEnv {..} <- lift askAppEnv
let Loggers loggerCtx logger _ = appEnvLoggers
wsServerEnv <-
WS.createWSServerEnv
(_lsLogger appEnvLoggers)
appEnvSubscriptionState
appStateRef
appEnvManager
appEnvEnableReadOnlyMode
appEnvWebSocketKeepAlive
appEnvServerMetrics
appEnvPrometheusMetrics
appEnvTraceSamplingPolicy
wsServerEnv <- lift $ WS.createWSServerEnv appStateRef
HasuraApp app actionSubState stopWsServer <-
lift $
@ -1082,11 +1073,11 @@ mkHGEServer setupHook appStateRef ekgStore = do
(scSourcePingConfig <$> getSchemaCache appStateRef)
)
-- start a background thread for telemetry
_telemetryThread <-
if isTelemetryEnabled acEnableTelemetry
then do
lift . unLogger logger $ mkGenericLog @Text LevelInfo "telemetry" telemetryNotice
-- initialise the websocket connection reaper thread
_websocketConnectionReaperThread <-
C.forkManagedT "websocket connection reaper thread" logger $
liftIO $
WS.websocketConnectionReaper getLatestConfigForWSServer getSchemaCache' (_wseServer wsServerEnv)
dbUid <-
getMetadataDbUid `onLeftM` throwErrJExit DatabaseMigrationError
@ -1094,12 +1085,12 @@ mkHGEServer setupHook appStateRef ekgStore = do
liftIO (runExceptT $ PG.runTx appEnvMetadataDbPool (PG.ReadCommitted, Nothing) $ getPgVersion)
`onLeftM` throwErrJExit DatabaseMigrationError
telemetryThread <-
lift . unLogger logger $ mkGenericLog @Text LevelInfo "telemetry" telemetryNotice
-- start a background thread for telemetry
_telemetryThread <-
C.forkManagedT "runTelemetry" logger $
liftIO $
runTelemetry logger appEnvManager (getSchemaCache appStateRef) dbUid appEnvInstanceId pgVersion acExperimentalFeatures
return $ Just telemetryThread
else return Nothing
runTelemetry logger appStateRef dbUid pgVersion
-- forking a dedicated polling thread to dynamically get the latest JWK settings
-- set by the user and update the JWK accordingly. This will help in applying the
@ -1119,6 +1110,12 @@ mkHGEServer setupHook appStateRef ekgStore = do
Right _ -> False
Left err -> qeCode err == ConcurrentUpdate
getLatestConfigForWSServer =
fmap
(\appCtx -> (acAuthMode appCtx, acEnableAllowlist appCtx, acCorsPolicy appCtx))
(getAppContext appStateRef)
getSchemaCache' = getSchemaCache appStateRef
prepareScheduledEvents (Logger logger) = do
liftIO $ logger $ mkGenericLog @Text LevelInfo "scheduled_triggers" "preparing data"
res <- Retry.retrying Retry.retryPolicyDefault isRetryRequired (return unlockAllLockedScheduledEvents)
@ -1250,11 +1247,6 @@ mkHGEServer setupHook appStateRef ekgStore = do
startAsyncActionsPollerThread logger lockedEventsCtx actionSubState = do
AppEnv {..} <- lift askAppEnv
AppContext {..} <- liftIO $ getAppContext appStateRef
-- start a background thread to handle async actions
case acAsyncActionsFetchInterval of
Skip -> pure () -- Don't start the poller thread
Interval (unrefine -> sleepTime) -> do
let label = "asyncActionsProcessor"
asyncActionGracefulShutdownAction =
( liftWithStateless \lowerIO ->
@ -1267,19 +1259,18 @@ mkHGEServer setupHook appStateRef ekgStore = do
)
)
-- start a background thread to handle async actions
void
$ C.forkManagedTWithGracefulShutdown
label
logger
(C.ThreadShutdown asyncActionGracefulShutdownAction)
$ asyncActionsProcessor
-- TODO: puru: send IO hook for acEnvironment
acEnvironment
(acEnvironment <$> getAppContext appStateRef)
logger
(getSchemaCache appStateRef)
(acAsyncActionsFetchInterval <$> getAppContext appStateRef)
(leActionEvents lockedEventsCtx)
appEnvPrometheusMetrics
sleepTime
Nothing
-- start a background thread to handle async action live queries
@ -1289,7 +1280,6 @@ mkHGEServer setupHook appStateRef ekgStore = do
startScheduledEventsPollerThread logger lockedEventsCtx = do
AppEnv {..} <- lift askAppEnv
AppContext {..} <- liftIO $ getAppContext appStateRef
-- prepare scheduled triggers
lift $ prepareScheduledEvents logger
@ -1318,8 +1308,7 @@ mkHGEServer setupHook appStateRef ekgStore = do
logger
(C.ThreadShutdown scheduledEventsGracefulShutdownAction)
$ processScheduledTriggers
-- TODO: puru: send IO hook for acEnvironment
acEnvironment
(acEnvironment <$> getAppContext appStateRef)
logger
scheduledEventsStatsLogger
appEnvManager

View File

@ -12,6 +12,7 @@ module Hasura.App.State
-- * init functions
buildRebuildableAppContext,
rebuildRebuildableAppContext,
initSQLGenCtx,
-- * server config
@ -122,7 +123,7 @@ data AppEnv = AppEnv
appEnvWebSocketKeepAlive :: KeepAliveDelay,
appEnvWebSocketConnectionInitTimeout :: WSConnectionInitTimeout,
appEnvGracefulShutdownTimeout :: Refined NonNegative Seconds,
-- TODO: Move this to `ServerContext`. We are leaving this for now as this cannot be changed directly
-- TODO: Move this to `AppContext`. We are leaving this for now as this cannot be changed directly
-- by the user on the cloud dashboard and will also require a refactor in HasuraPro/App.hs
-- as this thread is initialised there before creating the `AppStateRef`. But eventually we need
-- to do it for the Enterprise version.
@ -204,6 +205,27 @@ buildRebuildableAppContext readerContext serveOptions env = do
let !rebuildableAppContext = RebuildableAppContext appContext initInvalidationKeys (Inc.rebuildRule result)
pure rebuildableAppContext
-- | Function to rebuild the 'AppContext' from a given 'RebuildableAppContext'
-- and a new 'ServeOptions'
rebuildRebuildableAppContext ::
(MonadIO m, MonadError QErr m) =>
(L.Logger L.Hasura, HTTP.Manager) ->
RebuildableAppContext impl ->
ServeOptions impl ->
E.Environment ->
m (RebuildableAppContext impl)
rebuildRebuildableAppContext readerCtx (RebuildableAppContext _ _ rule) serveOptions env = do
let newInvalidationKeys = InvalidationKeys
result <-
liftEitherM $
liftIO $
runExceptT $
flip runReaderT readerCtx $
Inc.build rule (serveOptions, env, newInvalidationKeys)
let appContext = Inc.result result
!newCtx = RebuildableAppContext appContext newInvalidationKeys (Inc.rebuildRule result)
pure newCtx
buildAppContextRule ::
forall arr m impl.
( ArrowChoice arr,

View File

@ -400,7 +400,7 @@ processScheduledTriggers ::
MonadMetadataStorage m,
MonadBaseControl IO m
) =>
Env.Environment ->
IO Env.Environment ->
L.Logger L.Hasura ->
FetchedScheduledEventsStatsLogger ->
HTTP.Manager ->
@ -408,10 +408,11 @@ processScheduledTriggers ::
IO SchemaCache ->
LockedEventsCtx ->
m (Forever m)
processScheduledTriggers env logger statsLogger httpMgr scheduledTriggerMetrics getSC LockedEventsCtx {..} = do
processScheduledTriggers getEnvHook logger statsLogger httpMgr scheduledTriggerMetrics getSC LockedEventsCtx {..} = do
return $
Forever () $
const do
env <- liftIO getEnvHook
getScheduledEventsForDelivery >>= \case
Left e -> logInternalError e
Right (cronEvents, oneOffEvents) -> do

View File

@ -38,6 +38,7 @@ import Data.Set (Set)
import Data.Text.Extended
import Data.Text.NonEmpty
import Database.PG.Query qualified as PG
import Hasura.App.State
import Hasura.Backends.Postgres.Connection.MonadTx
import Hasura.Backends.Postgres.Execute.Prepare
import Hasura.Backends.Postgres.Execute.Types
@ -74,17 +75,18 @@ import Hasura.RQL.Types.Eventing
import Hasura.RQL.Types.Function
import Hasura.RQL.Types.SchemaCache
import Hasura.SQL.Backend
import Hasura.Server.Init.Config (OptionalInterval (..))
import Hasura.Server.Prometheus (PrometheusMetrics (..))
import Hasura.Server.Utils
( mkClientHeadersForward,
mkSetCookieHeaders,
)
import Hasura.Services.Network
import Hasura.Session
import Hasura.Tracing qualified as Tracing
import Language.GraphQL.Draft.Syntax qualified as G
import Network.HTTP.Client.Transformable qualified as HTTP
import Network.Wreq qualified as Wreq
import Refined (unrefine)
import System.Metrics.Prometheus.Counter as Prometheus.Counter
fetchActionLogResponses ::
@ -428,25 +430,32 @@ resolveAsyncActionQuery userInfo annAction =
-- See Note [Async action architecture] above
asyncActionsProcessor ::
forall m.
( MonadIO m,
( HasAppEnv m,
MonadIO m,
MonadBaseControl IO m,
LA.Forall (LA.Pure m),
MonadMetadataStorage m,
ProvidesNetwork m,
Tracing.MonadTrace m
) =>
Env.Environment ->
IO Env.Environment ->
L.Logger L.Hasura ->
IO SchemaCache ->
IO OptionalInterval ->
STM.TVar (Set LockedActionEventId) ->
PrometheusMetrics ->
Milliseconds ->
Maybe GH.GQLQueryText ->
m (Forever m)
asyncActionsProcessor env logger getSCFromRef' lockedActionEvents prometheusMetrics sleepTime gqlQueryText =
asyncActionsProcessor getEnvHook logger getSCFromRef' getFetchInterval lockedActionEvents gqlQueryText =
return $
Forever () $
const $ do
fetchInterval <- liftIO getFetchInterval
case fetchInterval of
-- async actions processor thread is a polling thread, so we sleep
-- for a second in case the fetch interval is not provided and try to
-- get it in the next iteration. If the fetch interval is available,
-- we check for async actions to process.
Skip -> liftIO $ sleep $ seconds 1
Interval sleepTime -> do
actionCache <- scActions <$> liftIO getSCFromRef'
let asyncActions =
Map.filter ((== ActionMutation ActionAsynchronous) . (^. aiDefinition . adType)) actionCache
@ -465,12 +474,11 @@ asyncActionsProcessor env logger getSCFromRef' lockedActionEvents prometheusMetr
-- no events that are in the 'processing' state
saveLockedEvents (map (EventId . actionIdToText . _aliId) asyncInvocations) lockedActionEvents
LA.mapConcurrently_ (callHandler actionCache) asyncInvocations
liftIO $ sleep $ milliseconds sleepTime
liftIO $ sleep $ milliseconds (unrefine sleepTime)
where
callHandler :: ActionCache -> ActionLogItem -> m ()
callHandler actionCache actionLogItem =
Tracing.newTrace Tracing.sampleAlways "async actions processor" do
httpManager <- askHTTPManager
let ActionLogItem
actionId
actionName
@ -490,13 +498,15 @@ asyncActionsProcessor env logger getSCFromRef' lockedActionEvents prometheusMetr
actionContext = ActionContext actionName
metadataRequestTransform = _adRequestTransform definition
metadataResponseTransform = _adResponseTransform definition
eitherRes <-
eitherRes <- do
env <- liftIO getEnvHook
AppEnv {..} <- askAppEnv
runExceptT $
flip runReaderT logger $
callWebhook
env
httpManager
prometheusMetrics
appEnvManager
appEnvPrometheusMetrics
outputType
outputFields
reqHeaders

View File

@ -147,7 +147,7 @@ addLiveQuery ::
PrometheusMetrics ->
SubscriberMetadata ->
SubscriptionsState ->
LiveQueriesOptions ->
IO (LiveQueriesOptions, StreamQueriesOptions) ->
SourceName ->
ParameterizedQueryHash ->
-- | operation name of the query
@ -163,7 +163,7 @@ addLiveQuery
prometheusMetrics
subscriberMetadata
subscriptionState
lqOpts
getSubscriptionOptions
source
parameterizedQueryHash
operationName
@ -175,7 +175,10 @@ addLiveQuery
-- disposable subscriber UUID:
subscriberId <- newSubscriberId
(lqOpts, _) <- getSubscriptionOptions
let !subscriber = Subscriber subscriberId subscriberMetadata requestId operationName onResultAction
SubscriptionsOptions _ refetchInterval = lqOpts
$assertNFHere subscriber -- so we don't write thunks to mutable vars
(pollerMaybe, ()) <-
STM.atomically $
@ -206,7 +209,6 @@ addLiveQuery
pure $ SubscriberDetails handlerId cohortKey subscriberId
where
SubscriptionsState lqMap _ postPollHook _ = subscriptionState
SubscriptionsOptions _ refetchInterval = lqOpts
SubscriptionQueryPlan (ParameterizedSubscriptionQueryPlan role query) sourceConfig cohortId resolvedConnectionTemplate cohortKey _ = plan
handlerId = BackendPollerKey $ AB.mkAnyBackend @b $ PollerKey source role (toTxt query) resolvedConnectionTemplate
@ -233,7 +235,7 @@ addStreamSubscriptionQuery ::
PrometheusMetrics ->
SubscriberMetadata ->
SubscriptionsState ->
StreamQueriesOptions ->
IO (LiveQueriesOptions, StreamQueriesOptions) ->
SourceName ->
ParameterizedQueryHash ->
-- | operation name of the query
@ -251,7 +253,7 @@ addStreamSubscriptionQuery
prometheusMetrics
subscriberMetadata
subscriptionState
streamQOpts
getSubscriptionOptions
source
parameterizedQueryHash
operationName
@ -263,8 +265,10 @@ addStreamSubscriptionQuery
-- disposable subscriber UUID:
subscriberId <- newSubscriberId
(_, streamQOpts) <- getSubscriptionOptions
let !subscriber = Subscriber subscriberId subscriberMetadata requestId operationName onResultAction
SubscriptionsOptions _ refetchInterval = streamQOpts
$assertNFHere subscriber -- so we don't write thunks to mutable vars
(handlerM, cohortCursorTVar) <-
@ -296,7 +300,6 @@ addStreamSubscriptionQuery
pure $ SubscriberDetails handlerId (cohortKey, cohortCursorTVar) subscriberId
where
SubscriptionsState _ streamQueryMap postPollHook _ = subscriptionState
SubscriptionsOptions _ refetchInterval = streamQOpts
SubscriptionQueryPlan (ParameterizedSubscriptionQueryPlan role query) sourceConfig cohortId resolvedConnectionTemplate cohortKey _ = plan
handlerId = BackendPollerKey $ AB.mkAnyBackend @b $ PollerKey source role (toTxt query) resolvedConnectionTemplate

View File

@ -11,12 +11,10 @@ import Control.Exception.Lifted
import Control.Monad.Trans.Control qualified as MC
import Data.Aeson (object, toJSON, (.=))
import Data.ByteString.Char8 qualified as B (pack)
import Data.Environment qualified as Env
import Data.Text (pack)
import Hasura.App.State
import Hasura.GraphQL.Execute qualified as E
import Hasura.GraphQL.Execute.Backend qualified as EB
import Hasura.GraphQL.Execute.Subscription.State qualified as ES
import Hasura.GraphQL.Logging
import Hasura.GraphQL.Transport.HTTP (MonadExecuteQuery)
import Hasura.GraphQL.Transport.Instances ()
@ -29,10 +27,9 @@ import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.Types.SchemaCache
import Hasura.Server.AppStateRef
import Hasura.Server.Auth (AuthMode, UserAuthentication)
import Hasura.Server.Auth (UserAuthentication)
import Hasura.Server.Init.Config
( KeepAliveDelay,
WSConnectionInitTimeout,
( WSConnectionInitTimeout,
)
import Hasura.Server.Limits
import Hasura.Server.Metrics (ServerMetrics (..))
@ -41,10 +38,8 @@ import Hasura.Server.Prometheus
decWebsocketConnections,
incWebsocketConnections,
)
import Hasura.Server.Types (ReadOnlyMode)
import Hasura.Services.Network
import Hasura.Tracing qualified as Tracing
import Network.HTTP.Client qualified as HTTP
import Network.WebSockets qualified as WS
import System.Metrics.Gauge qualified as EKG.Gauge
@ -64,14 +59,12 @@ createWSServerApp ::
ProvidesNetwork m,
Tracing.MonadTrace m
) =>
Env.Environment ->
HashSet (L.EngineLogType L.Hasura) ->
AuthMode ->
WSServerEnv impl ->
WSConnectionInitTimeout ->
-- | aka generalized 'WS.ServerApp'
WS.HasuraServerApp m
-- -- ^ aka generalized 'WS.ServerApp'
createWSServerApp env enabledLogTypes authMode serverEnv connInitTimeout = \ !ipAddress !pendingConn -> do
createWSServerApp enabledLogTypes serverEnv connInitTimeout = \ !ipAddress !pendingConn -> do
let getMetricsConfig = scMetricsConfig <$> getSchemaCache (_wseAppStateRef serverEnv)
WS.createServerApp getMetricsConfig connInitTimeout (_wseServer serverEnv) prometheusMetrics handlers ipAddress pendingConn
where
@ -85,6 +78,7 @@ createWSServerApp env enabledLogTypes authMode serverEnv connInitTimeout = \ !ip
serverMetrics = _wseServerMetrics serverEnv
prometheusMetrics = _wsePrometheusMetrics serverEnv
getAuthMode = acAuthMode <$> getAppContext (_wseAppStateRef serverEnv)
wsActions = mkWSActions logger
-- Mask async exceptions during event processing to help maintain integrity of mutable vars:
@ -96,7 +90,7 @@ createWSServerApp env enabledLogTypes authMode serverEnv connInitTimeout = \ !ip
onMessageHandler conn bs sp =
mask_ $
onMessage env enabledLogTypes authMode serverEnv conn bs (wsActions sp)
onMessage enabledLogTypes getAuthMode serverEnv conn bs (wsActions sp)
onCloseHandler conn = mask_ do
liftIO $ EKG.Gauge.dec $ smWebsocketConnections serverMetrics
@ -107,46 +101,35 @@ stopWSServerApp :: WSServerEnv impl -> IO ()
stopWSServerApp wsEnv = WS.shutdown (_wseServer wsEnv)
createWSServerEnv ::
(MonadIO m) =>
L.Logger L.Hasura ->
ES.SubscriptionsState ->
( HasAppEnv m,
MonadIO m
) =>
AppStateRef impl ->
HTTP.Manager ->
ReadOnlyMode ->
KeepAliveDelay ->
ServerMetrics ->
PrometheusMetrics ->
Tracing.SamplingPolicy ->
m (WSServerEnv impl)
createWSServerEnv
logger
lqState
appStateRef
httpManager
readOnlyMode
keepAliveDelay
serverMetrics
prometheusMetrics
traceSamplingPolicy = do
AppContext {..} <- liftIO $ getAppContext appStateRef
wsServer <- liftIO $ STM.atomically $ WS.createWSServer logger
createWSServerEnv appStateRef = do
AppEnv {..} <- askAppEnv
let getCorsPolicy = acCorsPolicy <$> getAppContext appStateRef
logger = _lsLogger appEnvLoggers
AppContext {acEnableAllowlist, acAuthMode} <- liftIO $ getAppContext appStateRef
allowlist <- liftIO $ scAllowlist <$> getSchemaCache appStateRef
corsPolicy <- liftIO getCorsPolicy
wsServer <- liftIO $ STM.atomically $ WS.createWSServer acAuthMode acEnableAllowlist allowlist corsPolicy logger
pure $
WSServerEnv
logger
lqState
acLiveQueryOptions
acStreamQueryOptions
(_lsLogger appEnvLoggers)
appEnvSubscriptionState
appStateRef
httpManager
acCorsPolicy
acSQLGenCtx
readOnlyMode
appEnvManager
getCorsPolicy
appEnvEnableReadOnlyMode
wsServer
acEnableAllowlist
keepAliveDelay
serverMetrics
prometheusMetrics
traceSamplingPolicy
appEnvWebSocketKeepAlive
appEnvServerMetrics
appEnvPrometheusMetrics
appEnvTraceSamplingPolicy
mkWSActions :: L.Logger L.Hasura -> WSSubProtocol -> WS.WSActions WSConnData
mkWSActions logger subProtocol =

View File

@ -31,7 +31,6 @@ import Data.ByteString (ByteString)
import Data.ByteString.Lazy qualified as LBS
import Data.CaseInsensitive qualified as CI
import Data.Dependent.Map qualified as DM
import Data.Environment qualified as Env
import Data.HashMap.Strict qualified as Map
import Data.HashMap.Strict.InsOrd qualified as OMap
import Data.HashSet qualified as Set
@ -42,6 +41,7 @@ import Data.Text.Encoding qualified as TE
import Data.Time.Clock qualified as TC
import Data.Word (Word16)
import GHC.AssertNF.CPP
import Hasura.App.State
import Hasura.Backends.Postgres.Instances.Transport (runPGMutationTransaction)
import Hasura.Base.Error
import Hasura.EncJSON
@ -360,7 +360,7 @@ onConn wsId requestHead ipAddress onConnHActions = do
enforceCors origin reqHdrs = do
(L.Logger logger) <- asks _wseLogger
corsPolicy <- asks _wseCorsPolicy
corsPolicy <- liftIO =<< asks _wseCorsPolicy
case cpConfig corsPolicy of
CCAllowAll -> return reqHdrs
CCDisabled readCookie ->
@ -415,7 +415,6 @@ onStart ::
HasResourceLimits m,
ProvidesNetwork m
) =>
Env.Environment ->
HashSet (L.EngineLogType L.Hasura) ->
WSServerEnv impl ->
WSConn ->
@ -423,7 +422,7 @@ onStart ::
StartMsg ->
WS.WSActions WSConnData ->
m ()
onStart env enabledLogTypes serverEnv wsConn shouldCaptureVariables (StartMsg opId q) onMessageActions = catchAndIgnore $ do
onStart enabledLogTypes serverEnv wsConn shouldCaptureVariables (StartMsg opId q) onMessageActions = catchAndIgnore $ do
timerTot <- startTimer
op <- liftIO $ STM.atomically $ STMMap.lookup opId opMap
let opName = _grOperationName q
@ -454,6 +453,10 @@ onStart env enabledLogTypes serverEnv wsConn shouldCaptureVariables (StartMsg op
ExceptT (Either GQExecError QErr) (ExceptT () m) a
runLimits = withErr Right $ runResourceLimits operationLimit
env <- liftIO $ acEnvironment <$> getAppContext appStateRef
sqlGenCtx <- liftIO $ acSQLGenCtx <$> getAppContext appStateRef
enableAL <- liftIO $ acEnableAllowlist <$> getAppContext appStateRef
reqParsedE <- lift $ E.checkGQLExecution userInfo (reqHdrs, ipAddress) enableAL sc q requestId
reqParsed <- onLeft reqParsedE (withComplete . preExecErr requestId Nothing)
queryPartsE <- runExceptT $ getSingleOperation reqParsed
@ -772,6 +775,7 @@ onStart env enabledLogTypes serverEnv wsConn shouldCaptureVariables (StartMsg op
Maybe RJ.RemoteJoins ->
ExceptT (Either GQExecError QErr) (ExceptT () m) AnnotatedResponsePart
runRemoteGQ requestId reqUnparsed fieldName userInfo reqHdrs rsi resultCustomizer gqlReq remoteJoins = do
env <- liftIO $ acEnvironment <$> getAppContext appStateRef
(telemTimeIO_DT, _respHdrs, resp) <-
doQErr $
E.execRemoteGQ env userInfo reqHdrs (rsDef rsi) gqlReq
@ -793,20 +797,18 @@ onStart env enabledLogTypes serverEnv wsConn shouldCaptureVariables (StartMsg op
WSServerEnv
logger
subscriptionsState
lqOpts
streamQOpts
appStateRef
_
_
sqlGenCtx
readOnlyMode
_
enableAL
_keepAliveDelay
_serverMetrics
prometheusMetrics
_ = serverEnv
-- Hook to retrieve the latest subscription options(live query + stream query options) from the `appStateRef`
getSubscriptionOptions = fmap (\appCtx -> (acLiveQueryOptions appCtx, acStreamQueryOptions appCtx)) (getAppContext appStateRef)
gqlMetrics = pmGraphQLRequestMetrics prometheusMetrics
WSConnData userInfoR opMap errRespTy queryType = WS.getData wsConn
@ -911,7 +913,7 @@ onStart env enabledLogTypes serverEnv wsConn shouldCaptureVariables (StartMsg op
(_wsePrometheusMetrics serverEnv)
subscriberMetadata
subscriptionsState
lqOpts
getSubscriptionOptions
sourceName
parameterizedQueryHash
opName
@ -938,7 +940,7 @@ onStart env enabledLogTypes serverEnv wsConn shouldCaptureVariables (StartMsg op
(_wsePrometheusMetrics serverEnv)
subscriberMetadata
subscriptionsState
streamQOpts
getSubscriptionOptions
sourceName
parameterizedQueryHash
opName
@ -1017,15 +1019,14 @@ onMessage ::
ProvidesNetwork m,
Tracing.MonadTrace m
) =>
Env.Environment ->
HashSet (L.EngineLogType L.Hasura) ->
AuthMode ->
IO AuthMode ->
WSServerEnv impl ->
WSConn ->
LBS.ByteString ->
WS.WSActions WSConnData ->
m ()
onMessage env enabledLogTypes authMode serverEnv wsConn msgRaw onMessageActions =
onMessage enabledLogTypes authMode serverEnv wsConn msgRaw onMessageActions =
Tracing.newTrace (_wseTraceSamplingPolicy serverEnv) "websocket" do
case J.eitherDecode msgRaw of
Left e -> do
@ -1049,7 +1050,7 @@ onMessage env enabledLogTypes authMode serverEnv wsConn msgRaw onMessageActions
if _mcAnalyzeQueryVariables (scMetricsConfig schemaCache)
then CaptureQueryVariables
else DoNotCaptureQueryVariables
onStart env enabledLogTypes serverEnv wsConn shouldCaptureVariables startMsg onMessageActions
onStart enabledLogTypes serverEnv wsConn shouldCaptureVariables startMsg onMessageActions
CMStop stopMsg -> onStop serverEnv wsConn stopMsg
-- specfic to graphql-ws
CMPing mPayload -> onPing wsConn mPayload
@ -1107,14 +1108,14 @@ onConnInit ::
L.Logger L.Hasura ->
HTTP.Manager ->
WSConn ->
AuthMode ->
IO AuthMode ->
Maybe ConnParams ->
-- | this is the message handler for handling errors on initializing a from the client connection
WS.WSOnErrorMessageAction WSConnData ->
-- | this is the message handler for handling "keep-alive" messages to the client
WS.WSKeepAliveMessageAction WSConnData ->
m ()
onConnInit logger manager wsConn authMode connParamsM onConnInitErrAction keepAliveMessageAction = do
onConnInit logger manager wsConn getAuthMode connParamsM onConnInitErrAction keepAliveMessageAction = do
-- TODO(from master): what should be the behaviour of connection_init message when a
-- connection is already iniatilized? Currently, we seem to be doing
-- something arbitrary which isn't correct. Ideally, we should stick to
@ -1124,6 +1125,7 @@ onConnInit logger manager wsConn authMode connParamsM onConnInitErrAction keepAl
-- 'not initialised'. This means that there is no reason for the
-- connection to be in `CSInitError` state.
connState <- liftIO (STM.readTVarIO (_wscUser $ WS.getData wsConn))
authMode <- liftIO $ getAuthMode
case getIpAddress connState of
Left err -> unexpectedInitError err
Right ipAddress -> do

View File

@ -18,7 +18,8 @@ module Hasura.GraphQL.Transport.WebSocket.Server
WSLog (WSLog),
WSOnErrorMessageAction,
WSQueueResponse (WSQueueResponse),
WSServer,
WSServer (..),
websocketConnectionReaper,
closeConn,
sendMsgAndCloseConn,
createServerApp,
@ -37,6 +38,8 @@ where
import Control.Concurrent.Async qualified as A
import Control.Concurrent.Async.Lifted.Safe qualified as LA
import Control.Concurrent.Extended (sleep)
import Control.Concurrent.STM (readTVarIO)
import Control.Concurrent.STM qualified as STM
import Control.Exception.Lifted
import Control.Monad.Trans.Control qualified as MC
@ -60,7 +63,10 @@ import Hasura.GraphQL.Transport.WebSocket.Protocol
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.Types.Common (MetricsConfig (..))
import Hasura.Server.Init.Config (WSConnectionInitTimeout (..))
import Hasura.RQL.Types.SchemaCache
import Hasura.Server.Auth (AuthMode, compareAuthMode)
import Hasura.Server.Cors (CorsPolicy)
import Hasura.Server.Init.Config (AllowListStatus (..), WSConnectionInitTimeout (..))
import Hasura.Server.Prometheus
( PrometheusMetrics (..),
)
@ -162,6 +168,15 @@ instance L.ToEngineLog WSLog L.Hasura where
toEngineLog wsLog =
(L.LevelDebug, L.ELTInternal L.ILTWsServer, J.toJSON wsLog)
data WSReaperThreadLog = WSReaperThreadLog
{ _wrtlMessage :: Text
}
deriving (Show)
instance L.ToEngineLog WSReaperThreadLog L.Hasura where
toEngineLog (WSReaperThreadLog message) =
(L.LevelInfo, L.ELTInternal L.ILTWsServer, J.toJSON message)
data WSQueueResponse = WSQueueResponse
{ _wsqrMessage :: !BL.ByteString,
-- | extra metadata that we use for other actions, such as print log
@ -219,16 +234,30 @@ data ServerStatus a
| ShuttingDown
data WSServer a = WSServer
{ _wssLogger :: !(L.Logger L.Hasura),
{ _wssLogger :: L.Logger L.Hasura,
-- | Keep track of the security sensitive user configuration to perform
-- maintenance actions
_wssSecuritySensitiveUserConfig :: STM.TVar SecuritySensitiveUserConfig,
-- | See e.g. createServerApp.onAccept for how we use STM to preserve consistency
_wssStatus :: !(STM.TVar (ServerStatus a))
_wssStatus :: STM.TVar (ServerStatus a)
}
createWSServer :: L.Logger L.Hasura -> STM.STM (WSServer a)
createWSServer logger = do
-- These are security sensitive user configuration. That is, if any of the
-- following config changes, we need to perform maintenance actions like closing
-- all websocket connections
data SecuritySensitiveUserConfig = SecuritySensitiveUserConfig
{ ssucAuthMode :: AuthMode,
ssucEnableAllowlist :: AllowListStatus,
ssucAllowlist :: InlinedAllowlist,
ssucCorsPolicy :: CorsPolicy
}
createWSServer :: AuthMode -> AllowListStatus -> InlinedAllowlist -> CorsPolicy -> L.Logger L.Hasura -> STM.STM (WSServer a)
createWSServer authMode enableAllowlist allowlist corsPolicy logger = do
connMap <- STMMap.new
userConfRef <- STM.newTVar $ SecuritySensitiveUserConfig authMode enableAllowlist allowlist corsPolicy
serverStatus <- STM.newTVar (AcceptingConns connMap)
return $ WSServer logger serverStatus
return $ WSServer logger userConfRef serverStatus
closeAllWith ::
(BL.ByteString -> WSConn a -> IO ()) ->
@ -308,6 +337,82 @@ data WSHandlers m a = WSHandlers
_hOnClose :: OnCloseH m a
}
-- | The background thread responsible for closing all websocket connections
-- when security sensitive user configuration changes. It checks for changes in
-- the auth mode, allowlist and cors config, and invalidates/closes all
-- connections if there are any changes.
websocketConnectionReaper :: IO (AuthMode, AllowListStatus, CorsPolicy) -> IO SchemaCache -> WSServer a -> IO Void
websocketConnectionReaper getLatestConfig getSchemaCache (WSServer (L.Logger writeLog) userConfRef serverStatus) =
forever $ do
(currAuthMode, currEnableAllowlist, currCorsPolicy) <- getLatestConfig
currAllowlist <- scAllowlist <$> getSchemaCache
SecuritySensitiveUserConfig prevAuthMode prevEnableAllowlist prevAllowlist prevCorsPolicy <- readTVarIO userConfRef
-- check and close all connections if required
checkAndReapConnections
(currAuthMode, prevAuthMode)
(currCorsPolicy, prevCorsPolicy)
(currEnableAllowlist, prevEnableAllowlist)
(currAllowlist, prevAllowlist)
sleep $ seconds 1
where
closeAllConnectionsWithReason ::
String ->
BL.ByteString ->
(SecuritySensitiveUserConfig -> SecuritySensitiveUserConfig) ->
IO ()
closeAllConnectionsWithReason logMsg reason updateConf = do
writeLog $
WSReaperThreadLog $
fromString $
logMsg
conns <- STM.atomically $ do
STM.modifyTVar' userConfRef updateConf
flushConnMap serverStatus
closeAllWith (flip forceConnReconnect) reason conns
-- Close all connections based on -
-- if CorsPolicy changed -> close
-- if AuthMode changed -> close
-- if AllowlistEnabled -> enabled from disabled -> close
-- if AllowlistEnabled -> allowlist collection changed -> close
checkAndReapConnections (currAuthMode, prevAuthMode) (currCorsPolicy, prevCorsPolicy) (currEnableAllowlist, prevEnableAllowlist) (currAllowlist, prevAllowlist) = do
hasAuthModeChanged <- not <$> compareAuthMode currAuthMode prevAuthMode
let hasCorsPolicyChanged = currCorsPolicy /= prevCorsPolicy
hasAllowlistEnabled = prevEnableAllowlist == AllowListDisabled && currEnableAllowlist == AllowListEnabled
hasAllowlistUpdated =
(prevEnableAllowlist == AllowListEnabled && currEnableAllowlist == AllowListEnabled) && (currAllowlist /= prevAllowlist)
if
-- if CORS policy has changed, close all connections
| hasCorsPolicyChanged ->
closeAllConnectionsWithReason
"closing all websocket connections as the cors policy changed"
"cors policy changed"
(\conf -> conf {ssucCorsPolicy = currCorsPolicy})
-- if any auth config has changed, close all connections
| hasAuthModeChanged ->
closeAllConnectionsWithReason
"closing all websocket connections as the auth mode changed"
"auth mode changed"
(\conf -> conf {ssucAuthMode = currAuthMode})
-- In case of allowlist, we need to check if the allowlist has changed.
-- If the allowlist is disabled, we keep all the connections
-- as is.
-- If the allowlist is enabled from a disabled state, we need to close all the
-- connections.
| hasAllowlistEnabled ->
closeAllConnectionsWithReason
"closing all websocket connections as allow list is enabled"
"allow list enabled"
(\conf -> conf {ssucEnableAllowlist = currEnableAllowlist})
-- If the allowlist is already enabled and there are any changes made to the
-- allowlist, we need to close all the connections.
| hasAllowlistUpdated ->
closeAllConnectionsWithReason
"closing all websocket connections as the allow list has been updated"
"allow list updated"
(\conf -> conf {ssucAllowlist = currAllowlist})
| otherwise -> pure ()
createServerApp ::
(MonadIO m, MC.MonadBaseControl IO m, LA.Forall (LA.Pure m), MonadWSLog m) =>
IO MetricsConfig ->
@ -319,7 +424,7 @@ createServerApp ::
-- | aka WS.ServerApp
HasuraServerApp m
{-# INLINE createServerApp #-}
createServerApp getMetricsConfig wsConnInitTimeout (WSServer logger@(L.Logger writeLog) serverStatus) prometheusMetrics wsHandlers !ipAddress !pendingConn = do
createServerApp getMetricsConfig wsConnInitTimeout (WSServer logger@(L.Logger writeLog) _ serverStatus) prometheusMetrics wsHandlers !ipAddress !pendingConn = do
wsId <- WSId <$> liftIO UUID.nextRandom
logWSLog logger $ WSLog wsId EConnectionRequest Nothing
-- NOTE: this timer is specific to `graphql-ws`. the server has to close the connection
@ -477,10 +582,11 @@ createServerApp getMetricsConfig wsConnInitTimeout (WSServer logger@(L.Logger wr
logWSLog logger $ WSLog (_wcConnId wsConn) EClosed Nothing
shutdown :: WSServer a -> IO ()
shutdown (WSServer (L.Logger writeLog) serverStatus) = do
shutdown (WSServer (L.Logger writeLog) _ serverStatus) = do
writeLog $ L.debugT "Shutting websockets server down"
conns <- STM.atomically $ do
conns <- flushConnMap serverStatus
STM.writeTVar serverStatus ShuttingDown
return conns
pure conns
closeAllWith (flip forceConnReconnect) "shutting server down" conns

View File

@ -13,7 +13,6 @@ where
import Control.Concurrent.STM qualified as STM
import Data.Time.Clock qualified as TC
import Hasura.GraphQL.Execute qualified as E
import Hasura.GraphQL.Execute.Subscription.Options qualified as ES
import Hasura.GraphQL.Execute.Subscription.State qualified as ES
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.GraphQL.Transport.Instances ()
@ -21,10 +20,9 @@ import Hasura.GraphQL.Transport.WebSocket.Protocol
import Hasura.GraphQL.Transport.WebSocket.Server qualified as WS
import Hasura.Logging qualified as L
import Hasura.Prelude
import Hasura.RQL.Types.Common
import Hasura.Server.AppStateRef
import Hasura.Server.Cors
import Hasura.Server.Init.Config (AllowListStatus, KeepAliveDelay (..))
import Hasura.Server.Init.Config (KeepAliveDelay (..))
import Hasura.Server.Metrics (ServerMetrics (..))
import Hasura.Server.Prometheus (PrometheusMetrics (..))
import Hasura.Server.Types (ReadOnlyMode (..))
@ -76,15 +74,11 @@ data WSConnData = WSConnData
data WSServerEnv impl = WSServerEnv
{ _wseLogger :: !(L.Logger L.Hasura),
_wseSubscriptionState :: !ES.SubscriptionsState,
_wseLiveQueriesOption :: !ES.LiveQueriesOptions,
_wseStreamQueriesOptions :: !ES.StreamQueriesOptions,
_wseAppStateRef :: AppStateRef impl,
_wseHManager :: !HTTP.Manager,
_wseCorsPolicy :: !CorsPolicy,
_wseSQLCtx :: !SQLGenCtx,
_wseCorsPolicy :: IO CorsPolicy,
_wseReadOnlyMode :: ReadOnlyMode,
_wseServer :: !WSServer,
_wseEnableAllowlist :: !AllowListStatus,
_wseKeepAliveDelay :: !KeepAliveDelay,
_wseServerMetrics :: !ServerMetrics,
_wsePrometheusMetrics :: !PrometheusMetrics,

View File

@ -8,7 +8,6 @@ module Hasura.Server.App
Handler,
HandlerCtx (hcReqHeaders, hcAppContext, hcSchemaCache, hcUser),
HasuraApp (HasuraApp),
Loggers (..),
MonadConfigApiHandler (..),
MonadMetadataApiAuthorization (..),
AppContext (..),
@ -81,8 +80,8 @@ import Hasura.Server.API.V2Query qualified as V2Q
import Hasura.Server.AppStateRef
( AppStateRef,
getAppContext,
getRebuildableSchemaCacheWithVersion,
getSchemaCache,
readSchemaCacheRef,
withSchemaCacheUpdate,
)
import Hasura.Server.Auth (AuthMode (..), UserAuthentication (..))
@ -328,7 +327,7 @@ mkSpockAction appStateRef qErrEncoder qErrModifier apiHandler = do
authInfo <- onLeft authenticationResp (logErrorAndResp Nothing requestId req (reqBody, Nothing) False origHeaders (ExtraUserInfo Nothing) . qErrModifier)
let (userInfo, _, authHeaders, extraUserInfo) = authInfo
appContext <- liftIO $ getAppContext appStateRef
(schemaCache, schemaCacheVer) <- liftIO $ readSchemaCacheRef appStateRef
(schemaCache, schemaCacheVer) <- liftIO $ getRebuildableSchemaCacheWithVersion appStateRef
pure
( userInfo,
authHeaders,
@ -681,6 +680,7 @@ configApiGetHandler appStateRef = do
AppEnv {..} <- lift askAppEnv
AppContext {..} <- liftIO $ getAppContext appStateRef
Spock.get "v1alpha1/config" $
onlyWhenApiEnabled isConfigEnabled appStateRef $
mkSpockAction appStateRef encodeQErr id $
mkGetHandler $ do
onlyAdmin
@ -738,13 +738,12 @@ mkWaiApp ::
m HasuraApp
mkWaiApp setupHook appStateRef ekgStore wsServerEnv = do
appEnv@AppEnv {..} <- askAppEnv
AppContext {..} <- liftIO $ getAppContext appStateRef
spockApp <- liftWithStateless $ \lowerIO ->
Spock.spockAsApp $
Spock.spockT lowerIO $
httpApp setupHook appStateRef appEnv ekgStore
let wsServerApp = WS.createWSServerApp acEnvironment (_lsEnabledLogTypes appEnvLoggingSettings) acAuthMode wsServerEnv appEnvWebSocketConnectionInitTimeout -- TODO: Lyndon: Can we pass environment through wsServerEnv?
let wsServerApp = WS.createWSServerApp (_lsEnabledLogTypes appEnvLoggingSettings) wsServerEnv appEnvWebSocketConnectionInitTimeout
stopWSServer = WS.stopWSServerApp wsServerEnv
waiApp <- liftWithStateless $ \lowerIO ->
@ -787,13 +786,11 @@ httpApp setupHook appStateRef AppEnv {..} ekgStore = do
setupHook appStateRef
-- cors middleware
-- todo: puru: create middleware dynamically based on the corsPolicy change
Spock.middleware $
corsMiddleware (acCorsPolicy <$> getAppContext appStateRef)
appCtx@AppContext {..} <- liftIO $ getAppContext appStateRef
-- API Console and Root Dir
when (isConsoleEnabled acConsoleStatus && isMetadataEnabled appCtx) serveApiConsole
serveApiConsole
-- Local console assets for server and CLI consoles
serveApiConsoleAssets
@ -847,6 +844,7 @@ httpApp setupHook appStateRef AppEnv {..} ekgStore = do
RestRequest Spock.SpockMethod ->
Handler m (HttpLogGraphQLInfo, APIResp)
customEndpointHandler restReq = do
AppContext {..} <- liftIO $ getAppContext appStateRef
endpoints <- liftIO $ scEndpoints <$> getSchemaCache appStateRef
schemaCache <- lastBuiltSchemaCache <$> asks hcSchemaCache
schemaCacheVer <- asks hcSchemaCacheVersion
@ -885,52 +883,57 @@ httpApp setupHook appStateRef AppEnv {..} ekgStore = do
-- TODO: Are we actually able to use mkGetHandler in this situation? POST handler seems to do some work that we might want to avoid.
mkGetHandler $ customEndpointHandler (RestRequest wildcard method allParams)
when (isMetadataEnabled appCtx) $ do
-- Note: we create a schema cache updater function, to restrict the access
-- to 'AppStateRef' inside the request handlers
let schemaCacheUpdater = withSchemaCacheUpdate appStateRef logger Nothing
Spock.post "v1/graphql/explain" gqlExplainAction
Spock.post "v1/graphql/explain" $ do
onlyWhenApiEnabled isMetadataEnabled appStateRef gqlExplainAction
Spock.post "v1alpha1/graphql/explain" gqlExplainAction
Spock.post "v1alpha1/graphql/explain" $ do
onlyWhenApiEnabled isMetadataEnabled appStateRef gqlExplainAction
Spock.post "v1/query" $
Spock.post "v1/query" $ do
onlyWhenApiEnabled isMetadataEnabled appStateRef $
spockAction encodeQErr id $ do
mkPostHandler $ fmap (emptyHttpLogGraphQLInfo,) <$> mkAPIRespHandler (v1QueryHandler schemaCacheUpdater)
Spock.post "v1/metadata" $
Spock.post "v1/metadata" $ do
onlyWhenApiEnabled isMetadataEnabled appStateRef $
spockAction encodeQErr id $
mkPostHandler $
fmap (emptyHttpLogGraphQLInfo,) <$> mkAPIRespHandler (v1MetadataHandler schemaCacheUpdater)
Spock.post "v2/query" $
Spock.post "v2/query" $ do
onlyWhenApiEnabled isMetadataEnabled appStateRef $
spockAction encodeQErr id $
mkPostHandler $
fmap (emptyHttpLogGraphQLInfo,) <$> mkAPIRespHandler (v2QueryHandler schemaCacheUpdater)
when (isPGDumpEnabled appCtx) $
Spock.post "v1alpha1/pg_dump" $
Spock.post "v1alpha1/pg_dump" $ do
onlyWhenApiEnabled isPGDumpEnabled appStateRef $
spockAction encodeQErr id $
mkPostHandler $
fmap (emptyHttpLogGraphQLInfo,) <$> v1Alpha1PGDumpHandler
when (isConfigEnabled appCtx) $
runConfigApiHandler appStateRef
when (isGraphQLEnabled appCtx) $ do
Spock.post "v1alpha1/graphql" $
Spock.post "v1alpha1/graphql" $ do
onlyWhenApiEnabled isGraphQLEnabled appStateRef $
spockAction GH.encodeGQErr id $
mkGQLRequestHandler $
mkGQLAPIRespHandler $
v1Alpha1GQHandler E.QueryHasura
Spock.post "v1/graphql" $
Spock.post "v1/graphql" $ do
onlyWhenApiEnabled isGraphQLEnabled appStateRef $
spockAction GH.encodeGQErr allMod200 $
mkGQLRequestHandler $
mkGQLAPIRespHandler $
v1GQHandler
Spock.post "v1beta1/relay" $
Spock.post "v1beta1/relay" $ do
onlyWhenApiEnabled isGraphQLEnabled appStateRef $
spockAction GH.encodeGQErr allMod200 $
mkGQLRequestHandler $
mkGQLAPIRespHandler $
@ -948,37 +951,48 @@ httpApp setupHook appStateRef AppEnv {..} ekgStore = do
stats <- liftIO RTS.getRTSStats
Spock.json stats
when (isDeveloperAPIEnabled appCtx) $ do
Spock.get "dev/ekg" $
Spock.get "dev/ekg" $ do
onlyWhenApiEnabled isDeveloperAPIEnabled appStateRef $
spockAction encodeQErr id $
mkGetHandler $ do
onlyAdmin
respJ <- liftIO $ EKG.sampleAll ekgStore
return (emptyHttpLogGraphQLInfo, JSONResp $ HttpResponse (encJFromJValue $ EKG.sampleToJson respJ) [])
-- This deprecated endpoint used to show the query plan cache pre-PDV.
-- Eventually this endpoint can be removed.
Spock.get "dev/plan_cache" $
Spock.get "dev/plan_cache" $ do
onlyWhenApiEnabled isDeveloperAPIEnabled appStateRef $
spockAction encodeQErr id $
mkGetHandler $ do
onlyAdmin
return (emptyHttpLogGraphQLInfo, JSONResp $ HttpResponse (encJFromJValue J.Null) [])
Spock.get "dev/subscriptions" $
Spock.get "dev/subscriptions" $ do
onlyWhenApiEnabled isDeveloperAPIEnabled appStateRef $
spockAction encodeQErr id $
mkGetHandler $ do
onlyAdmin
respJ <- liftIO $ ES.dumpSubscriptionsState False acLiveQueryOptions acStreamQueryOptions appEnvSubscriptionState
appCtx <- liftIO $ getAppContext appStateRef
respJ <- liftIO $ ES.dumpSubscriptionsState False (acLiveQueryOptions appCtx) (acStreamQueryOptions appCtx) appEnvSubscriptionState
return (emptyHttpLogGraphQLInfo, JSONResp $ HttpResponse (encJFromJValue respJ) [])
Spock.get "dev/subscriptions/extended" $
Spock.get "dev/subscriptions/extended" $ do
onlyWhenApiEnabled isDeveloperAPIEnabled appStateRef $
spockAction encodeQErr id $
mkGetHandler $ do
onlyAdmin
respJ <- liftIO $ ES.dumpSubscriptionsState True acLiveQueryOptions acStreamQueryOptions appEnvSubscriptionState
appCtx <- liftIO $ getAppContext appStateRef
respJ <- liftIO $ ES.dumpSubscriptionsState True (acLiveQueryOptions appCtx) (acStreamQueryOptions appCtx) appEnvSubscriptionState
return (emptyHttpLogGraphQLInfo, JSONResp $ HttpResponse (encJFromJValue respJ) [])
Spock.get "dev/dataconnector/schema" $
Spock.get "dev/dataconnector/schema" $ do
onlyWhenApiEnabled isDeveloperAPIEnabled appStateRef $
spockAction encodeQErr id $
mkGetHandler $ do
onlyAdmin
return (emptyHttpLogGraphQLInfo, JSONResp $ HttpResponse (encJFromJValue openApiSchema) [])
Spock.get "api/swagger/json" $
spockAction encodeQErr id $
mkGetHandler $ do
@ -1028,10 +1042,13 @@ httpApp setupHook appStateRef AppEnv {..} ekgStore = do
serveApiConsole = do
-- redirect / to /console
Spock.get Spock.root $ Spock.redirect "console"
Spock.get Spock.root $ do
onlyWhenApiEnabled (\appCtx -> isConsoleEnabled (acConsoleStatus appCtx) && isMetadataEnabled appCtx) appStateRef $
Spock.redirect "console"
-- serve console html
Spock.get ("console" <//> Spock.wildcard) $ \path -> do
onlyWhenApiEnabled (\appCtx -> isConsoleEnabled (acConsoleStatus appCtx) && isMetadataEnabled appCtx) appStateRef $ do
AppContext {..} <- liftIO $ getAppContext appStateRef
req <- Spock.request
let headers = Wai.requestHeaders req
@ -1044,6 +1061,23 @@ httpApp setupHook appStateRef AppEnv {..} ekgStore = do
Spock.get ("console/assets" <//> Spock.wildcard) $ \path -> do
consoleAssetsHandler logger appEnvLoggingSettings dir path
-- an endpoint can be switched ON/OFF dynamically, hence serve the endpoint only
-- when it is enabled else throw HTTP Error 404
onlyWhenApiEnabled ::
MonadIO m =>
(AppContext -> Bool) ->
AppStateRef impl ->
Spock.ActionCtxT ctx m b ->
Spock.ActionCtxT ctx m b
onlyWhenApiEnabled isEnabled appStateRef endpointAction = do
appContext <- liftIO $ getAppContext appStateRef
if (isEnabled appContext)
then do endpointAction
else do
let qErr = err404 NotFound "resource does not exist"
Spock.setStatus $ qeStatus qErr
Spock.json $ encodeQErr False qErr
raiseGenericApiError ::
forall m.
(MonadIO m, HttpLog m) =>

View File

@ -5,7 +5,7 @@ module Hasura.Server.AppStateRef
initialiseAppStateRef,
withSchemaCacheUpdate,
readAppContextRef,
readSchemaCacheRef,
getRebuildableSchemaCacheWithVersion,
-- * TLS AllowList reference
TLSAllowListRef,
@ -140,8 +140,8 @@ readAppContextRef :: AppStateRef impl -> IO (RebuildableAppContext impl)
readAppContextRef scRef = asAppCtx <$> readIORef (_scrCache scRef)
-- | Read the contents of the 'AppStateRef' to get the latest 'RebuildableSchemaCache' and 'SchemaCacheVer'
readSchemaCacheRef :: AppStateRef impl -> IO (RebuildableSchemaCache, SchemaCacheVer)
readSchemaCacheRef scRef = asSchemaCache <$> readIORef (_scrCache scRef)
getRebuildableSchemaCacheWithVersion :: AppStateRef impl -> IO (RebuildableSchemaCache, SchemaCacheVer)
getRebuildableSchemaCacheWithVersion scRef = asSchemaCache <$> readIORef (_scrCache scRef)
--------------------------------------------------------------------------------
-- TLS Allow List
@ -179,11 +179,11 @@ readTLSAllowList (TLSAllowListRef ref) =
-- | Read the latest 'SchemaCache' from the 'AppStateRef'.
getSchemaCache :: AppStateRef impl -> IO SchemaCache
getSchemaCache asRef = lastBuiltSchemaCache . fst <$> readSchemaCacheRef asRef
getSchemaCache asRef = lastBuiltSchemaCache . fst <$> getRebuildableSchemaCacheWithVersion asRef
-- | Read the latest 'SchemaCache' and its version from the 'AppStateRef'.
getSchemaCacheWithVersion :: AppStateRef impl -> IO (SchemaCache, SchemaCacheVer)
getSchemaCacheWithVersion scRef = first lastBuiltSchemaCache <$> readSchemaCacheRef scRef
getSchemaCacheWithVersion scRef = fmap (\(sc, ver) -> (lastBuiltSchemaCache sc, ver)) $ getRebuildableSchemaCacheWithVersion scRef
-- | Read the latest 'AppContext' from the 'AppStateRef'.
getAppContext :: AppStateRef impl -> IO AppContext

View File

@ -3,6 +3,7 @@
module Hasura.Server.Auth
( getUserInfoWithExpTime,
AuthMode (..),
compareAuthMode,
setupAuthMode,
AdminSecretHash,
unsafeMkAdminSecretHash,
@ -97,7 +98,25 @@ data AuthMode
| AMAdminSecret !(Set.HashSet AdminSecretHash) !(Maybe RoleName)
| AMAdminSecretAndHook !(Set.HashSet AdminSecretHash) !AuthHook
| AMAdminSecretAndJWT !(Set.HashSet AdminSecretHash) ![JWTCtx] !(Maybe RoleName)
deriving (Show, Eq)
deriving (Eq, Show)
-- | In case JWT is used as an authentication mode, the JWKs are stored inside JWTCtx
-- as an `IORef`. `IORef` has pointer equality, so we need to compare the values
-- inside the `IORef` to check if the `JWTCtx` is same.
compareAuthMode :: AuthMode -> AuthMode -> IO Bool
compareAuthMode authMode authMode' = do
case (authMode, authMode') of
((AMAdminSecretAndJWT adminSecretHash jwtCtx roleName), (AMAdminSecretAndJWT adminSecretHash' jwtCtx' roleName')) -> do
-- Since keyConfig of JWTCtx is an IORef it is necessary to extract the value before checking the equality
isJwtCtxSame <- zipWithM compareJWTConfig jwtCtx jwtCtx'
return $ (adminSecretHash == adminSecretHash') && (and isJwtCtxSame) && (roleName == roleName')
_ -> return $ authMode == authMode'
where
compareJWTConfig :: JWTCtx -> JWTCtx -> IO Bool
compareJWTConfig (JWTCtx url keyConfigRef audM iss claims allowedSkew headers) (JWTCtx url' keyConfigRef' audM' iss' claims' allowedSkew' headers') = do
keyConfig <- readIORef keyConfigRef
keyConfig' <- readIORef keyConfigRef'
return $ (url, keyConfig, audM, iss, claims, allowedSkew, headers) == (url', keyConfig', audM', iss', claims', allowedSkew', headers')
-- | Validate the user's requested authentication configuration, launching any
-- required maintenance threads for JWT etc.

View File

@ -36,7 +36,7 @@ import Hasura.SQL.BackendMap qualified as BackendMap
import Hasura.Server.AppStateRef
( AppStateRef,
getAppContext,
readSchemaCacheRef,
getRebuildableSchemaCacheWithVersion,
withSchemaCacheUpdate,
)
import Hasura.Server.Logging
@ -281,7 +281,7 @@ refreshSchemaCache
let logger = _lsLogger appEnvLoggers
respErr <- runExceptT $
withSchemaCacheUpdate appStateRef logger (Just logTVar) $ do
rebuildableCache <- liftIO $ fst <$> readSchemaCacheRef appStateRef
rebuildableCache <- liftIO $ fst <$> getRebuildableSchemaCacheWithVersion appStateRef
appContext <- liftIO $ getAppContext appStateRef
let serverConfigCtx = buildServerConfigCtx appEnv appContext
(msg, cache, _) <-

View File

@ -37,6 +37,7 @@ import Data.List qualified as L
import Data.List.Extended qualified as L
import Data.Text qualified as T
import Data.Text.Conversions (UTF8 (..), decodeText)
import Hasura.App.State qualified as State
import Hasura.HTTP
import Hasura.Logging
import Hasura.LogicalModel.Cache (LogicalModelInfo (_lmiArguments))
@ -52,6 +53,8 @@ import Hasura.RQL.Types.Table
import Hasura.SQL.AnyBackend qualified as Any
import Hasura.SQL.Backend (BackendType)
import Hasura.SQL.Tag
import Hasura.Server.AppStateRef qualified as HGE
import Hasura.Server.Init.Config
import Hasura.Server.Telemetry.Counters (dumpServiceTimingMetrics)
import Hasura.Server.Telemetry.Types
import Hasura.Server.Types
@ -127,28 +130,33 @@ telemetryUrl = "https://telemetry.hasura.io/v1/http"
-- hours. The send time depends on when the server was started and will
-- naturally drift.
runTelemetry ::
forall m impl.
( MonadIO m,
State.HasAppEnv m
) =>
Logger Hasura ->
HTTP.Manager ->
-- | an action that always returns the latest schema cache
IO SchemaCache ->
-- | an action that always returns the latest schema cache ref
HGE.AppStateRef impl ->
MetadataDbId ->
InstanceId ->
PGVersion ->
HashSet ExperimentalFeature ->
IO void
runTelemetry (Logger logger) manager getSchemaCache metadataDbUid instanceId pgVersion experimentalFeatures = do
let options = wreqOptions manager []
forever $ do
schemaCache <- getSchemaCache
m Void
runTelemetry (Logger logger) appStateRef metadataDbUid pgVersion = do
State.AppEnv {..} <- State.askAppEnv
let options = wreqOptions appEnvManager []
forever $ liftIO $ do
telemetryStatus <- State.acEnableTelemetry <$> HGE.getAppContext appStateRef
case telemetryStatus of
TelemetryEnabled -> do
schemaCache <- HGE.getSchemaCache appStateRef
serviceTimings <- dumpServiceTimingMetrics
experimentalFeatures <- State.acExperimentalFeatures <$> HGE.getAppContext appStateRef
ci <- CI.getCI
-- Creates a telemetry payload for a specific backend.
let telemetryForSource :: forall (b :: BackendType). HasTag b => SourceInfo b -> TelemetryPayload
telemetryForSource =
mkTelemetryPayload
metadataDbUid
instanceId
appEnvInstanceId
currentVersion
pgVersion
ci
@ -167,6 +175,7 @@ runTelemetry (Logger logger) manager getSchemaCache metadataDbUid instanceId pgV
resp <- try $ Wreq.postWith options (T.unpack telemetryUrl) payload
either logHttpEx handleHttpResp resp
C.sleep $ days 1
TelemetryDisabled -> C.sleep $ seconds 1
where
logHttpEx :: HTTP.HttpException -> IO ()
logHttpEx ex = do

View File

@ -378,7 +378,7 @@ streamingSubscriptionPollingSpec srcConfig = do
dummyPromMetrics
subscriberMetadata
subscriptionState
subOptions
(pure (subOptions, subOptions))
SNDefault
dummyParamQueryHash
Nothing