server: changes catalog initialization and logging for pro customization (#5139)

* new typeclass to abstract the logic of QueryLog-ing

* abstract the logic of logging websocket-server logs

  introduce a MonadWSLog typeclass

* move catalog initialization to init step

  expose a helper function to migrate catalog
  create schema cache in initialiseCtx

* expose various modules and functions for pro
This commit is contained in:
Anon Ray 2020-06-19 12:12:32 +05:30 committed by GitHub
parent f2428e3984
commit a7a60c2dfe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 280 additions and 218 deletions

View File

@ -252,11 +252,13 @@ library
-- exposed for Pro
, Hasura.GraphQL.Execute
, Hasura.GraphQL.Execute.Query
, Hasura.GraphQL.Execute.LiveQuery
, Hasura.GraphQL.Validate
, Hasura.GraphQL.Transport.HTTP
, Hasura.GraphQL.Transport.WebSocket.Protocol
, Hasura.GraphQL.Transport.WebSocket.Server
, Hasura.GraphQL.Logging
, Hasura.RQL.Types
, Hasura.RQL.Types.Run
@ -272,6 +274,9 @@ library
, Data.Aeson.Ordered
, Data.TByteString
, Network.Wai.Extended
, Control.Concurrent.Extended
other-modules: Hasura.Incremental.Select
, Hasura.Incremental.Internal.Cache
, Hasura.Incremental.Internal.Dependency
@ -372,7 +377,6 @@ library
, Hasura.GraphQL.Validate.InputValue
, Hasura.GraphQL.Explain
, Hasura.GraphQL.Execute.Plan
, Hasura.GraphQL.Execute.Query
, Hasura.GraphQL.Execute.LiveQuery.Options
, Hasura.GraphQL.Execute.LiveQuery.Plan
, Hasura.GraphQL.Execute.LiveQuery.Poll
@ -390,13 +394,11 @@ library
, Hasura.GraphQL.Resolve.Select
, Hasura.GraphQL.RemoteServer
, Hasura.GraphQL.Context
, Hasura.GraphQL.Logging
, Hasura.Eventing.HTTP
, Hasura.Eventing.EventTrigger
, Hasura.Eventing.ScheduledTrigger
, Control.Concurrent.Extended
, Control.Lens.Extended
, Data.Aeson.Extended
, Data.List.Extended
@ -415,7 +417,6 @@ library
, Hasura.SQL.Value
, Network.URI.Extended
, Network.Wai.Extended
, Network.Wai.Handler.WebSockets.Custom
executable graphql-engine

View File

@ -2,63 +2,71 @@
module Hasura.App where
import Control.Concurrent.STM.TVar (readTVarIO)
import Control.Concurrent.STM.TVar (readTVarIO)
import Control.Lens (view, _2)
import Control.Monad.Base
import Control.Monad.Catch (MonadCatch, MonadThrow, onException)
import Control.Monad.Catch (MonadCatch, MonadThrow, onException)
import Control.Monad.Stateless
import Control.Monad.STM (atomically)
import Control.Monad.Trans.Control (MonadBaseControl (..))
import Data.Aeson ((.=))
import Data.Time.Clock (UTCTime)
import Control.Monad.STM (atomically)
import Control.Monad.Trans.Control (MonadBaseControl (..))
import Data.Aeson ((.=))
import Data.Time.Clock (UTCTime)
import GHC.AssertNF
import Options.Applicative
import System.Environment (getEnvironment, lookupEnv)
import System.Exit (exitFailure)
import System.Environment (getEnvironment, lookupEnv)
import System.Exit (exitFailure)
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.Async.Lifted.Safe as LA
import qualified Control.Concurrent.Extended as C
import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy.Char8 as BLC
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified Data.Time.Clock as Clock
import qualified Data.Yaml as Y
import qualified Database.PG.Query as Q
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Client.TLS as HTTP
import qualified Network.Wai.Handler.Warp as Warp
import qualified System.Log.FastLogger as FL
import qualified Text.Mustache.Compile as M
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.Async.Lifted.Safe as LA
import qualified Control.Concurrent.Extended as C
import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy.Char8 as BLC
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified Data.Time.Clock as Clock
import qualified Data.Yaml as Y
import qualified Database.PG.Query as Q
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Client.TLS as HTTP
import qualified Network.Wai.Handler.Warp as Warp
import qualified System.Log.FastLogger as FL
import qualified Text.Mustache.Compile as M
import Hasura.Db
import Hasura.EncJSON
import Hasura.Eventing.EventTrigger
import Hasura.Eventing.ScheduledTrigger
import Hasura.GraphQL.Execute (MonadGQLExecutionCheck (..),
checkQueryInAllowlist)
import Hasura.GraphQL.Resolve.Action (asyncActionsProcessor)
import Hasura.GraphQL.Transport.HTTP.Protocol (toParsed)
import Hasura.GraphQL.Execute (MonadGQLExecutionCheck (..),
checkQueryInAllowlist)
import Hasura.GraphQL.Logging (MonadQueryLog (..), QueryLog (..))
import Hasura.GraphQL.Resolve.Action (asyncActionsProcessor)
import Hasura.GraphQL.Transport.HTTP.Protocol (toParsed)
import Hasura.Logging
import Hasura.Prelude
import Hasura.RQL.Types (CacheRWM, Code (..), HasHttpManager,
HasSQLGenCtx, HasSystemDefined, QErr (..),
SQLGenCtx (..), SchemaCache (..),
UserInfoM, buildSchemaCacheStrict,
decodeValue, throw400, withPathK)
import Hasura.RQL.DDL.Schema.Cache
import Hasura.RQL.Types (CacheRWM, Code (..), HasHttpManager,
HasSQLGenCtx, HasSystemDefined,
QErr (..), SQLGenCtx (..),
SchemaCache (..), UserInfoM,
buildSchemaCacheStrict, decodeValue,
throw400, withPathK)
import Hasura.RQL.Types.Run
import Hasura.Server.API.Query (requiresAdmin, runQueryM)
import Hasura.Server.API.Query (fetchLastUpdate, requiresAdmin,
runQueryM)
import Hasura.Server.App
import Hasura.Server.Auth
import Hasura.Server.CheckUpdates (checkForUpdates)
import Hasura.Server.CheckUpdates (checkForUpdates)
import Hasura.Server.Init
import Hasura.Server.Logging
import Hasura.Server.Migrate (migrateCatalog)
import Hasura.Server.SchemaUpdate
import Hasura.Server.Telemetry
import Hasura.Server.Version
import Hasura.Session
import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
printErrExit :: (MonadIO m) => forall a . String -> m a
printErrExit = liftIO . (>> exitFailure) . putStrLn
@ -118,6 +126,7 @@ data InitCtx
, _icConnInfo :: !Q.ConnInfo
, _icPgPool :: !Q.PGPool
, _icShutdownLatch :: !ShutdownLatch
, _icSchemaCache :: !(RebuildableSchemaCache Run, Maybe UTCTime)
}
-- | Collection of the LoggerCtx, the regular Logger and the PGLogger
@ -138,7 +147,7 @@ newtype AppM a = AppM { unAppM :: IO a }
-- this exists as a separate function because the context (logger, http manager, pg pool) can be
-- used by other functions as well
initialiseCtx
:: MonadIO m
:: (HasVersion, MonadIO m, MonadCatch m)
=> HGECommand Hasura
-> RawConnInfo
-> m (InitCtx, UTCTime)
@ -149,8 +158,8 @@ initialiseCtx hgeCmd rci = do
instanceId <- liftIO generateInstanceId
connInfo <- liftIO procConnInfo
latch <- liftIO newShutdownLatch
(loggers, pool) <- case hgeCmd of
-- for server command generate a proper pool
(loggers, pool, sqlGenCtx) <- case hgeCmd of
-- for the @serve@ command generate a regular PG pool
HCServe so@ServeOptions{..} -> do
l@(Loggers _ logger pgLogger) <- mkLoggers soEnabledLogTypes soLogLevel
-- log serve options
@ -158,15 +167,17 @@ initialiseCtx hgeCmd rci = do
-- log postgres connection info
unLogger logger $ connInfoToLog connInfo
pool <- liftIO $ Q.initPGPool connInfo soConnParams pgLogger
pure (l, pool)
pure (l, pool, SQLGenCtx soStringifyNum)
-- for other commands generate a minimal pool
-- for other commands generate a minimal PG pool
_ -> do
l@(Loggers _ _ pgLogger) <- mkLoggers defaultEnabledLogTypes LevelInfo
pool <- getMinimalPool pgLogger connInfo
pure (l, pool)
pure (l, pool, SQLGenCtx False)
pure (InitCtx httpManager instanceId loggers connInfo pool latch, initTime)
res <- flip onException (flushLogger (_lsLoggerCtx loggers)) $
migrateCatalogSchema (_lsLogger loggers) pool httpManager sqlGenCtx
pure (InitCtx httpManager instanceId loggers connInfo pool latch res, initTime)
where
procConnInfo =
either (printErrExit . ("Fatal Error : " <>)) return $ mkConnInfo rci
@ -181,6 +192,29 @@ initialiseCtx hgeCmd rci = do
pgLogger = mkPGLogger logger
return $ Loggers loggerCtx logger pgLogger
-- | helper function to initialize or migrate the @hdb_catalog@ schema (used by pro as well)
migrateCatalogSchema
:: (HasVersion, MonadIO m)
=> Logger Hasura -> Q.PGPool -> HTTP.Manager -> SQLGenCtx
-> m (RebuildableSchemaCache Run, Maybe UTCTime)
migrateCatalogSchema logger pool httpManager sqlGenCtx = do
let pgExecCtx = mkPGExecCtx Q.Serializable pool
adminRunCtx = RunCtx adminUserInfo httpManager sqlGenCtx
currentTime <- liftIO Clock.getCurrentTime
initialiseResult <- runExceptT $ peelRun adminRunCtx pgExecCtx Q.ReadWrite $
(,) <$> migrateCatalog currentTime <*> liftTx fetchLastUpdate
((migrationResult, schemaCache), lastUpdateEvent) <-
initialiseResult `onLeft` \err -> do
unLogger logger StartupLog
{ slLogLevel = LevelError
, slKind = "db_migrate"
, slInfo = A.toJSON err
}
liftIO exitFailure
unLogger logger migrationResult
return (schemaCache, view _2 <$> lastUpdateEvent)
-- | Run a transaction and if an error is encountered, log the error and abort the program
runTxIO :: Q.PGPool -> Q.TxMode -> Q.TxE QErr a -> IO a
runTxIO pool isoLevel tx = do
@ -202,6 +236,13 @@ waitForShutdown = C.takeMVar . unShutdownLatch
shutdownGracefully :: InitCtx -> IO ()
shutdownGracefully = flip C.putMVar () . unShutdownLatch . _icShutdownLatch
-- | If an exception is encountered , flush the log buffer and
-- rethrow If we do not flush the log buffer on exception, then log lines
-- may be missed
-- See: https://github.com/hasura/graphql-engine/issues/4772
flushLogger :: (MonadIO m) => LoggerCtx impl -> m ()
flushLogger loggerCtx = liftIO $ FL.flushLogStr $ _lcLoggerSet loggerCtx
runHGEServer
:: ( HasVersion
, MonadIO m
@ -211,9 +252,11 @@ runHGEServer
, UserAuthentication m
, MetadataApiAuthorization m
, HttpLog m
, MonadQueryLog m
, ConsoleRenderer m
, MonadGQLExecutionCheck m
, MonadConfigApiHandler m
, WS.MonadWSLog m
)
=> ServeOptions impl
-> InitCtx
@ -240,12 +283,7 @@ runHGEServer ServeOptions{..} InitCtx{..} pgExecCtx initTime = do
authMode <- either (printErrExit . T.unpack) return authModeRes
-- If an exception is encountered in 'mkWaiApp', flush the log buffer and
-- rethrow If we do not flush the log buffer on exception, then log lines
-- written in 'mkWaiApp' may be missed
-- See: https://github.com/hasura/graphql-engine/issues/4772
let flushLogger = liftIO $ FL.flushLogStr $ _lcLoggerSet loggerCtx
HasuraApp app cacheRef cacheInitTime shutdownApp <- flip onException flushLogger $
HasuraApp app cacheRef cacheInitTime shutdownApp <- flip onException (flushLogger loggerCtx) $
mkWaiApp soTxIso
logger
sqlGenCtx
@ -264,6 +302,7 @@ runHGEServer ServeOptions{..} InitCtx{..} pgExecCtx initTime = do
soLiveQueryOpts
soPlanCacheOptions
soResponseInternalErrorsConfig
_icSchemaCache
-- log inconsistent schema objects
inconsObjs <- scInconsistentObjs <$> liftIO (getSCFromRef cacheRef)
@ -451,6 +490,14 @@ instance MonadConfigApiHandler AppM where
runConfigApiHandler = configApiGetHandler
instance MonadQueryLog AppM where
logQueryLog logger query genSqlM reqId =
unLogger logger $ QueryLog query genSqlM reqId
instance WS.MonadWSLog AppM where
logWSLog = unLogger
--- helper functions ---
mkConsoleHTML :: HasVersion => Text -> AuthMode -> Bool -> Maybe Text -> Either String Text

View File

@ -390,6 +390,7 @@ execRemoteGQ
, MonadIO m
, MonadError QErr m
, MonadReader ExecutionCtx m
, MonadQueryLog m
)
=> RequestId
-> UserInfo
@ -403,7 +404,8 @@ execRemoteGQ reqId userInfo reqHdrs q rsi opType = do
execCtx <- ask
let logger = _ecxLogger execCtx
manager = _ecxHttpManager execCtx
L.unLogger logger $ QueryLog q Nothing reqId
-- L.unLogger logger $ QueryLog q Nothing reqId
logQueryLog logger q Nothing reqId
(time, respHdrs, resp) <- execRemoteGQ' manager userInfo reqHdrs q rsi opType
let !httpResp = HttpResponse (encJFromLBS resp) respHdrs
return (time, httpResp)

View File

@ -5,6 +5,7 @@ layer. In contrast with, logging at the HTTP server layer.
module Hasura.GraphQL.Logging
( QueryLog(..)
, MonadQueryLog (..)
) where
import qualified Data.Aeson as J
@ -45,3 +46,20 @@ encodeSql sql =
where
alName = G.unName . G.unAlias
jValFromAssocList xs = J.object $ map (uncurry (J..=)) xs
class Monad m => MonadQueryLog m where
logQueryLog
:: L.Logger L.Hasura
-- ^ logger
-> GQLReqUnparsed
-- ^ GraphQL request
-> (Maybe EQ.GeneratedSqlMap)
-- ^ Generated SQL if any
-> RequestId
-> m ()
instance MonadQueryLog m => MonadQueryLog (ExceptT e m) where
logQueryLog l req sqlMap reqId = lift $ logQueryLog l req sqlMap reqId
instance MonadQueryLog m => MonadQueryLog (ReaderT r m) where
logQueryLog l req sqlMap reqId = lift $ logQueryLog l req sqlMap reqId

View File

@ -1,4 +1,5 @@
-- | Execution of GraphQL queries over HTTP transport
{-# LANGUAGE RecordWildCards #-}
module Hasura.GraphQL.Transport.HTTP
( runGQ
, runGQBatched
@ -12,7 +13,7 @@ module Hasura.GraphQL.Transport.HTTP
) where
import Hasura.EncJSON
import Hasura.GraphQL.Logging
import Hasura.GraphQL.Logging (MonadQueryLog (..))
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.HTTP
import Hasura.Prelude
@ -24,7 +25,6 @@ import Hasura.Session
import qualified Database.PG.Query as Q
import qualified Hasura.GraphQL.Execute as E
import qualified Hasura.Logging as L
import qualified Hasura.Server.Telemetry.Counters as Telem
import qualified Language.GraphQL.Draft.Syntax as G
import qualified Network.HTTP.Types as HTTP
@ -38,6 +38,7 @@ runGQ
, MonadError QErr m
, MonadReader E.ExecutionCtx m
, E.MonadGQLExecutionCheck m
, MonadQueryLog m
)
=> RequestId
-> UserInfo
@ -82,6 +83,7 @@ runGQBatched
, MonadError QErr m
, MonadReader E.ExecutionCtx m
, E.MonadGQLExecutionCheck m
, MonadQueryLog m
)
=> RequestId
-> ResponseInternalErrorsConfig
@ -115,6 +117,7 @@ runHasuraGQ
:: ( MonadIO m
, MonadError QErr m
, MonadReader E.ExecutionCtx m
, MonadQueryLog m
)
=> RequestId
-> GQLReqUnparsed
@ -125,21 +128,29 @@ runHasuraGQ
-- spent in the PG query; for telemetry.
runHasuraGQ reqId query userInfo resolvedOp = do
(E.ExecutionCtx logger _ pgExecCtx _ _ _ _ _) <- ask
logQuery' logger
(telemTimeIO, respE) <- withElapsedTime $ liftIO $ runExceptT $ case resolvedOp of
E.ExOpQuery tx genSql -> do
E.ExOpQuery tx _genSql -> do
-- log the generated SQL and the graphql query
L.unLogger logger $ QueryLog query genSql reqId
-- L.unLogger logger $ QueryLog query genSql reqId
([],) <$> runQueryTx pgExecCtx tx
E.ExOpMutation respHeaders tx -> do
-- log the graphql query
L.unLogger logger $ QueryLog query Nothing reqId
(respHeaders,) <$> runLazyTx pgExecCtx Q.ReadWrite (withUserInfo userInfo tx)
E.ExOpSubs _ ->
throw400 UnexpectedPayload
"subscriptions are not supported over HTTP, use websockets instead"
(respHdrs, resp) <- liftEither respE
let !json = encodeGQResp $ GQSuccess $ encJToLBS resp
telemQueryType = case resolvedOp of E.ExOpMutation{} -> Telem.Mutation ; _ -> Telem.Query
return (telemTimeIO, telemQueryType, respHdrs, json)
where
logQuery' logger = case resolvedOp of
-- log the generated SQL and the graphql query
E.ExOpQuery _ genSql -> logQueryLog logger query genSql reqId
-- log the graphql query
E.ExOpMutation _ _ -> logQueryLog logger query Nothing reqId
E.ExOpSubs _ -> return ()

View File

@ -27,21 +27,20 @@ import qualified Data.Text.Encoding as TE
import qualified Data.Time.Clock as TC
import qualified Database.PG.Query as Q
import qualified Language.GraphQL.Draft.Syntax as G
import qualified ListT
import qualified Network.HTTP.Client as H
import qualified Network.HTTP.Types as H
import qualified Network.WebSockets as WS
import qualified Network.Wai.Extended as Wai
import qualified Network.WebSockets as WS
import qualified StmContainers.Map as STMMap
import Control.Concurrent.Extended (sleep)
import Control.Exception.Lifted
import Data.String
import GHC.AssertNF
import qualified ListT
import Hasura.EncJSON
import Hasura.GraphQL.Logging
import Hasura.GraphQL.Logging (MonadQueryLog (..))
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.GraphQL.Transport.WebSocket.Protocol
import Hasura.HTTP
@ -122,8 +121,13 @@ sendMsgWithMetadata wsConn msg (LQ.LiveQueryMetadata execTime) =
liftIO $ WS.sendMsg wsConn $ WS.WSQueueResponse bs wsInfo
where
bs = encodeServerMsg msg
(msgType, operationId) = case msg of
(SMData (DataMsg opId _)) -> (Just SMT_GQL_DATA, Just opId)
_ -> (Nothing, Nothing)
wsInfo = Just $! WS.WSEventInfo
{ WS._wseiQueryExecutionTime = Just $! realToFrac execTime
{ WS._wseiEventType = msgType
, WS._wseiOperationId = operationId
, WS._wseiQueryExecutionTime = Just $! realToFrac execTime
, WS._wseiResponseSize = Just $! BL.length bs
}
@ -297,7 +301,7 @@ onConn (L.Logger logger) corsPolicy wsId requestHead ipAdress = do
<> "HASURA_GRAPHQL_WS_READ_COOKIE to force read cookie when CORS is disabled."
onStart
:: forall m. (HasVersion, MonadIO m, E.MonadGQLExecutionCheck m)
:: forall m. (HasVersion, MonadIO m, E.MonadGQLExecutionCheck m, MonadQueryLog m)
=> WSServerEnv -> WSConn -> StartMsg -> m ()
onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
timerTot <- startTimer
@ -327,8 +331,7 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
planCache userInfo sqlGenCtx sc scVer queryType httpMgr reqHdrs (q, reqParsed)
(telemCacheHit, execPlan) <- either (withComplete . preExecErr requestId) return execPlanE
let execCtx = E.ExecutionCtx logger sqlGenCtx pgExecCtx
planCache sc scVer httpMgr enableAL
let execCtx = E.ExecutionCtx logger sqlGenCtx pgExecCtx planCache sc scVer httpMgr enableAL
case execPlan of
E.GExPHasura resolvedOp ->
@ -349,7 +352,8 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
runLazyTx pgExecCtx Q.ReadWrite $ withUserInfo userInfo opTx
E.ExOpSubs lqOp -> do
-- log the graphql query
L.unLogger logger $ QueryLog query Nothing reqId
-- L.unLogger logger $ QueryLog query Nothing reqId
logQueryLog logger query Nothing reqId
let subscriberMetadata = LQ.mkSubscriberMetadata $ J.object
[ "websocket_id" J..= WS.getWSId wsConn
, "operation_id" J..= opId
@ -370,8 +374,8 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
execQueryOrMut telemQueryType genSql action = do
logOpEv ODStarted (Just reqId)
-- log the generated SQL and the graphql query
L.unLogger logger $ QueryLog query genSql reqId
(withElapsedTime $ liftIO $ runExceptT action) >>= \case
logQueryLog logger query genSql reqId
(withElapsedTime $ runExceptT action) >>= \case
(_, Left err) -> postExecErr reqId err
(telemTimeIO_DT, Right encJson) -> do
-- Telemetry. NOTE: don't time network IO:
@ -459,6 +463,7 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
ERTGraphqlCompliant -> J.object ["errors" J..= [errFn False qErr]]
sendMsg wsConn (SMErr $ ErrorMsg opId err)
-- sendSuccResp :: _
sendSuccResp encJson =
sendMsgWithMetadata wsConn
(SMData $ DataMsg opId $ GRHasura $ GQSuccess $ encJToLBS encJson)
@ -483,7 +488,7 @@ onStart serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
catchAndIgnore m = void $ runExceptT m
onMessage
:: (HasVersion, MonadIO m, UserAuthentication m, E.MonadGQLExecutionCheck m)
:: (HasVersion, MonadIO m, UserAuthentication m, E.MonadGQLExecutionCheck m, MonadQueryLog m)
=> AuthMode
-> WSServerEnv
-> WSConn -> BL.ByteString -> m ()
@ -660,7 +665,9 @@ createWSServerApp
, MC.MonadBaseControl IO m
, LA.Forall (LA.Pure m)
, UserAuthentication m
, WS.MonadWSLog m
, E.MonadGQLExecutionCheck m
, MonadQueryLog m
)
=> AuthMode
-> WSServerEnv

View File

@ -6,6 +6,7 @@ module Hasura.GraphQL.Transport.WebSocket.Protocol
, StopMsg(..)
, ClientMsg(..)
, ServerMsg(..)
, ServerMsgType(..)
, encodeServerMsg
, DataMsg(..)
, ErrorMsg(..)

View File

@ -19,38 +19,43 @@ module Hasura.GraphQL.Transport.WebSocket.Server
, WSServer
, WSEventInfo(..)
, WSQueueResponse(..)
, ServerMsgType(..)
, createWSServer
, closeAll
, createServerApp
, shutdown
, MonadWSLog (..)
, HasuraServerApp
, WSEvent(..)
, WSLog(..)
) where
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.Async.Lifted.Safe as LA
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.Async.Lifted.Safe as LA
import qualified Control.Concurrent.STM as STM
import Control.Exception.Lifted
import qualified Control.Monad.Trans.Control as MC
import qualified Data.Aeson as J
import qualified Data.Aeson.Casing as J
import qualified Data.Aeson.TH as J
import qualified Data.ByteString.Lazy as BL
import qualified Control.Monad.Trans.Control as MC
import qualified Data.Aeson as J
import qualified Data.Aeson.Casing as J
import qualified Data.Aeson.TH as J
import qualified Data.ByteString.Lazy as BL
import Data.String
import qualified Data.TByteString as TBS
import qualified Data.UUID as UUID
import qualified Data.UUID.V4 as UUID
import Data.Word (Word16)
import qualified Data.TByteString as TBS
import qualified Data.UUID as UUID
import qualified Data.UUID.V4 as UUID
import Data.Word (Word16)
import GHC.AssertNF
import GHC.Int (Int64)
import GHC.Int (Int64)
import Hasura.Prelude
import Network.Wai.Extended (IpAddress)
import qualified ListT
import qualified Network.WebSockets as WS
import qualified StmContainers.Map as STMMap
import qualified System.IO.Error as E
import Network.Wai.Extended (IpAddress)
import qualified Network.WebSockets as WS
import qualified StmContainers.Map as STMMap
import qualified System.IO.Error as E
import qualified Hasura.Logging as L
import Hasura.GraphQL.Transport.WebSocket.Protocol (OperationId, ServerMsgType (..))
import qualified Hasura.Logging as L
newtype WSId
= WSId { unWSId :: UUID.UUID }
@ -80,7 +85,9 @@ $(J.deriveToJSON
-- extra websocket event info
data WSEventInfo
= WSEventInfo
{ _wseiQueryExecutionTime :: !(Maybe Double)
{ _wseiEventType :: !(Maybe ServerMsgType)
, _wseiOperationId :: !(Maybe OperationId)
, _wseiQueryExecutionTime :: !(Maybe Double)
, _wseiResponseSize :: !(Maybe Int64)
} deriving (Show, Eq)
$(J.deriveToJSON
@ -89,7 +96,6 @@ $(J.deriveToJSON
}
''WSEventInfo)
data WSLog
= WSLog
{ _wslWebsocketId :: !WSId
@ -106,6 +112,17 @@ instance L.ToEngineLog WSLog L.Hasura where
toEngineLog wsLog =
(L.LevelDebug, L.ELTInternal L.ILTWsServer, J.toJSON wsLog)
class Monad m => MonadWSLog m where
-- | Takes WS server log data and logs it
-- logWSServer
logWSLog :: L.Logger L.Hasura -> WSLog -> m ()
instance MonadWSLog m => MonadWSLog (ExceptT e m) where
logWSLog l ws = lift $ logWSLog l ws
instance MonadWSLog m => MonadWSLog (ReaderT r m) where
logWSLog l ws = lift $ logWSLog l ws
data WSQueueResponse
= WSQueueResponse
{ _wsqrMessage :: !BL.ByteString
@ -219,7 +236,7 @@ data WSHandlers m a
type HasuraServerApp m = IpAddress -> WS.PendingConnection -> m ()
createServerApp
:: (MonadIO m, MC.MonadBaseControl IO m, LA.Forall (LA.Pure m))
:: (MonadIO m, MC.MonadBaseControl IO m, LA.Forall (LA.Pure m), MonadWSLog m)
=> WSServer a
-- user provided handlers
-> WSHandlers m a
@ -228,7 +245,7 @@ createServerApp
{-# INLINE createServerApp #-}
createServerApp (WSServer logger@(L.Logger writeLog) serverStatus) wsHandlers !ipAddress !pendingConn = do
wsId <- WSId <$> liftIO UUID.nextRandom
writeLog $ WSLog wsId EConnectionRequest Nothing
logWSLog logger $ WSLog wsId EConnectionRequest Nothing
status <- liftIO $ STM.readTVarIO serverStatus
case status of
AcceptingConns _ -> logUnexpectedExceptions $ do
@ -255,11 +272,11 @@ createServerApp (WSServer logger@(L.Logger writeLog) serverStatus) wsHandlers !i
onReject wsId rejectRequest = do
liftIO $ WS.rejectRequestWith pendingConn rejectRequest
writeLog $ WSLog wsId ERejected Nothing
logWSLog logger $ WSLog wsId ERejected Nothing
onAccept wsId (AcceptWith a acceptWithParams keepAlive onJwtExpiry) = do
conn <- liftIO $ WS.acceptRequestWith pendingConn acceptWithParams
writeLog $ WSLog wsId EAccepted Nothing
logWSLog logger $ WSLog wsId EAccepted Nothing
sendQ <- liftIO STM.newTQueueIO
let !wsConn = WSConn wsId logger conn sendQ a
-- TODO there are many thunks here. Difficult to trace how much is retained, and
@ -299,13 +316,13 @@ createServerApp (WSServer logger@(L.Logger writeLog) serverStatus) wsHandlers !i
-- Regardless this should be safe:
handleJust (guard . E.isResourceVanishedError) (\()-> throw WS.ConnectionClosed) $
WS.receiveData conn
writeLog $ WSLog wsId (EMessageReceived $ TBS.fromLBS msg) Nothing
logWSLog logger $ WSLog wsId (EMessageReceived $ TBS.fromLBS msg) Nothing
_hOnMessage wsHandlers wsConn msg
let send = forever $ do
WSQueueResponse msg wsInfo <- liftIO $ STM.atomically $ STM.readTQueue sendQ
liftIO $ WS.sendTextData conn msg
writeLog $ WSLog wsId (EMessageSent $ TBS.fromLBS msg) wsInfo
logWSLog logger $ WSLog wsId (EMessageSent $ TBS.fromLBS msg) wsInfo
-- withAsync lets us be very sure that if e.g. an async exception is raised while we're
-- forking that the threads we launched will be cleaned up. See also below.
@ -323,17 +340,17 @@ createServerApp (WSServer logger@(L.Logger writeLog) serverStatus) wsHandlers !i
-- exceptions; for now handle all ConnectionException by closing
-- and cleaning up, see: https://github.com/jaspervdj/websockets/issues/48
Left ( _ :: WS.ConnectionException) -> do
writeLog $ WSLog (_wcConnId wsConn) ECloseReceived Nothing
logWSLog logger $ WSLog (_wcConnId wsConn) ECloseReceived Nothing
-- this will happen when jwt is expired
Right _ -> do
writeLog $ WSLog (_wcConnId wsConn) EJwtExpired Nothing
logWSLog logger $ WSLog (_wcConnId wsConn) EJwtExpired Nothing
onConnClose wsConn = \case
ShuttingDown -> pure ()
AcceptingConns connMap -> do
liftIO $ STM.atomically $ STMMap.delete (_wcConnId wsConn) connMap
_hOnClose wsHandlers wsConn
writeLog $ WSLog (_wcConnId wsConn) EClosed Nothing
logWSLog logger $ WSLog (_wcConnId wsConn) EClosed Nothing
shutdown :: WSServer a -> IO ()

View File

@ -1,5 +1,9 @@
-- | API related to server configuration
module Hasura.Server.API.Config (runGetConfig) where
module Hasura.Server.API.Config
-- required by pro
( ServerConfig(..)
, runGetConfig
) where
import Data.Aeson.Casing
import Data.Aeson.TH

View File

@ -4,66 +4,66 @@
module Hasura.Server.App where
import Control.Concurrent.MVar.Lifted
import Control.Exception (IOException, try)
import Control.Lens (view, _2)
import Control.Exception (IOException, try)
import Control.Monad.Stateless
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson hiding (json)
import Data.Int (Int64)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson hiding (json)
import Data.Int (Int64)
import Data.IORef
import Data.Time.Clock (UTCTime, getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Network.Mime (defaultMimeLookup)
import System.Exit (exitFailure)
import System.FilePath (joinPath, takeFileName)
import Web.Spock.Core ((<//>))
import Data.Time.Clock (UTCTime)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Network.Mime (defaultMimeLookup)
import System.Exit (exitFailure)
import System.FilePath (joinPath, takeFileName)
import Web.Spock.Core ((<//>))
import qualified Control.Concurrent.Async.Lifted.Safe as LA
import qualified Data.ByteString.Lazy as BL
import qualified Data.CaseInsensitive as CI
import qualified Data.HashMap.Strict as M
import qualified Data.HashSet as S
import qualified Data.Text as T
import qualified Database.PG.Query as Q
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTP
import qualified Network.Wai.Extended as Wai
import qualified Network.WebSockets as WS
import qualified System.Metrics as EKG
import qualified System.Metrics.Json as EKG
import qualified Text.Mustache as M
import qualified Web.Spock.Core as Spock
import qualified Control.Concurrent.Async.Lifted.Safe as LA
import qualified Data.ByteString.Lazy as BL
import qualified Data.CaseInsensitive as CI
import qualified Data.HashMap.Strict as M
import qualified Data.HashSet as S
import qualified Data.Text as T
import qualified Database.PG.Query as Q
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTP
import qualified Network.Wai.Extended as Wai
import qualified Network.WebSockets as WS
import qualified System.Metrics as EKG
import qualified System.Metrics.Json as EKG
import qualified Text.Mustache as M
import qualified Web.Spock.Core as Spock
import Hasura.Db
import Hasura.EncJSON
import Hasura.GraphQL.Logging (MonadQueryLog (..))
import Hasura.GraphQL.Resolve.Action
import Hasura.HTTP
import Hasura.Prelude hiding (get, put)
import Hasura.Prelude hiding (get, put)
import Hasura.RQL.DDL.Schema
import Hasura.RQL.Types
import Hasura.RQL.Types.Run
import Hasura.Server.API.Config (runGetConfig)
import Hasura.Server.API.Config (runGetConfig)
import Hasura.Server.API.Query
import Hasura.Server.Auth (AuthMode (..), UserAuthentication (..))
import Hasura.Server.Auth (AuthMode (..), UserAuthentication (..))
import Hasura.Server.Compression
import Hasura.Server.Cors
import Hasura.Server.Init
import Hasura.Server.Logging
import Hasura.Server.Middleware (corsMiddleware)
import Hasura.Server.Migrate (migrateCatalog)
import Hasura.Server.Middleware (corsMiddleware)
import Hasura.Server.Utils
import Hasura.Server.Version
import Hasura.Session
import Hasura.SQL.Types
import qualified Hasura.GraphQL.Execute as E
import qualified Hasura.GraphQL.Execute.LiveQuery as EL
import qualified Hasura.GraphQL.Explain as GE
import qualified Hasura.GraphQL.Transport.HTTP as GH
import qualified Hasura.GraphQL.Transport.HTTP.Protocol as GH
import qualified Hasura.GraphQL.Transport.WebSocket as WS
import qualified Hasura.Logging as L
import qualified Hasura.Server.API.PGDump as PGD
import qualified Hasura.GraphQL.Execute as E
import qualified Hasura.GraphQL.Execute.LiveQuery as EL
import qualified Hasura.GraphQL.Explain as GE
import qualified Hasura.GraphQL.Transport.HTTP as GH
import qualified Hasura.GraphQL.Transport.HTTP.Protocol as GH
import qualified Hasura.GraphQL.Transport.WebSocket as WS
import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
import qualified Hasura.Logging as L
import qualified Hasura.Server.API.PGDump as PGD
import qualified Network.Wai.Handler.WebSockets.Custom as WSC
@ -329,7 +329,7 @@ v1QueryHandler query = do
runQuery pgExecCtx instanceId userInfo schemaCache httpMgr sqlGenCtx (SystemDefined False) query
v1Alpha1GQHandler
:: (HasVersion, MonadIO m, E.MonadGQLExecutionCheck m)
:: (HasVersion, MonadIO m, E.MonadGQLExecutionCheck m, MonadQueryLog m)
=> E.GraphQLQueryType -> GH.GQLBatchedReqs GH.GQLQueryText -> Handler m (HttpResponse EncJSON)
v1Alpha1GQHandler queryType query = do
userInfo <- asks hcUser
@ -351,13 +351,13 @@ v1Alpha1GQHandler queryType query = do
GH.runGQBatched requestId responseErrorsConfig userInfo ipAddress reqHeaders queryType query
v1GQHandler
:: (HasVersion, MonadIO m, E.MonadGQLExecutionCheck m)
:: (HasVersion, MonadIO m, E.MonadGQLExecutionCheck m, MonadQueryLog m)
=> GH.GQLBatchedReqs GH.GQLQueryText
-> Handler m (HttpResponse EncJSON)
v1GQHandler = v1Alpha1GQHandler E.QueryHasura
v1GQRelayHandler
:: (HasVersion, MonadIO m, E.MonadGQLExecutionCheck m)
:: (HasVersion, MonadIO m, E.MonadGQLExecutionCheck m, MonadQueryLog m)
=> GH.GQLBatchedReqs GH.GQLQueryText -> Handler m (HttpResponse EncJSON)
v1GQRelayHandler = v1Alpha1GQHandler E.QueryRelay
@ -485,10 +485,12 @@ mkWaiApp
, LA.Forall (LA.Pure m)
, ConsoleRenderer m
, HttpLog m
, MonadQueryLog m
, UserAuthentication m
, MetadataApiAuthorization m
, E.MonadGQLExecutionCheck m
, MonadConfigApiHandler m
, WS.MonadWSLog m
)
=> Q.TxIsolation
-> L.Logger L.Hasura
@ -508,11 +510,12 @@ mkWaiApp
-> EL.LiveQueriesOptions
-> E.PlanCacheOptions
-> ResponseInternalErrorsConfig
-> (RebuildableSchemaCache Run, Maybe UTCTime)
-> m HasuraApp
mkWaiApp isoLevel logger sqlGenCtx enableAL pool pgExecCtxCustom ci httpManager mode corsCfg enableConsole consoleAssetsDir
enableTelemetry instanceId apis lqOpts planCacheOptions responseErrorsConfig = do
enableTelemetry instanceId apis lqOpts planCacheOptions responseErrorsConfig (schemaCache, cacheBuiltTime) = do
(planCache, schemaCacheRef, cacheBuiltTime) <- migrateAndInitialiseSchemaCache
(planCache, schemaCacheRef) <- initialiseCache
let getSchemaCache = first lastBuiltSchemaCache <$> readIORef (_scrCache schemaCacheRef)
let corsPolicy = mkDefaultCorsPolicy corsCfg
@ -560,30 +563,13 @@ mkWaiApp isoLevel logger sqlGenCtx enableAL pool pgExecCtxCustom ci httpManager
getTimeMs :: IO Int64
getTimeMs = (round . (* 1000)) `fmap` getPOSIXTime
migrateAndInitialiseSchemaCache :: m (E.PlanCache, SchemaCacheRef, Maybe UTCTime)
migrateAndInitialiseSchemaCache = do
let pgExecCtx = mkPGExecCtx Q.Serializable pool
adminRunCtx = RunCtx adminUserInfo httpManager sqlGenCtx
currentTime <- liftIO getCurrentTime
initialiseResult <- runExceptT $ peelRun adminRunCtx pgExecCtx Q.ReadWrite do
(,) <$> migrateCatalog currentTime <*> liftTx fetchLastUpdate
((migrationResult, schemaCache), lastUpdateEvent) <-
initialiseResult `onLeft` \err -> do
L.unLogger logger StartupLog
{ slLogLevel = L.LevelError
, slKind = "db_migrate"
, slInfo = toJSON err
}
liftIO exitFailure
L.unLogger logger migrationResult
initialiseCache :: m (E.PlanCache, SchemaCacheRef)
initialiseCache = do
cacheLock <- liftIO $ newMVar ()
cacheCell <- liftIO $ newIORef (schemaCache, initSchemaCacheVer)
planCache <- liftIO $ E.initPlanCache planCacheOptions
let cacheRef = SchemaCacheRef cacheLock cacheCell (E.clearPlanCache planCache)
pure (planCache, cacheRef, view _2 <$> lastUpdateEvent)
pure (planCache, cacheRef)
httpApp
:: ( HasVersion
@ -595,6 +581,7 @@ httpApp
, MetadataApiAuthorization m
, E.MonadGQLExecutionCheck m
, MonadConfigApiHandler m
, MonadQueryLog m
)
=> CorsConfig
-> ServerCtx

View File

@ -1,40 +1,8 @@
-- | Types and functions related to the server initialisation
{-# LANGUAGE CPP #-}
{-# LANGUAGE CPP #-}
{-# OPTIONS_GHC -O0 #-}
module Hasura.Server.Init
( DbUid(..)
, getDbId
, PGVersion(..)
, getPgVersion
, InstanceId(..)
, generateInstanceId
, StartupTimeInfo(..)
-- * Server command related
, parseRawConnInfo
, mkRawConnInfo
, mkHGEOptions
, mkConnInfo
, serveOptionsParser
-- * Downgrade command related
, downgradeShortcuts
, downgradeOptionsParser
-- * Help footers
, mainCmdFooter
, serveCmdFooter
-- * Startup logs
, mkGenericStrLog
, mkGenericLog
, inconsistentMetadataLog
, serveOptsToLog
, connInfoToLog
( module Hasura.Server.Init
, module Hasura.Server.Init.Config
) where

View File

@ -111,7 +111,6 @@ instance ToJSON WebHookLog where
, "message" .= whlMessage whl
]
class (Monad m) => HttpLog m where
logHttpError
:: Logger Hasura

View File

@ -37,15 +37,14 @@ import qualified Language.Haskell.TH.Syntax as TH
import Control.Lens (view, _2)
import Control.Monad.Unique
import Data.Time.Clock (UTCTime)
import Hasura.Logging (Hasura, LogLevel (..), ToEngineLog (..))
import Hasura.RQL.DDL.Relationship
import Hasura.RQL.DDL.Schema
import Hasura.Server.Init (DowngradeOptions (..))
import Hasura.RQL.Types
import Hasura.Server.Logging (StartupLog (..))
import Hasura.Server.Version
import Hasura.Server.Migrate.Version (latestCatalogVersion, latestCatalogVersionString)
import Hasura.Server.Version (HasVersion)
import Hasura.SQL.Types
import System.Directory (doesFileExist)
@ -160,13 +159,13 @@ migrateCatalog migrationTime = do
where
neededMigrations = dropWhile ((/= previousVersion) . fst) (migrations False)
updateCatalogVersion = setCatalogVersion latestCatalogVersionString migrationTime
buildCacheAndRecreateSystemMetadata :: m (RebuildableSchemaCache m)
buildCacheAndRecreateSystemMetadata = do
schemaCache <- buildRebuildableSchemaCache
view _2 <$> runCacheRWT schemaCache recreateSystemMetadata
updateCatalogVersion = setCatalogVersion latestCatalogVersionString migrationTime
doesSchemaExist schemaName =
liftTx $ (runIdentity . Q.getRow) <$> Q.withQE defaultTxErrorHandler [Q.sql|
SELECT EXISTS

View File

@ -5,6 +5,7 @@ module Hasura.Session
, isAdmin
, roleNameToTxt
, SessionVariable
, SessionVariableValue
, mkSessionVariable
, SessionVariables
, sessionVariableToText

View File

@ -1,26 +1,26 @@
module Network.Wai.Extended
( module Wai
, getSourceFromFallback
, IpAddress
, IpAddress (..)
, showIPAddress
) where
import qualified Data.ByteString.Char8 as BS
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Encoding.Error as TE
import qualified Data.ByteString.Char8 as BS
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Encoding.Error as TE
import Data.List (find)
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import Prelude
import Data.Maybe (fromMaybe)
import Data.List (find)
import Data.Text (Text)
import Data.Bits (shift, (.&.))
import Data.ByteString.Char8 (ByteString)
import Data.Word (Word32)
import Network.Socket (SockAddr (..))
import Network.Wai as Wai
import System.ByteOrder (ByteOrder (..), byteOrder)
import Text.Printf (printf)
import Data.Bits (shift, (.&.))
import Data.ByteString.Char8 (ByteString)
import Data.Word (Word32)
import Network.Socket (SockAddr (..))
import Network.Wai as Wai
import System.ByteOrder (ByteOrder (..), byteOrder)
import Text.Printf (printf)
-- | IP Address related code