sync metadata cache across multiple instances connected to same db (closes #1182) (#1574)

1. The Haskell library `pg-client-hs` has been updated in this [PR](https://github.com/hasura/pg-client-hs/pull/5) to expose a function that listens for `postgres` notifications on a given channel
2. The server records an event in the `hdb_catalog.hdb_schema_update_event` table whenever a `/v1/query` request changes the metadata. A Postgres trigger then notifies a schema update event on the `hasura_schema_update` channel
3. The server runs two concurrent threads, a `listener` and a `processor`. The `listener` thread listens for events on the `hasura_schema_update` channel and pushes them into a queue; the `processor` thread dequeues the events and processes them by rebuilding the schema cache from the database, so every instance converges on the same metadata (see the sketch after this list)
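
For orientation, here is a minimal, self-contained sketch of the listener/processor pattern this commit introduces in `Hasura.Server.SchemaUpdate`, mirroring the `STM.TQueue` hand-off between the two threads. `fetchNotification`, `rebuildSchemaCache`, and `myInstanceId` are hypothetical stand-ins for `PG.listen`, `buildSchemaCache`, and the server's `InstanceId`; they are not the real APIs.

```haskell
-- Minimal sketch (not the actual implementation): a listener thread enqueues
-- notifications and a processor thread rebuilds the cache for foreign events.
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM (atomically, newTQueueIO, readTQueue, writeTQueue)
import Control.Monad (forever, when)

newtype Event = Event { epInstanceId :: String }

main :: IO ()
main = do
  eventsQueue <- newTQueueIO
  -- listener: never exits; blocks on notifications and pushes them to the queue
  _ <- forkIO $ forever $ do
    ev <- fetchNotification  -- hypothetical stand-in for PG.listen on the channel
    atomically $ writeTQueue eventsQueue ev
  -- processor: never exits; reloads the cache only for events from other instances
  _ <- forkIO $ forever $ do
    ev <- atomically $ readTQueue eventsQueue
    when (epInstanceId ev /= myInstanceId) rebuildSchemaCache
  forever $ threadDelay maxBound  -- keep the main thread alive
  where
    myInstanceId = "this-instance"
    -- hypothetical: the real code decodes a JSON payload delivered via pg_notify
    fetchNotification = threadDelay 1000000 >> pure (Event "another-instance")
    -- hypothetical: the real code runs buildSchemaCache under withSCUpdate
    rebuildSchemaCache = putStrLn "schema cache rebuilt from catalog"
```

In the actual module, both threads also log their start, and the listener re-runs `PG.listen` after a one-second delay whenever it errors.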
Rakesh Emmadi 2019-03-12 11:16:27 +05:30 committed by Shahidh K Muhammed
parent a5b4c5ffd6
commit e32f5a1fb1
20 changed files with 730 additions and 93 deletions

@ -101,14 +101,17 @@ refs:
keys:
- server-deps-cache-{{ checksum "server/graphql-engine.cabal" }}-{{ checksum "server/stack.yaml" }}
- *wait_for_postgres
- run:
name: Install deps
command: |
apt-get update
apt install --yes pgbouncer jq curl postgresql-client
- run:
name: Run Python tests
environment:
HASURA_GRAPHQL_DATABASE_URL: 'postgres://gql_test:@localhost:5432/gql_test'
GRAPHQL_ENGINE: '/build/_server_output/graphql-engine'
command: |
apt-get update
apt install --yes jq curl
OUTPUT_FOLDER=/build/_server_test_output/$PG_VERSION .circleci/test-server.sh
- run:
name: Generate coverage report

@ -0,0 +1,11 @@
[databases]
hs_hge_test = host=localhost port=5432 dbname=hs_hge_test user=gql_test
[pgbouncer]
listen_port = 6543
listen_addr = 127.0.0.1
logfile = pgbouncer/pgbouncer.log
pidfile = pgbouncer/pgbouncer.pid
auth_type = md5
auth_file = pgbouncer/users.txt
admin_users = postgres

@ -0,0 +1 @@
"postgres" "postgres"

@ -128,6 +128,8 @@ export HASURA_GRAPHQL_STRINGIFY_NUMERIC_TYPES=true
PID=""
WH_PID=""
HS_PID=""
trap stop_services ERR
trap stop_services INT
@ -356,4 +358,66 @@ if [ "$RUN_WEBHOOK_TESTS" == "true" ] ; then
fi
# horizontal scale test
unset HASURA_GRAPHQL_AUTH_HOOK
unset HASURA_GRAPHQL_AUTH_HOOK_MODE
unset HASURA_GRAPHQL_ADMIN_SECRET
echo -e "\n<########## TEST GRAPHQL-ENGINE WITH HORIZONTAL SCALING ########>\n"
HASURA_HS_TEST_DB='postgres://postgres:postgres@localhost:6543/hs_hge_test'
psql "$HASURA_GRAPHQL_DATABASE_URL" -c "create database hs_hge_test;"
# create pgbouncer user
useradd pgbouncer
cd $CIRCLECI_FOLDER
chown -R pgbouncer:pgbouncer pgbouncer
# start pgbouncer
pgbouncer -u pgbouncer -d pgbouncer/pgbouncer.ini
cd $PYTEST_ROOT
# start 1st server
"$GRAPHQL_ENGINE" --database-url "$HASURA_HS_TEST_DB" serve >> "$OUTPUT_FOLDER/graphql-engine.log" 2>&1 & PID=$!
wait_for_port 8080
# start 2nd server
"$GRAPHQL_ENGINE" --database-url "$HASURA_HS_TEST_DB" serve \
--server-port 8081 \
>> "$OUTPUT_FOLDER/hs-graphql-engine.log" 2>&1 & HS_PID=$!
wait_for_port 8081
# run test
pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-hge-scale-url="http://localhost:8081" test_horizontal_scale.py
# Shutdown pgbouncer
psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;" || true
cd $CIRCLECI_FOLDER
# start pgbouncer again
pgbouncer -u pgbouncer -d pgbouncer/pgbouncer.ini
cd $PYTEST_ROOT
# wait 30 seconds for the servers to reconnect through pgbouncer
sleep 30
# run test
pytest -vv --hge-url="$HGE_URL" --pg-url="$HASURA_GRAPHQL_DATABASE_URL" --test-hge-scale-url="http://localhost:8081" test_horizontal_scale.py
# Shutdown pgbouncer
psql "postgres://postgres:postgres@localhost:6543/pgbouncer" -c "SHUTDOWN;" || true
kill $PID
kill $HS_PID
psql "$HASURA_GRAPHQL_DATABASE_URL" -c "drop database hs_hge_test;"
sleep 4
combine_hpc_reports
unset HASURA_HS_TEST_DB
# end horizontal scale test
mv graphql-engine-combined.tix "$OUTPUT_FOLDER/graphql-engine.tix" || true

@ -150,6 +150,7 @@ library
, Hasura.Server.Version
, Hasura.Server.CheckUpdates
, Hasura.Server.Telemetry
, Hasura.Server.SchemaUpdate
, Hasura.RQL.Types
, Hasura.RQL.Instances
, Hasura.RQL.Types.SchemaCache
@ -322,6 +323,7 @@ executable graphql-engine
, wreq
, connection
, string-conversions
, uuid
other-modules: Ops
, Migrate

@ -9,6 +9,7 @@ import Options.Applicative
import System.Environment (getEnvironment, lookupEnv)
import System.Exit (exitFailure)
import qualified Control.Concurrent as C
import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as BC
@ -21,17 +22,17 @@ import qualified Network.HTTP.Client.TLS as HTTP
import qualified Network.Wai.Handler.Warp as Warp
import Hasura.Events.Lib
import Hasura.Logging (Logger (..), defaultLoggerSettings,
mkLogger, mkLoggerCtx)
import Hasura.Logging
import Hasura.Prelude
import Hasura.RQL.DDL.Metadata (fetchMetadata)
import Hasura.RQL.Types (QErr, adminUserInfo,
emptySchemaCache)
import Hasura.Server.App (mkWaiApp)
import Hasura.RQL.Types (adminUserInfo, emptySchemaCache)
import Hasura.Server.App (SchemaCacheRef (..), mkWaiApp)
import Hasura.Server.Auth
import Hasura.Server.CheckUpdates (checkForUpdates)
import Hasura.Server.Init
import Hasura.Server.Logging
import Hasura.Server.Query (peelRun)
import Hasura.Server.SchemaUpdate
import Hasura.Server.Telemetry
import Hasura.Server.Version (currentVersion)
@ -97,13 +98,19 @@ printJSON = BLC.putStrLn . A.encode
printYaml :: (A.ToJSON a) => a -> IO ()
printYaml = BC.putStrLn . Y.encode
mkPGLogger :: Logger -> Q.PGLogger
mkPGLogger (Logger logger) (Q.PLERetryMsg msg) =
logger $ PGLog LevelWarn msg
main :: IO ()
main = do
(HGEOptionsG rci hgeCmd) <- parseArgs
-- global http manager
httpManager <- HTTP.newManager HTTP.tlsManagerSettings
loggerCtx <- mkLoggerCtx $ defaultLoggerSettings True
instanceId <- mkInstanceId
let logger = mkLogger loggerCtx
pgLogger = mkPGLogger logger
case hgeCmd of
HCServe so@(ServeOptions port host cp isoL mAdminSecret mAuthHook mJwtSecret
mUnAuthRole corsCfg enableConsole enableTelemetry strfyNum enabledAPIs) -> do
@ -120,15 +127,21 @@ main = do
-- log postgres connection info
unLogger logger $ connInfoToLog ci
pool <- Q.initPGPool ci cp pgLogger
-- safe init catalog
initRes <- initialise logger ci httpManager
-- prepare event triggers data
prepareEvents logger ci
pool <- Q.initPGPool ci cp
(app, cacheRef) <- mkWaiApp isoL loggerCtx pool httpManager
strfyNum am corsCfg enableConsole enableTelemetry enabledAPIs
(app, cacheRef, cacheInitTime) <-
mkWaiApp isoL loggerCtx strfyNum pool httpManager am
corsCfg enableConsole enableTelemetry instanceId enabledAPIs
-- start a background thread for schema sync
startSchemaSync strfyNum pool logger httpManager
cacheRef instanceId cacheInitTime
let warpSettings = Warp.setPort port $ Warp.setHost host Warp.defaultSettings
@ -138,9 +151,10 @@ main = do
eventEngineCtx <- atomically $ initEventEngineCtx maxEvThrds evFetchMilliSec
let scRef = _scrCache cacheRef
unLogger logger $
mkGenericStrLog "event_triggers" "starting workers"
void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders httpManager pool cacheRef eventEngineCtx
void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders httpManager pool scRef eventEngineCtx
-- start a background thread to check for updates
void $ C.forkIO $ checkForUpdates loggerCtx httpManager
@ -148,7 +162,7 @@ main = do
-- start a background thread for telemetry
when enableTelemetry $ do
unLogger logger $ mkGenericStrLog "telemetry" telemetryNotice
void $ C.forkIO $ runTelemetry logger httpManager cacheRef initRes
void $ C.forkIO $ runTelemetry logger httpManager scRef initRes
unLogger logger $
mkGenericStrLog "server" "starting API server"
@ -156,30 +170,29 @@ main = do
HCExport -> do
ci <- procConnInfo rci
res <- runTx ci fetchMetadata
res <- runTx pgLogger ci fetchMetadata
either printErrJExit printJSON res
HCClean -> do
ci <- procConnInfo rci
res <- runTx ci cleanCatalog
res <- runTx pgLogger ci cleanCatalog
either printErrJExit (const cleanSuccess) res
HCExecute -> do
queryBs <- BL.getContents
ci <- procConnInfo rci
res <- runAsAdmin ci httpManager $ execQuery queryBs
res <- runAsAdmin pgLogger ci httpManager $ execQuery queryBs
either printErrJExit BLC.putStrLn res
HCVersion -> putStrLn $ "Hasura GraphQL Engine: " ++ T.unpack currentVersion
where
runTx :: Q.ConnInfo -> Q.TxE QErr a -> IO (Either QErr a)
runTx ci tx = do
pool <- getMinimalPool ci
runTx pgLogger ci tx = do
pool <- getMinimalPool pgLogger ci
runExceptT $ Q.runTx pool (Q.Serializable, Nothing) tx
runAsAdmin ci httpManager m = do
pool <- getMinimalPool ci
runAsAdmin pgLogger ci httpManager m = do
pool <- getMinimalPool pgLogger ci
res <- runExceptT $ peelRun emptySchemaCache adminUserInfo
httpManager False pool Q.Serializable m
return $ fmap fst res
@ -188,31 +201,32 @@ main = do
either (printErrExit . connInfoErrModifier) return $
mkConnInfo rci
getMinimalPool ci = do
getMinimalPool pgLogger ci = do
let connParams = Q.defaultConnParams { Q.cpConns = 1 }
Q.initPGPool ci connParams
Q.initPGPool ci connParams pgLogger
initialise (Logger logger) ci httpMgr = do
currentTime <- getCurrentTime
let pgLogger = mkPGLogger $ Logger logger
-- initialise the catalog
initRes <- runAsAdmin ci httpMgr $ initCatalogSafe currentTime
initRes <- runAsAdmin pgLogger ci httpMgr $ initCatalogSafe currentTime
either printErrJExit (logger . mkGenericStrLog "db_init") initRes
-- migrate catalog if necessary
migRes <- runAsAdmin ci httpMgr $ migrateCatalog currentTime
migRes <- runAsAdmin pgLogger ci httpMgr $ migrateCatalog currentTime
either printErrJExit (logger . mkGenericStrLog "db_migrate") migRes
-- generate and retrieve uuids
getUniqIds ci
getUniqIds pgLogger ci
prepareEvents (Logger logger) ci = do
let pgLogger = mkPGLogger $ Logger logger
logger $ mkGenericStrLog "event_triggers" "preparing data"
res <- runTx ci unlockAllEvents
res <- runTx pgLogger ci unlockAllEvents
either printErrJExit return res
getUniqIds ci = do
eDbId <- runTx ci getDbId
getUniqIds pgLogger ci = do
eDbId <- runTx pgLogger ci getDbId
dbId <- either printErrJExit return eDbId
fp <- liftIO generateFingerprint
return (dbId, fp)

@ -19,7 +19,7 @@ import qualified Data.Yaml.TH as Y
import qualified Database.PG.Query as Q
curCatalogVer :: T.Text
curCatalogVer = "10"
curCatalogVer = "11"
migrateMetadata
:: ( MonadTx m
@ -251,6 +251,13 @@ from9To10 = liftTx $ do
$(Q.sqlFromFile "src-rsr/migrate_from_9_to_10.sql")
return ()
from10To11 :: (MonadTx m) => m ()
from10To11 = liftTx $ do
-- Migrate database
Q.Discard () <- Q.multiQE defaultTxErrorHandler
$(Q.sqlFromFile "src-rsr/migrate_from_10_to_11.sql")
return ()
migrateCatalog
:: ( MonadTx m
, CacheRWM m
@ -274,10 +281,13 @@ migrateCatalog migrationTime = do
| preVer == "7" -> from7ToCurrent
| preVer == "8" -> from8ToCurrent
| preVer == "9" -> from9ToCurrent
| preVer == "10" -> from10ToCurrent
| otherwise -> throw400 NotSupported $
"unsupported version : " <> preVer
where
from9ToCurrent = from9To10 >> postMigrate
from10ToCurrent = from10To11 >> postMigrate
from9ToCurrent = from9To10 >> from10ToCurrent
from8ToCurrent = from8To9 >> from9ToCurrent

@ -88,17 +88,39 @@ mkConsoleHTML path authMode enableTelemetry =
errMsg = "console template rendering failed: " ++ show errs
data SchemaCacheRef
= SchemaCacheRef
{ _scrLock :: MVar ()
, _scrCache :: IORef SchemaCache
}
withSCUpdate
:: (MonadIO m, MonadError e m)
=> SchemaCacheRef -> m (a, SchemaCache) -> m a
withSCUpdate scr action = do
acquireLock
(res, newSC) <- action `catchError` onError
-- update schemacache in IO reference
liftIO $ writeIORef cacheRef newSC
releaseLock
return res
where
SchemaCacheRef lk cacheRef = scr
onError e = releaseLock >> throwError e
acquireLock = liftIO $ takeMVar lk
releaseLock = liftIO $ putMVar lk ()
data ServerCtx
= ServerCtx
{ scIsolation :: Q.TxIsolation
, scPGPool :: Q.PGPool
, scLogger :: L.Logger
, scCacheRef :: IORef SchemaCache
, scCacheLock :: MVar ()
, scCacheRef :: SchemaCacheRef
, scAuthMode :: AuthMode
, scManager :: HTTP.Manager
, scStringifyNum :: Bool
, scEnabledAPIs :: S.HashSet API
, scInstanceId :: InstanceId
}
data HandlerCtx
@ -135,7 +157,7 @@ buildQCtx :: Handler QCtx
buildQCtx = do
scRef <- scCacheRef . hcServerCtx <$> ask
userInfo <- asks hcUser
cache <- liftIO $ readIORef scRef
cache <- liftIO $ readIORef $ _scrCache scRef
strfyNum <- scStringifyNum . hcServerCtx <$> ask
return $ QCtx userInfo cache $ SQLGenCtx strfyNum
@ -198,39 +220,27 @@ mkSpockAction qErrEncoder serverCtx handler = do
uncurry setHeader jsonHeader
lazyBytes resp
withLock :: (MonadIO m, MonadError e m)
=> MVar () -> m a -> m a
withLock lk action = do
acquireLock
res <- action `catchError` onError
releaseLock
return res
where
onError e = releaseLock >> throwError e
acquireLock = liftIO $ takeMVar lk
releaseLock = liftIO $ putMVar lk ()
v1QueryHandler :: RQLQuery -> Handler BL.ByteString
v1QueryHandler query = do
lk <- scCacheLock . hcServerCtx <$> ask
bool (fst <$> dbAction) (withLock lk dbActionReload) $
scRef <- scCacheRef . hcServerCtx <$> ask
bool (fst <$> dbAction) (withSCUpdate scRef dbActionReload) $
queryNeedsReload query
where
-- Hit postgres
dbAction = do
userInfo <- asks hcUser
scRef <- scCacheRef . hcServerCtx <$> ask
schemaCache <- liftIO $ readIORef scRef
schemaCache <- liftIO $ readIORef $ _scrCache scRef
httpMgr <- scManager . hcServerCtx <$> ask
strfyNum <- scStringifyNum . hcServerCtx <$> ask
pool <- scPGPool . hcServerCtx <$> ask
isoL <- scIsolation . hcServerCtx <$> ask
runQuery pool isoL userInfo schemaCache httpMgr strfyNum query
instanceId <- scInstanceId . hcServerCtx <$> ask
runQuery pool isoL instanceId userInfo schemaCache httpMgr strfyNum query
-- Also update the schema cache
dbActionReload = do
(resp, newSc) <- dbAction
scRef <- scCacheRef . hcServerCtx <$> ask
httpMgr <- scManager . hcServerCtx <$> ask
--FIXME: should we be fetching the remote schema again? if not how do we get the remote schema?
newGCtxMap <- GS.mkGCtxMap (scTables newSc) (scFunctions newSc)
@ -238,8 +248,7 @@ v1QueryHandler query = do
mergeSchemas (scRemoteResolvers newSc) newGCtxMap httpMgr
let newSc' =
newSc { scGCtxMap = mergedGCtxMap, scDefaultRemoteGCtx = defGCtx }
liftIO $ writeIORef scRef newSc'
return resp
return (resp, newSc')
v1Alpha1GQHandler :: GH.GraphQLRequest -> Handler BL.ByteString
v1Alpha1GQHandler query = do
@ -248,7 +257,7 @@ v1Alpha1GQHandler query = do
reqHeaders <- asks hcReqHeaders
manager <- scManager . hcServerCtx <$> ask
scRef <- scCacheRef . hcServerCtx <$> ask
sc <- liftIO $ readIORef scRef
sc <- liftIO $ readIORef $ _scrCache scRef
pool <- scPGPool . hcServerCtx <$> ask
isoL <- scIsolation . hcServerCtx <$> ask
strfyNum <- scStringifyNum . hcServerCtx <$> ask
@ -258,7 +267,7 @@ gqlExplainHandler :: GE.GQLExplain -> Handler BL.ByteString
gqlExplainHandler query = do
onlyAdmin
scRef <- scCacheRef . hcServerCtx <$> ask
sc <- liftIO $ readIORef scRef
sc <- liftIO $ readIORef $ _scrCache scRef
pool <- scPGPool . hcServerCtx <$> ask
isoL <- scIsolation . hcServerCtx <$> ask
strfyNum <- scStringifyNum . hcServerCtx <$> ask
@ -294,28 +303,27 @@ legacyQueryHandler tn queryType =
mkWaiApp
:: Q.TxIsolation
-> L.LoggerCtx
-> Q.PGPool
-> HTTP.Manager
-> Bool
-> AuthMode
-> CorsConfig
-> Bool
-> Bool
-> S.HashSet API
-> IO (Wai.Application, IORef SchemaCache)
mkWaiApp isoLevel loggerCtx pool httpManager strfyNum mode corsCfg enableConsole enableTelemetry apis = do
cacheRef <- do
:: Q.TxIsolation -> L.LoggerCtx -> Bool
-> Q.PGPool -> HTTP.Manager -> AuthMode
-> CorsConfig -> Bool -> Bool
-> InstanceId -> S.HashSet API
-> IO (Wai.Application, SchemaCacheRef, Maybe UTCTime)
mkWaiApp isoLevel loggerCtx strfyNum pool httpManager mode corsCfg
enableConsole enableTelemetry instanceId apis = do
(cacheRef, cacheBuiltTime) <- do
pgResp <- runExceptT $ peelRun emptySchemaCache adminUserInfo
httpManager strfyNum pool Q.Serializable buildSchemaCache
either initErrExit return pgResp >>= newIORef . snd
httpManager strfyNum pool Q.Serializable $ do
buildSchemaCache
liftTx fetchLastUpdate
(time, sc) <- either initErrExit return pgResp
scRef <- newIORef sc
return (scRef, snd <$> time)
cacheLock <- newMVar ()
let serverCtx =
ServerCtx isoLevel pool (L.mkLogger loggerCtx) cacheRef
cacheLock mode httpManager strfyNum apis
let schemaCacheRef = SchemaCacheRef cacheLock cacheRef
serverCtx = ServerCtx isoLevel pool (L.mkLogger loggerCtx)
schemaCacheRef mode httpManager strfyNum apis instanceId
spockApp <- spockAsApp $ spockT id $
httpApp corsCfg serverCtx enableConsole enableTelemetry
@ -328,7 +336,10 @@ mkWaiApp isoLevel loggerCtx pool httpManager strfyNum mode corsCfg enableConsole
cacheRef runTx corsPolicy
let wsServerApp = WS.createWSServerApp mode wsServerEnv
return (WS.websocketsOr WS.defaultConnectionOptions wsServerApp spockApp, cacheRef)
return ( WS.websocketsOr WS.defaultConnectionOptions wsServerApp spockApp
, schemaCacheRef
, cacheBuiltTime
)
httpApp :: CorsConfig -> ServerCtx -> Bool -> Bool -> SpockT IO ()
httpApp corsCfg serverCtx enableConsole enableTelemetry = do

@ -9,6 +9,9 @@ import qualified Data.Aeson as J
import qualified Data.HashSet as Set
import qualified Data.String as DataString
import qualified Data.Text as T
import qualified Data.UUID as UUID
import qualified Data.UUID.V4 as UUID
import qualified Hasura.Logging as L
import qualified Text.PrettyPrint.ANSI.Leijen as PP
import Hasura.Prelude
@ -19,8 +22,12 @@ import Hasura.Server.Logging
import Hasura.Server.Utils
import Network.Wai.Handler.Warp
import qualified Hasura.Logging as L
newtype InstanceId
= InstanceId {getInstanceId :: T.Text}
deriving (Show, Eq, J.ToJSON, J.FromJSON)
mkInstanceId :: IO InstanceId
mkInstanceId = (InstanceId . UUID.toText) <$> UUID.nextRandom
initErrExit :: (Show e) => e -> IO a
initErrExit e = print e >> exitFailure
@ -79,6 +86,7 @@ data RawConnInfo =
, connUrl :: !(Maybe String)
, connDatabase :: !(Maybe String)
, connOptions :: !(Maybe String)
, connRetries :: !(Maybe Int)
} deriving (Eq, Read, Show)
data HGECommandG a
@ -219,9 +227,13 @@ mkHGEOptions (HGEOptionsG rawConnInfo rawCmd) =
mkRawConnInfo :: RawConnInfo -> WithEnv RawConnInfo
mkRawConnInfo rawConnInfo = do
withEnvUrl <- withEnv rawDBUrl $ fst databaseUrlEnv
return $ rawConnInfo {connUrl = withEnvUrl}
withEnvRetries <- withEnv retries $ fst retriesNumEnv
return $ rawConnInfo { connUrl = withEnvUrl
, connRetries = withEnvRetries
}
where
rawDBUrl = connUrl rawConnInfo
retries = connRetries rawConnInfo
mkServeOptions :: RawServeOptions -> WithEnv ServeOptions
mkServeOptions rso = do
@ -309,7 +321,7 @@ mainCmdFooter =
]
]
envVarDoc = mkEnvVarDoc [databaseUrlEnv]
envVarDoc = mkEnvVarDoc [databaseUrlEnv, retriesNumEnv]
databaseUrlEnv :: (String, String)
databaseUrlEnv =
@ -357,7 +369,8 @@ serveCmdFooter =
envVarDoc = mkEnvVarDoc $ envVars <> eventEnvs
envVars =
[ servePortEnv, serveHostEnv, pgStripesEnv, pgConnsEnv, pgTimeoutEnv
[ databaseUrlEnv, retriesNumEnv, servePortEnv, serveHostEnv,
pgStripesEnv, pgConnsEnv, pgTimeoutEnv
, pgUsePrepareEnv, txIsoEnv, adminSecretEnv
, accessKeyEnv, authHookEnv, authHookModeEnv
, jwtSecretEnv, unAuthRoleEnv, corsDomainEnv, enableConsoleEnv
@ -373,6 +386,12 @@ serveCmdFooter =
)
]
retriesNumEnv :: (String, String)
retriesNumEnv =
( "HASURA_GRAPHQL_NO_OF_RETRIES"
, "No.of retries if Postgres connection error occurs (default: 1)"
)
servePortEnv :: (String, String)
servePortEnv =
( "HASURA_GRAPHQL_SERVER_PORT"
@ -496,6 +515,7 @@ parseRawConnInfo :: Parser RawConnInfo
parseRawConnInfo =
RawConnInfo <$> host <*> port <*> user <*> password
<*> dbUrl <*> dbName <*> pure Nothing
<*> retries
where
host = optional $
strOption ( long "host" <>
@ -534,24 +554,31 @@ parseRawConnInfo =
metavar "<DBNAME>" <>
help "Database name to connect to"
)
retries = optional $
option auto ( long "retries" <>
metavar "NO OF RETRIES" <>
help (snd retriesNumEnv)
)
connInfoErrModifier :: String -> String
connInfoErrModifier s = "Fatal Error : " ++ s
mkConnInfo ::RawConnInfo -> Either String Q.ConnInfo
mkConnInfo (RawConnInfo mHost mPort mUser pass mURL mDB opts) =
mkConnInfo (RawConnInfo mHost mPort mUser pass mURL mDB opts mRetries) =
case (mHost, mPort, mUser, mDB, mURL) of
(Just host, Just port, Just user, Just db, Nothing) ->
return $ Q.ConnInfo host port user pass db opts
return $ Q.ConnInfo host port user pass db opts retries
(_, _, _, _, Just dbURL) -> maybe (throwError invalidUrlMsg)
return $ parseDatabaseUrl dbURL opts
withRetries $ parseDatabaseUrl dbURL opts
_ -> throwError $ "Invalid options. "
++ "Expecting all database connection params "
++ "(host, port, user, dbname, password) or "
++ "database-url (HASURA_GRAPHQL_DATABASE_URL)"
where
retries = fromMaybe 1 mRetries
withRetries ci = return $ ci{Q.connRetries = retries}
invalidUrlMsg = "Invalid database-url (HASURA_GRAPHQL_DATABASE_URL). "
++ "Example postgres://foo:bar@example.com:2345/database"
@ -730,13 +757,14 @@ parseEnabledAPIs = optional $
-- Init logging related
connInfoToLog :: Q.ConnInfo -> StartupLog
connInfoToLog (Q.ConnInfo host port user _ db _) =
connInfoToLog (Q.ConnInfo host port user _ db _ retries) =
StartupLog L.LevelInfo "postgres_connection" infoVal
where
infoVal = J.object [ "host" J..= host
, "port" J..= port
, "user" J..= user
, "database" J..= db
, "retries" J..= retries
]
serveOptsToLog :: ServeOptions -> StartupLog

@ -2,6 +2,7 @@
module Hasura.Server.Logging
( StartupLog(..)
, PGLog(..)
, mkAccessLog
, getRequestHeader
, WebHookLog(..)
@ -54,6 +55,20 @@ instance L.ToEngineLog StartupLog where
toEngineLog startupLog =
(slLogLevel startupLog, "startup", toJSON startupLog)
data PGLog
= PGLog
{ plLogLevel :: !L.LogLevel
, plMessage :: !T.Text
} deriving (Show, Eq)
instance ToJSON PGLog where
toJSON (PGLog _ msg) =
object ["message" .= msg]
instance L.ToEngineLog PGLog where
toEngineLog pgLog =
(plLogLevel pgLog, "pg-client", toJSON pgLog)
data WebHookLog
= WebHookLog
{ whlLogLevel :: !L.LogLevel

@ -3,6 +3,7 @@ module Hasura.Server.Query where
import Data.Aeson
import Data.Aeson.Casing
import Data.Aeson.TH
import Data.Time (UTCTime)
import Language.Haskell.TH.Syntax (Lift)
import qualified Data.ByteString.Builder as BB
@ -10,6 +11,7 @@ import qualified Data.ByteString.Lazy as BL
import qualified Data.Vector as V
import qualified Network.HTTP.Client as HTTP
import Hasura.Prelude
import Hasura.RQL.DDL.Metadata
import Hasura.RQL.DDL.Permission
@ -28,6 +30,7 @@ import Hasura.RQL.DML.Returning (encodeJSONVector)
import Hasura.RQL.DML.Select
import Hasura.RQL.DML.Update
import Hasura.RQL.Types
import Hasura.Server.Init (InstanceId (..))
import Hasura.Server.Utils
import qualified Database.PG.Query as Q
@ -115,6 +118,30 @@ instance HasHttpManager Run where
instance HasSQLGenCtx Run where
askSQLGenCtx = asks _3
fetchLastUpdate :: Q.TxE QErr (Maybe (InstanceId, UTCTime))
fetchLastUpdate = do
l <- Q.listQE defaultTxErrorHandler
[Q.sql|
SELECT instance_id::text, occurred_at
FROM hdb_catalog.hdb_schema_update_event
ORDER BY occurred_at DESC LIMIT 1
|] () True
case l of
[] -> return Nothing
[(instId, occurredAt)] ->
return $ Just (InstanceId instId, occurredAt)
-- never happens
_ -> throw500 "more than one row returned by query"
recordSchemaUpdate :: InstanceId -> Q.TxE QErr ()
recordSchemaUpdate instanceId =
liftTx $ Q.unitQE defaultTxErrorHandler [Q.sql|
INSERT INTO
hdb_catalog.hdb_schema_update_event
(instance_id, occurred_at)
VALUES ($1::uuid, DEFAULT)
|] (Identity $ getInstanceId instanceId) True
peelRun
:: SchemaCache
-> UserInfo
@ -130,13 +157,20 @@ peelRun sc userInfo httMgr strfyNum pgPool txIso (Run m) =
runQuery
:: (MonadIO m, MonadError QErr m)
=> Q.PGPool -> Q.TxIsolation -> UserInfo
-> SchemaCache -> HTTP.Manager -> Bool
-> RQLQuery -> m (BL.ByteString, SchemaCache)
runQuery pool isoL userInfo sc hMgr strfyNum query = do
res <- liftIO $ runExceptT $
peelRun sc userInfo hMgr strfyNum pool isoL $ runQueryM query
liftEither res
=> Q.PGPool -> Q.TxIsolation -> InstanceId
-> UserInfo -> SchemaCache -> HTTP.Manager
-> Bool -> RQLQuery -> m (BL.ByteString, SchemaCache)
runQuery pool isoL instanceId userInfo sc hMgr strfyNum query = do
resE <- liftIO $ runExceptT $
peelRun sc userInfo hMgr strfyNum pool isoL $ runQueryM query
either throwError withReload resE
where
withReload r = do
when (queryNeedsReload query) $ do
e <- liftIO $ runExceptT $ Q.runTx pool (isoL, Nothing)
$ recordSchemaUpdate instanceId
liftEither e
return r
queryNeedsReload :: RQLQuery -> Bool
queryNeedsReload qi = case qi of

@ -0,0 +1,210 @@
module Hasura.Server.SchemaUpdate
(startSchemaSync)
where
import Hasura.Prelude
import Hasura.Logging
import Hasura.RQL.DDL.Schema.Table (buildSchemaCache)
import Hasura.RQL.Types
import Hasura.Server.App (SchemaCacheRef (..), withSCUpdate)
import Hasura.Server.Init (InstanceId (..))
import Hasura.Server.Logging
import Hasura.Server.Query
import Data.Aeson
import Data.Aeson.Casing
import Data.Aeson.TH
import qualified Control.Concurrent as C
import qualified Control.Concurrent.STM as STM
import qualified Data.Text as T
import qualified Data.Time as UTC
import qualified Database.PG.Query as PG
import qualified Database.PostgreSQL.LibPQ as PQ
import qualified Network.HTTP.Client as HTTP
pgChannel :: PG.PGChannel
pgChannel = "hasura_schema_update"
data ThreadType
= TTListener
| TTProcessor
deriving (Eq)
instance Show ThreadType where
show TTListener = "listener"
show TTProcessor = "processor"
data SchemaSyncThreadLog
= SchemaSyncThreadLog
{ suelLogLevel :: !LogLevel
, suelThreadType :: !ThreadType
, suelInfo :: !Value
} deriving (Show, Eq)
instance ToJSON SchemaSyncThreadLog where
toJSON (SchemaSyncThreadLog _ t info) =
object [ "thread_type" .= show t
, "info" .= info
]
instance ToEngineLog SchemaSyncThreadLog where
toEngineLog threadLog =
(suelLogLevel threadLog, "schema_sync_thread", toJSON threadLog)
data EventPayload
= EventPayload
{ _epInstanceId :: !InstanceId
, _epOccurredAt :: !UTC.UTCTime
} deriving (Show, Eq)
$(deriveJSON (aesonDrop 3 snakeCase) ''EventPayload)
data ThreadError
= TEJsonParse !T.Text
| TEQueryError !QErr
$(deriveToJSON
defaultOptions { constructorTagModifier = snakeCase . drop 2
, sumEncoding = TaggedObject "type" "info"
}
''ThreadError)
-- | An IO action that enables metadata syncing
startSchemaSync
:: Bool
-> PG.PGPool
-> Logger
-> HTTP.Manager
-> SchemaCacheRef
-> InstanceId
-> Maybe UTC.UTCTime -> IO ()
startSchemaSync strfyNum pool logger httpMgr cacheRef instanceId cacheInitTime = do
-- Init events queue
eventsQueue <- STM.newTQueueIO
-- Start listener thread
lTId <- C.forkIO $ listener strfyNum pool
logger httpMgr eventsQueue cacheRef instanceId cacheInitTime
logThreadStarted TTListener lTId
-- Start processor thread
pTId <- C.forkIO $ processor strfyNum pool
logger httpMgr eventsQueue cacheRef instanceId
logThreadStarted TTProcessor pTId
where
logThreadStarted threadType threadId =
let msg = T.pack (show threadType) <> " thread started"
in unLogger logger $
StartupLog LevelInfo "threads" $
object [ "instance_id" .= getInstanceId instanceId
, "thread_id" .= show threadId
, "message" .= msg
]
-- | An IO action that listens to postgres for events and pushes them to a Queue
listener
:: Bool
-> PG.PGPool
-> Logger
-> HTTP.Manager
-> STM.TQueue EventPayload
-> SchemaCacheRef
-> InstanceId
-> Maybe UTC.UTCTime -> IO ()
listener strfyNum pool logger httpMgr eventsQueue
cacheRef instanceId cacheInitTime =
-- Never exits
forever $ do
listenResE <-
liftIO $ runExceptT $ PG.listen pool pgChannel notifyHandler
either onError return listenResE
logWarn
C.threadDelay $ 1 * 1000 * 1000 -- 1 second
where
threadType = TTListener
shouldRefresh dbInstId accrdAt =
case cacheInitTime of
Nothing -> True
Just time -> (dbInstId /= instanceId) && accrdAt > time
refreshCache Nothing = return ()
refreshCache (Just (dbInstId, accrdAt)) =
when (shouldRefresh dbInstId accrdAt) $
refreshSchemaCache strfyNum pool logger httpMgr cacheRef
threadType "schema cache reloaded after postgres listen init"
notifyHandler = \case
PG.PNEOnStart -> do
eRes <- runExceptT $ PG.runTx pool
(PG.Serializable, Nothing) fetchLastUpdate
case eRes of
Left e -> onError e
Right mLastUpd -> refreshCache mLastUpd
PG.PNEPQNotify notif ->
case eitherDecodeStrict $ PQ.notifyExtra notif of
Left e -> logError logger threadType $ TEJsonParse $ T.pack e
Right payload -> do
logInfo logger threadType $ object ["received_event" .= payload]
-- Push a notify event to Queue
STM.atomically $ STM.writeTQueue eventsQueue payload
onError = logError logger threadType . TEQueryError
logWarn = unLogger logger $
SchemaSyncThreadLog LevelWarn TTListener $ String
"error occurred, retrying postgres listen after 1 second"
-- | An IO action that processes events from Queue
processor
:: Bool
-> PG.PGPool
-> Logger
-> HTTP.Manager
-> STM.TQueue EventPayload
-> SchemaCacheRef
-> InstanceId -> IO ()
processor strfyNum pool logger httpMgr eventsQueue
cacheRef instanceId =
-- Never exits
forever $ do
event <- STM.atomically $ STM.readTQueue eventsQueue
logInfo logger threadType $ object ["processed_event" .= event]
when (shouldReload event) $
refreshSchemaCache strfyNum pool logger httpMgr cacheRef
threadType "schema cache reloaded"
where
threadType = TTProcessor
-- If event is from another server
shouldReload payload = _epInstanceId payload /= instanceId
refreshSchemaCache
:: Bool
-> PG.PGPool
-> Logger
-> HTTP.Manager
-> SchemaCacheRef
-> ThreadType
-> T.Text -> IO ()
refreshSchemaCache strfyNum pool logger httpManager cacheRef threadType msg = do
-- Reload schema cache from catalog
resE <- liftIO $ runExceptT $ withSCUpdate cacheRef $
peelRun emptySchemaCache adminUserInfo
httpManager strfyNum pool PG.Serializable buildSchemaCache
case resE of
Left e -> logError logger threadType $ TEQueryError e
Right _ ->
logInfo logger threadType $ object ["message" .= msg]
logInfo :: Logger -> ThreadType -> Value -> IO ()
logInfo logger threadType val = unLogger logger $
SchemaSyncThreadLog LevelInfo threadType val
logError :: ToJSON a => Logger -> ThreadType -> a -> IO ()
logError logger threadType err =
unLogger logger $ SchemaSyncThreadLog LevelError threadType $
object ["error" .= toJSON err]

@ -405,3 +405,32 @@ CREATE TABLE hdb_catalog.remote_schemas (
definition JSON,
comment TEXT
);
CREATE TABLE hdb_catalog.hdb_schema_update_event (
id BIGSERIAL PRIMARY KEY,
instance_id uuid NOT NULL,
occurred_at timestamptz NOT NULL DEFAULT NOW()
);
CREATE FUNCTION hdb_catalog.hdb_schema_update_event_notifier() RETURNS trigger AS
$function$
DECLARE
instance_id uuid;
occurred_at timestamptz;
curr_rec record;
BEGIN
instance_id = NEW.instance_id;
occurred_at = NEW.occurred_at;
PERFORM pg_notify('hasura_schema_update', json_build_object(
'instance_id', instance_id,
'occurred_at', occurred_at
)::text);
RETURN curr_rec;
END;
$function$
LANGUAGE plpgsql;
CREATE TRIGGER hdb_schema_update_event_notifier AFTER INSERT ON hdb_catalog.hdb_schema_update_event
FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.hdb_schema_update_event_notifier();

@ -0,0 +1,26 @@
CREATE TABLE hdb_catalog.hdb_schema_update_event (
id BIGSERIAL PRIMARY KEY,
instance_id uuid NOT NULL,
occurred_at timestamptz NOT NULL DEFAULT NOW()
);
CREATE FUNCTION hdb_catalog.hdb_schema_update_event_notifier() RETURNS trigger AS
$function$
DECLARE
instance_id uuid;
occurred_at timestamptz;
curr_rec record;
BEGIN
instance_id = NEW.instance_id;
occurred_at = NEW.occurred_at;
PERFORM pg_notify('hasura_schema_update', json_build_object(
'instance_id', instance_id,
'occurred_at', occurred_at
)::text);
RETURN curr_rec;
END;
$function$
LANGUAGE plpgsql;
CREATE TRIGGER hdb_schema_update_event_notifier AFTER INSERT ON hdb_catalog.hdb_schema_update_event
FOR EACH ROW EXECUTE PROCEDURE hdb_catalog.hdb_schema_update_event_notifier();

@ -16,8 +16,8 @@ packages:
# Packages to be pulled from upstream that are not in the resolver (e.g., acme-missiles-0.3)
extra-deps:
# use https URLs so that build systems can clone these repos
- git: https://github.com/hasura/pg-client-hs.git
commit: f3d1e9e67bdfbfa3de85b7cbdb4c557dce7fd84d
- git: https://github.com/rakeshkky/pg-client-hs.git
commit: ed3dcfb864a2a23ac6c22ed947a5095b0d03170d
- git: https://github.com/hasura/graphql-parser-hs.git
commit: ff95d9a96aa5ef9e5390f8712958e4118e3831f6
- ginger-0.8.1.0
@ -27,6 +27,9 @@ extra-deps:
- deferred-folds-0.9.9
- primitive-0.6.4.0
# extra dep for pg-client-hs
- select-0.4.0.1
# jose for the x5t bugfix (0.8.0.0)
- git: https://github.com/frasertweedale/hs-jose.git
commit: d47572fb0650ac6cc2c9e00711c7f99132d897cb

@ -50,6 +50,13 @@ def pytest_addoption(parser):
help="Run Test cases with GraphQL queries being disabled"
)
parser.addoption(
"--test-hge-scale-url",
metavar="<url>",
required=False,
help="Run testcases for horizontal scaling"
)
@pytest.fixture(scope='session')
def hge_ctx(request):
@ -63,6 +70,7 @@ def hge_ctx(request):
hge_jwt_conf = request.config.getoption('--hge-jwt-conf')
ws_read_cookie = request.config.getoption('--test-ws-init-cookie')
metadata_disabled = request.config.getoption('--test-metadata-disabled')
hge_scale_url = request.config.getoption('--test-hge-scale-url')
try:
hge_ctx = HGECtx(
hge_url=hge_url,
@ -73,7 +81,8 @@ def hge_ctx(request):
hge_jwt_key_file=hge_jwt_key_file,
hge_jwt_conf=hge_jwt_conf,
ws_read_cookie=ws_read_cookie,
metadata_disabled=metadata_disabled
metadata_disabled=metadata_disabled,
hge_scale_url=hge_scale_url
)
except HGECtxError as e:
pytest.exit(str(e))

@ -75,7 +75,7 @@ class WebhookServer(http.server.HTTPServer):
class HGECtx:
def __init__(self, hge_url, pg_url, hge_key, hge_webhook, webhook_insecure,
hge_jwt_key_file, hge_jwt_conf, metadata_disabled, ws_read_cookie):
hge_jwt_key_file, hge_jwt_conf, metadata_disabled, ws_read_cookie, hge_scale_url):
server_address = ('0.0.0.0', 5592)
self.resp_queue = queue.Queue(maxsize=1)
@ -118,6 +118,8 @@ class HGECtx:
self.ws_read_cookie = ws_read_cookie
self.hge_scale_url = hge_scale_url
result = subprocess.run(['../../scripts/get-version.sh'], shell=False, stdout=subprocess.PIPE, check=True)
self.version = result.stdout.decode('utf-8').strip()
if not self.metadata_disabled:

@ -0,0 +1,93 @@
-
operation:
server: '1'
query:
type: bulk
args:
- type: run_sql
args:
sql: |
create table test_t1(
t1_c1 int,
t1_c2 text,
PRIMARY KEY (t1_c1)
);
- type: track_table
args:
schema: public
name: test_t1
- type: run_sql
args:
sql: |
insert into test_t1(t1_c1, t1_c2) VALUES(1, 'table1');
validate:
server: '2'
response:
data:
test_t1:
- t1_c1: 1
t1_c2: table1
query:
query: |
query {
test_t1 {
t1_c1
t1_c2
}
}
-
operation:
server: '2'
query:
type: bulk
args:
- type: run_sql
args:
sql: |
create table test_t2(
t2_c1 int,
t2_c2 text,
PRIMARY KEY (t2_c1)
);
- type: run_sql
args:
sql: |
ALTER TABLE test_t2 ADD FOREIGN KEY (t2_c1) REFERENCES test_t1 (t1_c1);
- type: track_table
args:
schema: public
name: test_t2
- type: create_object_relationship
args:
name: testT1Byc1
table:
name: test_t2
schema: public
using:
foreign_key_constraint_on: t2_c1
- type: run_sql
args:
sql: |
insert into test_t2(t2_c1, t2_c2) VALUES(1, 'table2');
validate:
server: '1'
query:
query: |
query {
test_t2 {
t2_c1
t2_c2
testT1Byc1 {
t1_c1
t1_c2
}
}
}
response:
data:
test_t2:
- t2_c1: 1
t2_c2: table2
testT1Byc1:
t1_c1: 1
t1_c2: table1

@ -0,0 +1,12 @@
type: bulk
args:
- type: clear_metadata
args: {}
- type: run_sql
args:
sql: |
drop table test_t2;
- type: run_sql
args:
sql: |
drop table test_t1;

@ -0,0 +1,60 @@
import pytest
import yaml
import time
import jsondiff
from validate import json_ordered
if not pytest.config.getoption("--test-hge-scale-url"):
pytest.skip("--test-hge-scale-url flag is missing, skipping tests", allow_module_level=True)
class TestHorizontalScaleBasic():
servers = {}
@pytest.fixture(autouse=True, scope='class')
def transact(self, hge_ctx):
self.servers['1'] = hge_ctx.hge_url
self.servers['2'] = hge_ctx.hge_scale_url
yield
# teardown
st_code, resp = hge_ctx.v1q_f(self.dir() + '/teardown.yaml')
assert st_code == 200, resp
def test_horizontal_scale_basic(self, hge_ctx):
with open(self.dir() + "/steps.yaml") as c:
conf = yaml.load(c)
assert isinstance(conf, list), 'Not a list'
for step in conf:
# execute operation
response = hge_ctx.http.post(
self.servers[step['operation']['server']] + "/v1/query",
json=step['operation']['query']
)
st_code = response.status_code
resp = response.json()
assert st_code == 200, resp
# wait 30 seconds for the schema change to propagate to the other server
time.sleep(30)
# validate data
response = hge_ctx.http.post(
self.servers[step['validate']['server']] + "/v1alpha1/graphql",
json=step['validate']['query']
)
st_code = response.status_code
resp = response.json()
assert st_code == 200, resp
if 'response' in step['validate']:
assert json_ordered(resp) == json_ordered(step['validate']['response']), yaml.dump({
'response': resp,
'expected': step['validate']['response'],
'diff': jsondiff.diff(step['validate']['response'], resp)
})
@classmethod
def dir(cls):
return 'queries/horizontal_scale/basic'