mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-09-21 15:38:40 +03:00
server: refactor 'pollQuery' to have a hook to process 'PollDetails' (#5391)
Co-authored-by: Vamshi Surabhi <0x777@users.noreply.github.com>
This commit is contained in:
parent
d6dab9dc01
commit
1eb36bbbb3
@ -41,7 +41,7 @@ runApp env (HGEOptionsG rci hgeCmd) =
|
||||
HCServe serveOptions -> do
|
||||
(initCtx, initTime) <- initialiseCtx env hgeCmd rci
|
||||
let shutdownApp = return ()
|
||||
-- Catches the SIGTERM signal and initiates a graceful shutdown.
|
||||
-- Catches the SIGTERM signal and initiates a graceful shutdown.
|
||||
-- Graceful shutdown for regular HTTP requests is already implemented in
|
||||
-- Warp, and is triggered by invoking the 'closeSocket' callback.
|
||||
-- We only catch the SIGTERM signal once, that is, if the user hits CTRL-C
|
||||
@ -50,7 +50,7 @@ runApp env (HGEOptionsG rci hgeCmd) =
|
||||
Signals.sigTERM
|
||||
(Signals.CatchOnce (shutdownGracefully initCtx))
|
||||
Nothing
|
||||
runHGEServer env serveOptions initCtx Nothing initTime shutdownApp
|
||||
runHGEServer env serveOptions initCtx Nothing initTime shutdownApp Nothing
|
||||
|
||||
HCExport -> do
|
||||
(initCtx, _) <- initialiseCtx env hgeCmd rci
|
||||
@ -68,7 +68,7 @@ runApp env (HGEOptionsG rci hgeCmd) =
|
||||
let sqlGenCtx = SQLGenCtx False
|
||||
res <- runAsAdmin _icPgPool sqlGenCtx _icHttpManager $ do
|
||||
schemaCache <- buildRebuildableSchemaCache env
|
||||
execQuery env queryBs
|
||||
execQuery env queryBs
|
||||
& Tracing.runTraceTWithReporter Tracing.noReporter "execute"
|
||||
& runHasSystemDefinedT (SystemDefined False)
|
||||
& runCacheRWT schemaCache
|
||||
|
@ -73,6 +73,7 @@ import Hasura.Session
|
||||
|
||||
import qualified Hasura.Tracing as Tracing
|
||||
import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
|
||||
import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as EL
|
||||
|
||||
data ExitCode
|
||||
= InvalidEnvironmentVariableOptionsError
|
||||
@ -305,8 +306,9 @@ runHGEServer
|
||||
-- ^ start time
|
||||
-> IO ()
|
||||
-- ^ shutdown function
|
||||
-> Maybe EL.LiveQueryPostPollHook
|
||||
-> m ()
|
||||
runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp = do
|
||||
runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp postPollHook = do
|
||||
-- Comment this to enable expensive assertions from "GHC.AssertNF". These
|
||||
-- will log lines to STDOUT containing "not in normal form". In the future we
|
||||
-- could try to integrate this into our tests. For now this is a development
|
||||
@ -323,7 +325,7 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp = d
|
||||
|
||||
authMode <- either (printErrExit AuthConfigurationError . T.unpack) return authModeRes
|
||||
|
||||
_idleGCThread <- C.forkImmortal "ourIdleGC" logger $ liftIO $
|
||||
_idleGCThread <- C.forkImmortal "ourIdleGC" logger $ liftIO $
|
||||
ourIdleGC logger (seconds 0.3) (seconds 10) (seconds 60)
|
||||
|
||||
HasuraApp app cacheRef cacheInitTime stopWsServer <- flip onException (flushLogger loggerCtx) $
|
||||
@ -346,6 +348,7 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp = d
|
||||
soLiveQueryOpts
|
||||
soPlanCacheOptions
|
||||
soResponseInternalErrorsConfig
|
||||
postPollHook
|
||||
_icSchemaCache
|
||||
|
||||
-- log inconsistent schema objects
|
||||
|
@ -13,6 +13,8 @@ module Hasura.GraphQL.Execute.LiveQuery.Poll (
|
||||
, PollDetails(..)
|
||||
, BatchExecutionDetails(..)
|
||||
, CohortExecutionDetails(..)
|
||||
, LiveQueryPostPollHook
|
||||
, defaultLiveQueryPostPollHook
|
||||
|
||||
-- * Cohorts
|
||||
, Cohort(..)
|
||||
@ -28,6 +30,7 @@ module Hasura.GraphQL.Execute.LiveQuery.Poll (
|
||||
, newSubscriberId
|
||||
, SubscriberMetadata
|
||||
, mkSubscriberMetadata
|
||||
, unSubscriberMetadata
|
||||
, SubscriberMap
|
||||
, OnChange
|
||||
, LGQResponse
|
||||
@ -382,17 +385,23 @@ pollDetailMinimal (PollDetails{..}) =
|
||||
instance L.ToEngineLog PollDetails L.Hasura where
|
||||
toEngineLog pl = (L.LevelInfo, L.ELTLivequeryPollerLog, pollDetailMinimal pl)
|
||||
|
||||
type LiveQueryPostPollHook = PollDetails -> IO ()
|
||||
|
||||
-- the default LiveQueryPostPollHook
|
||||
defaultLiveQueryPostPollHook :: L.Logger L.Hasura -> LiveQueryPostPollHook
|
||||
defaultLiveQueryPostPollHook logger pd = L.unLogger logger pd
|
||||
|
||||
-- | Where the magic happens: the top-level action run periodically by each
|
||||
-- active 'Poller'. This needs to be async exception safe.
|
||||
pollQuery
|
||||
:: L.Logger L.Hasura
|
||||
-> PollerId
|
||||
:: PollerId
|
||||
-> LiveQueriesOptions
|
||||
-> PGExecCtx
|
||||
-> MultiplexedQuery
|
||||
-> CohortMap
|
||||
-> LiveQueryPostPollHook
|
||||
-> IO ()
|
||||
pollQuery logger pollerId lqOpts pgExecCtx pgQuery cohortMap = do
|
||||
pollQuery pollerId lqOpts pgExecCtx pgQuery cohortMap postPollHook = do
|
||||
(totalTime, (snapshotTime, batchesDetails)) <- withElapsedTime $ do
|
||||
|
||||
-- snapshot the current cohorts and split them into batches
|
||||
@ -428,14 +437,15 @@ pollQuery logger pollerId lqOpts pgExecCtx pgQuery cohortMap = do
|
||||
|
||||
pure (snapshotTime, batchesDetails)
|
||||
|
||||
L.unLogger logger $ PollDetails
|
||||
{ _pdPollerId = pollerId
|
||||
, _pdGeneratedSql = unMultiplexedQuery pgQuery
|
||||
, _pdSnapshotTime = snapshotTime
|
||||
, _pdBatches = batchesDetails
|
||||
, _pdLiveQueryOptions = lqOpts
|
||||
, _pdTotalTime = totalTime
|
||||
}
|
||||
let pollDetails = PollDetails
|
||||
{ _pdPollerId = pollerId
|
||||
, _pdGeneratedSql = unMultiplexedQuery pgQuery
|
||||
, _pdSnapshotTime = snapshotTime
|
||||
, _pdBatches = batchesDetails
|
||||
, _pdLiveQueryOptions = lqOpts
|
||||
, _pdTotalTime = totalTime
|
||||
}
|
||||
postPollHook pollDetails
|
||||
where
|
||||
LiveQueriesOptions batchSize _ = lqOpts
|
||||
|
||||
|
@ -7,6 +7,7 @@ module Hasura.GraphQL.Execute.LiveQuery.State
|
||||
, dumpLiveQueriesState
|
||||
|
||||
, LiveQueryId
|
||||
, LiveQueryPostPollHook
|
||||
, addLiveQuery
|
||||
, removeLiveQuery
|
||||
) where
|
||||
@ -41,13 +42,16 @@ data LiveQueriesState
|
||||
{ _lqsOptions :: !LiveQueriesOptions
|
||||
, _lqsPGExecTx :: !PGExecCtx
|
||||
, _lqsLiveQueryMap :: !PollerMap
|
||||
, _lqsPostPollHook :: !LiveQueryPostPollHook
|
||||
-- ^ A hook function which is run after each fetch cycle
|
||||
}
|
||||
|
||||
initLiveQueriesState :: LiveQueriesOptions -> PGExecCtx -> IO LiveQueriesState
|
||||
initLiveQueriesState options pgCtx = LiveQueriesState options pgCtx <$> STMMap.newIO
|
||||
initLiveQueriesState :: LiveQueriesOptions -> PGExecCtx -> LiveQueryPostPollHook -> IO LiveQueriesState
|
||||
initLiveQueriesState options pgCtx pollHook =
|
||||
LiveQueriesState options pgCtx <$> STMMap.newIO <*> pure pollHook
|
||||
|
||||
dumpLiveQueriesState :: Bool -> LiveQueriesState -> IO J.Value
|
||||
dumpLiveQueriesState extended (LiveQueriesState opts _ lqMap) = do
|
||||
dumpLiveQueriesState extended (LiveQueriesState opts _ lqMap _) = do
|
||||
lqMapJ <- dumpPollerMap extended lqMap
|
||||
return $ J.object
|
||||
[ "options" J..= opts
|
||||
@ -100,7 +104,7 @@ addLiveQuery logger subscriberMetadata lqState plan onResultAction = do
|
||||
onJust handlerM $ \handler -> do
|
||||
pollerId <- PollerId <$> UUID.nextRandom
|
||||
threadRef <- forkImmortal ("pollQuery." <> show pollerId) logger $ forever $ do
|
||||
pollQuery logger pollerId lqOpts pgExecCtx query $ _pCohorts handler
|
||||
pollQuery pollerId lqOpts pgExecCtx query (_pCohorts handler) postPollHook
|
||||
sleep $ unRefetchInterval refetchInterval
|
||||
let !pState = PollerIOState threadRef pollerId
|
||||
$assertNFHere pState -- so we don't write thunks to mutable vars
|
||||
@ -108,7 +112,7 @@ addLiveQuery logger subscriberMetadata lqState plan onResultAction = do
|
||||
|
||||
pure $ LiveQueryId handlerId cohortKey subscriberId
|
||||
where
|
||||
LiveQueriesState lqOpts pgExecCtx lqMap = lqState
|
||||
LiveQueriesState lqOpts pgExecCtx lqMap postPollHook = lqState
|
||||
LiveQueriesOptions _ refetchInterval = lqOpts
|
||||
LiveQueryPlan (ParameterizedLiveQueryPlan role query) cohortKey = plan
|
||||
|
||||
|
@ -62,6 +62,7 @@ import Hasura.SQL.Types
|
||||
|
||||
import qualified Hasura.GraphQL.Execute as E
|
||||
import qualified Hasura.GraphQL.Execute.LiveQuery as EL
|
||||
import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as EL
|
||||
import qualified Hasura.GraphQL.Explain as GE
|
||||
import qualified Hasura.GraphQL.Transport.HTTP as GH
|
||||
import qualified Hasura.GraphQL.Transport.HTTP.Protocol as GH
|
||||
@ -578,18 +579,20 @@ mkWaiApp
|
||||
-> EL.LiveQueriesOptions
|
||||
-> E.PlanCacheOptions
|
||||
-> ResponseInternalErrorsConfig
|
||||
-> Maybe EL.LiveQueryPostPollHook
|
||||
-> (RebuildableSchemaCache Run, Maybe UTCTime)
|
||||
-> m HasuraApp
|
||||
mkWaiApp env isoLevel logger sqlGenCtx enableAL pool pgExecCtxCustom ci httpManager mode corsCfg enableConsole consoleAssetsDir
|
||||
enableTelemetry instanceId apis lqOpts planCacheOptions responseErrorsConfig (schemaCache, cacheBuiltTime) = do
|
||||
enableTelemetry instanceId apis lqOpts planCacheOptions responseErrorsConfig liveQueryHook (schemaCache, cacheBuiltTime) = do
|
||||
|
||||
(planCache, schemaCacheRef) <- initialiseCache
|
||||
let getSchemaCache = first lastBuiltSchemaCache <$> readIORef (_scrCache schemaCacheRef)
|
||||
|
||||
let corsPolicy = mkDefaultCorsPolicy corsCfg
|
||||
pgExecCtx = fromMaybe (mkPGExecCtx isoLevel pool) pgExecCtxCustom
|
||||
postPollHook = fromMaybe (EL.defaultLiveQueryPostPollHook logger) liveQueryHook
|
||||
|
||||
lqState <- liftIO $ EL.initLiveQueriesState lqOpts pgExecCtx
|
||||
lqState <- liftIO $ EL.initLiveQueriesState lqOpts pgExecCtx postPollHook
|
||||
wsServerEnv <- WS.createWSServerEnv logger pgExecCtx lqState getSchemaCache httpManager
|
||||
corsPolicy sqlGenCtx enableAL planCache
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user