graphql-engine/server/src-lib/Hasura/Server/SchemaUpdate.hs
Vamshi Surabhi ce243f5899
multiplexed subscriptions (#1934)
* add types to represent unparsed http gql requests

This will help when we add caching of frequently used ASTs

* query plan caching

* move livequery to execute

* add multiplexed module

* session variable can be customised depending on the context

Previously the value was always "current_setting('hasura.user')"

* get rid of typemap requirement in reusable plan

* subscriptions are multiplexed when possible

* use lazytx for introspection to avoid acquiring a pg connection

* refactor to make execute a completely decoupled module

* don't issue a transaction for a query

* don't use current setting for explained sql

* move postgres related types to a different module

* validate variableValues on postgres before multiplexing subs

* don't use current_setting for queries over ws

* plan_cache is only visible when developer flag is enabled

* introduce 'batch size' when multiplexing subscriptions

* bump stackage to 13.16

* fix schema_stitching test case error code

* store hashes instead of actual responses for subscriptions

* internal api to dump subscriptions state

* remove PlanCache from SchemaCacheRef

* allow live query options to be configured on server startup

* capture metrics for multiplexed subscriptions

* more metrics captured for multiplexed subs

* switch to tvar based hashmap for faster snapshotting

* livequery modules do not expose internal details

* fix typo in live query env vars

* switch to hasura's pg-client-hs
2019-04-17 15:18:41 +05:30

211 lines
6.4 KiB
Haskell

module Hasura.Server.SchemaUpdate
(startSchemaSync)
where
import Hasura.Prelude
import Hasura.Logging
import Hasura.RQL.DDL.Schema.Table (buildSchemaCache)
import Hasura.RQL.Types
import Hasura.Server.App (SchemaCacheRef (..), withSCUpdate)
import Hasura.Server.Init (InstanceId (..))
import Hasura.Server.Logging
import Hasura.Server.Query
import Data.Aeson
import Data.Aeson.Casing
import Data.Aeson.TH
import qualified Control.Concurrent as C
import qualified Control.Concurrent.STM as STM
import qualified Data.Text as T
import qualified Data.Time as UTC
import qualified Database.PG.Query as PG
import qualified Database.PostgreSQL.LibPQ as PQ
import qualified Network.HTTP.Client as HTTP
-- | Postgres NOTIFY channel on which schema-update events are broadcast
-- between server instances (listened on in 'listener').
pgChannel :: PG.PGChannel
pgChannel = "hasura_schema_update"
-- | Identifies which of the two schema-sync threads produced a log entry.
data ThreadType
  = TTListener  -- ^ the postgres NOTIFY listener thread
  | TTProcessor -- ^ the event-queue processing thread
  deriving (Eq)

instance Show ThreadType where
  show = \case
    TTListener  -> "listener"
    TTProcessor -> "processor"
-- | Structured log line emitted by either schema-sync thread.
data SchemaSyncThreadLog
  = SchemaSyncThreadLog
  { suelLogLevel   :: !LogLevel   -- ^ severity of the entry
  , suelThreadType :: !ThreadType -- ^ which thread emitted it
  , suelInfo       :: !Value      -- ^ free-form JSON details
  } deriving (Show, Eq)

instance ToJSON SchemaSyncThreadLog where
  toJSON threadLog =
    object [ "thread_type" .= show (suelThreadType threadLog)
           , "info"        .= suelInfo threadLog
           ]

instance ToEngineLog SchemaSyncThreadLog where
  toEngineLog threadLog =
    (suelLogLevel threadLog, "schema_sync_thread", toJSON threadLog)
-- | Payload carried on the @hasura_schema_update@ channel: which server
-- instance issued the metadata change and when it occurred.
data EventPayload
  = EventPayload
  { _epInstanceId :: !InstanceId  -- ^ instance that triggered the update
  , _epOccurredAt :: !UTC.UTCTime -- ^ time the update happened
  } deriving (Show, Eq)
-- Drops the "_ep" field prefix and snake_cases the rest, so the JSON
-- keys are "instance_id" and "occurred_at".
$(deriveJSON (aesonDrop 3 snakeCase) ''EventPayload)
-- | Errors a sync thread can hit: a notification payload that failed to
-- parse, or a failed catalog query/transaction.
data ThreadError
  = TEJsonParse !T.Text
  | TEQueryError !QErr
-- Encodes as a tagged object, e.g. {"type": "json_parse", "info": ...};
-- the "TE" constructor prefix is dropped and the tag snake_cased.
$(deriveToJSON
  defaultOptions { constructorTagModifier = snakeCase . drop 2
                 , sumEncoding = TaggedObject "type" "info"
                 }
  ''ThreadError)
-- | An IO action that enables metadata syncing: spawns the postgres
-- listener and the queue-processor threads and logs their thread ids.
startSchemaSync
  :: Bool
  -> PG.PGPool
  -> Logger
  -> HTTP.Manager
  -> SchemaCacheRef
  -> InstanceId
  -> Maybe UTC.UTCTime -> IO ()
startSchemaSync strfyNum pool logger httpMgr cacheRef instanceId cacheInitTime = do
  -- Queue shared between the listener (producer) and processor (consumer)
  eventsQueue <- STM.newTQueueIO

  -- Spawn the thread listening on the postgres NOTIFY channel
  listenerTId <- C.forkIO $
    listener strfyNum pool logger httpMgr eventsQueue cacheRef instanceId cacheInitTime
  logThreadStarted TTListener listenerTId

  -- Spawn the thread draining the queue and reloading the cache
  processorTId <- C.forkIO $
    processor strfyNum pool logger httpMgr eventsQueue cacheRef instanceId
  logThreadStarted TTProcessor processorTId
  where
    logThreadStarted threadType threadId =
      unLogger logger $
        StartupLog LevelInfo "threads" $
          object [ "instance_id" .= getInstanceId instanceId
                 , "thread_id"   .= show threadId
                 , "message"     .= (T.pack (show threadType) <> " thread started")
                 ]
-- | An IO action that listens to postgres for events and pushes them to a Queue.
--
-- Loops forever: each iteration runs a (blocking) @LISTEN@ on 'pgChannel';
-- 'PG.listen' only returns on error/disconnect, after which a warning is
-- logged and the listen is retried a second later.
listener
  :: Bool
  -> PG.PGPool
  -> Logger
  -> HTTP.Manager
  -> STM.TQueue EventPayload
  -> SchemaCacheRef
  -> InstanceId
  -> Maybe UTC.UTCTime -> IO ()
listener strfyNum pool logger httpMgr eventsQueue
  cacheRef instanceId cacheInitTime =
  -- Never exits
  forever $ do
    listenResE <-
      liftIO $ runExceptT $ PG.listen pool pgChannel notifyHandler
    either onError return listenResE
    logWarn
    C.threadDelay $ 1 * 1000 * 1000 -- 1 second
  where
    threadType = TTListener

    -- Refresh only for updates made by another instance after this
    -- server built its cache; with no recorded init time, always refresh.
    shouldRefresh dbInstId accrdAt =
      case cacheInitTime of
        Nothing -> True
        Just time -> (dbInstId /= instanceId) && accrdAt > time

    refreshCache Nothing = return ()
    refreshCache (Just (dbInstId, accrdAt)) =
      when (shouldRefresh dbInstId accrdAt) $
        refreshSchemaCache strfyNum pool logger httpMgr cacheRef
          threadType "schema cache reloaded after postgres listen init"

    notifyHandler = \case
      -- On (re)connecting: consult the catalog for the last recorded
      -- update and decide whether our cache is stale.
      PG.PNEOnStart -> do
        eRes <- runExceptT $ PG.runTx pool
          (PG.Serializable, Nothing) fetchLastUpdate
        case eRes of
          Left e -> onError e
          Right mLastUpd -> refreshCache mLastUpd
      -- On a NOTIFY: decode the payload and enqueue it for the processor.
      PG.PNEPQNotify notif ->
        case eitherDecodeStrict $ PQ.notifyExtra notif of
          Left e -> logError logger threadType $ TEJsonParse $ T.pack e
          Right payload -> do
            logInfo logger threadType $ object ["received_event" .= payload]
            -- Push a notify event to Queue
            STM.atomically $ STM.writeTQueue eventsQueue payload

    onError = logError logger threadType . TEQueryError

    -- Use the shared 'threadType' binding (was hard-coded TTListener),
    -- keeping this block consistent with the rest of the file.
    logWarn = unLogger logger $
      SchemaSyncThreadLog LevelWarn threadType $ String
        "error occurred, retrying postgres listen after 1 second"
-- | An IO action that processes events from Queue: blocks on the queue
-- and reloads the schema cache for events from other instances.
processor
  :: Bool
  -> PG.PGPool
  -> Logger
  -> HTTP.Manager
  -> STM.TQueue EventPayload
  -> SchemaCacheRef
  -> InstanceId -> IO ()
processor strfyNum pool logger httpMgr eventsQueue
  cacheRef instanceId =
  -- Never exits
  forever $ do
    payload <- STM.atomically $ STM.readTQueue eventsQueue
    logInfo logger threadType $ object ["processed_event" .= payload]
    -- An event published by this very instance is already reflected
    -- locally; only reload when it came from another server
    when (_epInstanceId payload /= instanceId) $
      refreshSchemaCache strfyNum pool logger httpMgr cacheRef
        threadType "schema cache reloaded"
  where
    threadType = TTProcessor
-- | Rebuild the schema cache from the catalog and swap it into the
-- shared cache reference, logging the outcome with the given message.
refreshSchemaCache
  :: Bool
  -> PG.PGPool
  -> Logger
  -> HTTP.Manager
  -> SchemaCacheRef
  -> ThreadType
  -> T.Text -> IO ()
refreshSchemaCache strfyNum pool logger httpManager cacheRef threadType msg = do
  -- Reload schema cache from catalog
  resE <- liftIO $ runExceptT $ withSCUpdate cacheRef $
    peelRun emptySchemaCache adminUserInfo
      httpManager strfyNum (PGExecCtx pool PG.Serializable) buildSchemaCache
  either reportFailure reportSuccess resE
  where
    reportFailure e = logError logger threadType $ TEQueryError e
    reportSuccess _ = logInfo logger threadType $ object ["message" .= msg]
-- | Emit an info-level log entry tagged with the originating thread.
logInfo :: Logger -> ThreadType -> Value -> IO ()
logInfo logger threadType =
  unLogger logger . SchemaSyncThreadLog LevelInfo threadType
-- | Emit an error-level log entry, wrapping the error as @{"error": ...}@.
logError :: ToJSON a => Logger -> ThreadType -> a -> IO ()
logError logger threadType err =
  unLogger logger $ SchemaSyncThreadLog LevelError threadType errVal
  where
    errVal = object ["error" .= toJSON err]