From 8389a7e2730dd8fdc224a48f5943925a31b3fcf6 Mon Sep 17 00:00:00 2001
From: Vamshi Surabhi <0x777@users.noreply.github.com>
Date: Tue, 30 Apr 2019 10:45:23 +0530
Subject: [PATCH] multiplexed subscription improvements (#2081)

* split stm transactions when snapshotting to make it faster
* mx subs: push to both old and new sinks at the same time
* expose dev APIs through allowed APIs flag
---
 server/graphql-engine.cabal                       |  2 +-
 .../Hasura/GraphQL/Execute/LiveQuery.hs           |  6 +--
 .../GraphQL/Execute/LiveQuery/Multiplexed.hs      | 47 ++++++++++++-------
 server/src-lib/Hasura/Server/App.hs               | 24 ++++++----
 server/src-lib/Hasura/Server/Init.hs              | 16 +++++--
 5 files changed, 61 insertions(+), 34 deletions(-)

diff --git a/server/graphql-engine.cabal b/server/graphql-engine.cabal
index 0ac31c68f98..4e36f912c70 100644
--- a/server/graphql-engine.cabal
+++ b/server/graphql-engine.cabal
@@ -290,7 +290,7 @@ library
   if flag(profile)
     ghc-prof-options: -rtsopts -fprof-auto -fno-prof-count-entries
   if flag(developer)
-    cpp-options: -DInternalAPIs -DLocalConsole
+    cpp-options: -DDeveloperAPIs -DLocalConsole
   if flag(local-console)
     cpp-options: -DLocalConsole

diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs
index ceea89b8846..83be0a11aac 100644
--- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs
+++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery.hs
@@ -69,9 +69,9 @@ data LiveQueriesState
   }

 dumpLiveQueriesState
-  :: LiveQueriesState -> IO J.Value
-dumpLiveQueriesState (LiveQueriesState mx fallback _) = do
-  mxJ <- LQM.dumpLiveQueriesState mx
+  :: Bool -> LiveQueriesState -> IO J.Value
+dumpLiveQueriesState extended (LiveQueriesState mx fallback _) = do
+  mxJ <- LQM.dumpLiveQueriesState extended mx
   fallbackJ <- LQF.dumpLiveQueriesState fallback
   return $ J.object
     [ "fallback" J..= fallbackJ
diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs
index 47d56320f15..fcc0c821acd 100644
--- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs
+++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Multiplexed.hs
@@ -148,23 +148,26 @@ initLiveQueriesState lqOptions =
   lqOptions <$> STMMap.new


-dumpLiveQueriesState :: LiveQueriesState -> IO J.Value
-dumpLiveQueriesState (LiveQueriesState opts lqMap) = do
-  lqMapJ <- dumpLiveQueryMap lqMap
+dumpLiveQueriesState :: Bool -> LiveQueriesState -> IO J.Value
+dumpLiveQueriesState extended (LiveQueriesState opts lqMap) = do
+  lqMapJ <- dumpLiveQueryMap extended lqMap
   return $ J.object
     [ "options" J..= opts
     , "live_queries_map" J..= lqMapJ
     ]


-dumpLiveQueryMap :: LiveQueryMap -> IO J.Value
-dumpLiveQueryMap lqMap =
+dumpLiveQueryMap :: Bool -> LiveQueryMap -> IO J.Value
+dumpLiveQueryMap extended lqMap =
   fmap J.toJSON $ do
     entries <- STM.atomically $ ListT.toList $ STMMap.listT lqMap
     forM entries $ \(lq, (lqHandler, threadRef)) -> do
       ThreadState threadId metrics <- STM.atomically $ STM.readTMVar threadRef
       metricsJ <- dumpReftechMetrics metrics
-      candidatesJ <- dumpCandidates $ _mhCandidates lqHandler
+      candidatesJ <-
+        if extended
+        then fmap Just $ dumpCandidates $ _mhCandidates lqHandler
+        else return Nothing
       return $ J.object
         [ "key" J..= lq
         , "thread_id" J..= show (A.asyncThreadId threadId)
@@ -194,8 +197,8 @@ dumpLiveQueryMap lqMap =
        , "min" J..= Metrics.min stats
        , "max" J..= Metrics.max stats
        ]
-    dumpCandidates candidateMap = STM.atomically $ do
-      candidates <- toListTMap candidateMap
+    dumpCandidates candidateMap = do
+      candidates <- STM.atomically $ toListTMap candidateMap
       forM candidates $ \((usrVars, varVals), candidate) -> do
         candidateJ <- dumpCandidate candidate
         return $ J.object
@@ -203,7 +206,8 @@ dumpLiveQueryMap lqMap =
           , "variable_values" J..= varVals
           , "candidate" J..= candidateJ
           ]
-    dumpCandidate (CandidateState respId _ respTV curOps newOps) = do
+    dumpCandidate (CandidateState respId _ respTV curOps newOps) =
+      STM.atomically $ do
       prevResHash <- STM.readTVar respTV
       curOpIds <- toListTMap curOps
       newOpIds <- toListTMap newOps
@@ -434,12 +438,16 @@ data CandidateSnapshot

 pushCandidateResult :: GQResp -> Maybe RespHash -> CandidateSnapshot -> IO ()
 pushCandidateResult resp respHashM candidateSnapshot = do
-  pushResultToSinks newSinks
-  -- write to the current websockets if needed
   prevRespHashM <- STM.readTVarIO respRef
-  when (isExecError resp || respHashM /= prevRespHashM) $ do
-    pushResultToSinks curSinks
-    STM.atomically $ STM.writeTVar respRef respHashM
+  -- write to the current websockets if needed
+  sinks <-
+    if (isExecError resp || respHashM /= prevRespHashM)
+    then do
+      STM.atomically $ STM.writeTVar respRef respHashM
+      return (newSinks <> curSinks)
+    else
+      return newSinks
+  pushResultToSinks sinks
   where
     CandidateSnapshot _ respRef curSinks newSinks = candidateSnapshot
     pushResultToSinks =
@@ -488,11 +496,14 @@ pollQuery
 pollQuery metrics batchSize pgExecCtx handler = do
   procInit <- Clock.getCurrentTime
+
   -- get a snapshot of all the candidates
-  candidateSnapshotMap <- STM.atomically $ do
-    candidates <- toListTMap candidateMap
-    candidateSnapshots <- mapM getCandidateSnapshot candidates
-    return $ Map.fromList candidateSnapshots
+  -- this need not be done in a transaction
+  candidates <- STM.atomically $ toListTMap candidateMap
+  candidateSnapshotMap <-
+    fmap Map.fromList $
+    mapM (STM.atomically . getCandidateSnapshot) candidates
+
   let queryVarsBatches = chunks (unBatchSize batchSize) $
                          getQueryVars candidateSnapshotMap
diff --git a/server/src-lib/Hasura/Server/App.hs b/server/src-lib/Hasura/Server/App.hs
index e88ee82ac31..f78b8e94e9e 100644
--- a/server/src-lib/Hasura/Server/App.hs
+++ b/server/src-lib/Hasura/Server/App.hs
@@ -158,6 +158,9 @@ isMetadataEnabled sc = S.member METADATA $ scEnabledAPIs sc
 isGraphQLEnabled :: ServerCtx -> Bool
 isGraphQLEnabled sc = S.member GRAPHQL $ scEnabledAPIs sc

+isDeveloperAPIEnabled :: ServerCtx -> Bool
+isDeveloperAPIEnabled sc = S.member DEVELOPER $ scEnabledAPIs sc
+
 -- {-# SCC parseBody #-}
 parseBody :: (FromJSON a) => Handler a
 parseBody = do
@@ -418,14 +421,19 @@ httpApp corsCfg serverCtx enableConsole enableTelemetry = do
       query <- parseBody
       v1Alpha1GQHandler query

-#ifdef InternalAPIs
-    get "internal/plan_cache" $ do
-      respJ <- liftIO $ E.dumpPlanCache $ scPlanCache serverCtx
-      json respJ
-    get "internal/subscriptions" $ do
-      respJ <- liftIO $ EL.dumpLiveQueriesState $ scLQState serverCtx
-      json respJ
-#endif
+    when (isDeveloperAPIEnabled serverCtx) $ do
+      get "dev/plan_cache" $ mkSpockAction encodeQErr serverCtx $ do
+        onlyAdmin
+        respJ <- liftIO $ E.dumpPlanCache $ scPlanCache serverCtx
+        return $ encJFromJValue respJ
+      get "dev/subscriptions" $ mkSpockAction encodeQErr serverCtx $ do
+        onlyAdmin
+        respJ <- liftIO $ EL.dumpLiveQueriesState False $ scLQState serverCtx
+        return $ encJFromJValue respJ
+      get "dev/subscriptions/extended" $ mkSpockAction encodeQErr serverCtx $ do
+        onlyAdmin
+        respJ <- liftIO $ EL.dumpLiveQueriesState True $ scLQState serverCtx
+        return $ encJFromJValue respJ

     forM_ [GET,POST] $ \m -> hookAny m $ \_ -> do
       let qErr = err404 NotFound "resource does not exist"
diff --git a/server/src-lib/Hasura/Server/Init.hs b/server/src-lib/Hasura/Server/Init.hs
index 5c3e3f8920a..2d38188a8bc 100644
--- a/server/src-lib/Hasura/Server/Init.hs
+++ b/server/src-lib/Hasura/Server/Init.hs
@@ -1,3 +1,4 @@
+{-# LANGUAGE CPP #-}
 module Hasura.Server.Init where

 import qualified Database.PG.Query as Q
@@ -102,6 +103,7 @@ data HGECommandG a
 data API
   = METADATA
   | GRAPHQL
+  | DEVELOPER
   deriving (Show, Eq, Read, Generic)

 instance Hashable API
@@ -263,13 +265,18 @@ mkServeOptions rso = do
   enableTelemetry <- fromMaybe True <$>
                      withEnv (rsoEnableTelemetry rso) (fst enableTelemetryEnv)
   strfyNum <- withEnvBool (rsoStringifyNum rso) $ fst stringifyNumEnv
-  enabledAPIs <- Set.fromList . fromMaybe [METADATA,GRAPHQL] <$>
+  enabledAPIs <- Set.fromList . fromMaybe defaultAPIs <$>
                  withEnv (rsoEnabledAPIs rso) (fst enabledAPIsEnv)
   lqOpts <- mkLQOpts
   return $ ServeOptions port host connParams txIso adminScrt authHook jwtSecret
                         unAuthRole corsCfg enableConsole enableTelemetry
                         strfyNum enabledAPIs lqOpts
   where
+#ifdef DeveloperAPIs
+    defaultAPIs = [METADATA,GRAPHQL,DEVELOPER]
+#else
+    defaultAPIs = [METADATA,GRAPHQL]
+#endif
     mkConnParams (RawConnParams s c i p) = do
       stripes <- fromMaybe 1 <$> withEnv s (fst pgStripesEnv)
       conns <- fromMaybe 50 <$> withEnv c (fst pgConnsEnv)
@@ -686,6 +693,7 @@ readAPIs = mapM readAPI . T.splitOn "," . T.pack
   where readAPI si = case T.toUpper $ T.strip si of
           "METADATA" -> Right METADATA
           "GRAPHQL" -> Right GRAPHQL
+          "DEVELOPER" -> Right DEVELOPER
           _ -> Left "Only expecting list of comma separated API types metadata / graphql"

 parseWebHook :: Parser RawAuthHook
@@ -796,14 +804,14 @@ parseMxBatchSize =

 mxRefetchDelayEnv :: (String, String)
 mxRefetchDelayEnv =
   ( "HASURA_GRAPHQL_LIVE_QUERIES_MULTIPLEXED_REFETCH_INTERVAL"
   , "results will only be sent once in this interval (in milliseconds) for \
     \live queries which can be multiplexed. Default: 1000 (1sec)"
   )

 mxBatchSizeEnv :: (String, String)
 mxBatchSizeEnv =
   ( "HASURA_GRAPHQL_LIVE_QUERIES_MULTIPLEXED_BATCH_SIZE"
   , "multiplexed live queries are split into batches of the specified \
     \size. Default 100. "
   )
@@ -819,7 +827,7 @@ parseFallbackRefetchInt =

 fallbackRefetchDelayEnv :: (String, String)
 fallbackRefetchDelayEnv =
   ( "HASURA_GRAPHQL_LIVE_QUERIES_FALLBACK_REFETCH_INTERVAL"
   , "results will only be sent once in this interval (in milliseconds) for \
     \live queries which cannot be multiplexed. Default: 1000 (1sec)"
   )