server/mssql: source catalog initialization for event triggers (Incremental PR - I)

PR-URL: https://github.com/hasura/graphql-engine-mono/pull/2505
Co-authored-by: Naveen Naidu <30195193+Naveenaidu@users.noreply.github.com>
GitOrigin-RevId: 56681f90cfbfcf2f99c27f08c01d32790bd03c4d
This commit is contained in:
Karthikeyan Chinnakonda 2022-02-24 13:43:19 +05:30 committed by hasura-bot
parent 91cc962e5d
commit bea650b3e0
14 changed files with 205 additions and 59 deletions

View File

@ -382,6 +382,7 @@ library
, Hasura.Backends.MSSQL.DDL.BoolExp
, Hasura.Backends.MSSQL.DDL.RunSQL
, Hasura.Backends.MSSQL.DDL.Source
, Hasura.Backends.MSSQL.DDL.Source.Version
, Hasura.Backends.MSSQL.Execute.MutationResponse
, Hasura.Backends.MSSQL.Execute.Delete
, Hasura.Backends.MSSQL.Execute.Insert

View File

@ -2,6 +2,7 @@ module Database.MSSQL.Transaction
( TxET (..),
MSSQLTxError (..),
TxT,
TxE,
runTx,
runTxE,
unitQuery,
@ -16,7 +17,7 @@ module Database.MSSQL.Transaction
where
import Control.Exception (try)
import Control.Monad.Morph (hoist)
import Control.Monad.Morph (MFunctor (hoist))
import Control.Monad.Trans.Control (MonadBaseControl)
import Database.MSSQL.Pool
import Database.ODBC.SQLServer (FromRow)
@ -40,6 +41,9 @@ newtype TxET e m a = TxET
MonadFix
)
instance MFunctor (TxET e) where
hoist f = TxET . hoist (hoist f) . txHandler
instance MonadTrans (TxET e) where
lift = TxET . lift . lift
@ -50,6 +54,8 @@ data MSSQLTxError
| MSSQLInternal !Text
deriving (Eq, Show)
type TxE e a = TxET e IO a
-- | The transaction command to run, returning an MSSQLTxError or the result.
type TxT m a = TxET MSSQLTxError m a

View File

@ -9,6 +9,7 @@ module Hasura.Backends.MSSQL.Connection
( MSSQLConnConfiguration (MSSQLConnConfiguration),
MSSQLSourceConfig (MSSQLSourceConfig, _mscExecCtx),
MSSQLExecCtx (..),
MonadMSSQLTx (..),
createMSSQLPool,
getEnv,
odbcValueToJValue,
@ -16,6 +17,7 @@ module Hasura.Backends.MSSQL.Connection
)
where
import Control.Monad.Morph (hoist)
import Control.Monad.Trans.Control
import Data.Aeson
import Data.Aeson qualified as J
@ -30,6 +32,25 @@ import Hasura.Base.Error
import Hasura.Incremental (Cacheable (..))
import Hasura.Prelude
class MonadError QErr m => MonadMSSQLTx m where
liftMSSQLTx :: MSTx.TxE QErr a -> m a
instance MonadMSSQLTx m => MonadMSSQLTx (ReaderT s m) where
liftMSSQLTx = lift . liftMSSQLTx
instance MonadMSSQLTx m => MonadMSSQLTx (StateT s m) where
liftMSSQLTx = lift . liftMSSQLTx
instance (Monoid w, MonadMSSQLTx m) => MonadMSSQLTx (WriterT w m) where
liftMSSQLTx = lift . liftMSSQLTx
instance MonadIO m => MonadMSSQLTx (MSTx.TxET QErr m) where
liftMSSQLTx = hoist liftIO
-- | ODBC connection string for MSSQL server
newtype MSSQLConnectionString = MSSQLConnectionString {unMSSQLConnectionString :: Text}
deriving (Show, Eq, ToJSON, FromJSON, Cacheable, Hashable, NFData)
-- * Orphan instances
instance Cacheable MSPool.ConnectionString

View File

@ -10,16 +10,26 @@ module Hasura.Backends.MSSQL.DDL.Source
( resolveSourceConfig,
resolveDatabaseMetadata,
postDropSourceHook,
initCatalogForSource,
)
where
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Environment qualified as Env
import Data.FileEmbed (makeRelativeToProject)
import Database.MSSQL.Transaction
import Database.MSSQL.Transaction qualified as Tx
import Database.ODBC.SQLServer
import Database.ODBC.TH qualified as ODBC
import Hasura.Backends.MSSQL.Connection
import Hasura.Backends.MSSQL.DDL.Source.Version
import Hasura.Backends.MSSQL.Meta
import Hasura.Backends.MSSQL.SQL.Error qualified as HGE
import Hasura.Backends.MSSQL.Types
import Hasura.Base.Error
import Hasura.Prelude
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger (RecreateEventTriggers (..))
import Hasura.RQL.Types.Source
import Hasura.RQL.Types.SourceCustomization
import Hasura.SQL.Backend
@ -46,9 +56,49 @@ resolveDatabaseMetadata config customization = runExceptT do
MSSQLSourceConfig _connString mssqlExecCtx = config
postDropSourceHook ::
(MonadIO m) =>
(MonadIO m, MonadBaseControl IO m) =>
MSSQLSourceConfig ->
m ()
postDropSourceHook (MSSQLSourceConfig _ mssqlExecCtx) = do
_ <- runExceptT $ mssqlRunReadWrite mssqlExecCtx dropSourceCatalog
-- Close the connection
liftIO $ mssqlDestroyConn mssqlExecCtx
doesSchemaExist :: MonadMSSQLTx m => SchemaName -> m Bool
doesSchemaExist (SchemaName schemaName) = do
liftMSSQLTx $
Tx.singleRowQueryE
HGE.defaultMSSQLTxErrorHandler
[ODBC.sql|
SELECT CAST (
CASE
WHEN EXISTS( SELECT 1 FROM sys.schemas WHERE name = $schemaName )
THEN 1
ELSE 0
END
AS BIT)
|]
-- | Initialise catalog tables for a source, including those required by the event delivery subsystem.
initCatalogForSource :: MonadMSSQLTx m => m RecreateEventTriggers
initCatalogForSource = do
hdbCatalogExist <- doesSchemaExist "hdb_catalog"
if
-- Fresh database
| not hdbCatalogExist -> liftMSSQLTx do
unitQueryE HGE.defaultMSSQLTxErrorHandler "CREATE SCHEMA hdb_catalog"
initSourceCatalog
return RETDoNothing
-- TODO: When we need to make any changes to the source catalog, we'll have to introduce code which which will migrate
-- from one source catalog version to the next one
| otherwise -> pure RETDoNothing
where
initSourceCatalog = do
unitQueryE HGE.defaultMSSQLTxErrorHandler $(makeRelativeToProject "src-rsr/init_mssql_source.sql" >>= ODBC.sqlFile)
setSourceCatalogVersion latestSourceCatalogVersion
dropSourceCatalog :: MonadMSSQLTx m => m ()
dropSourceCatalog = do
let sql = $(makeRelativeToProject "src-rsr/drop_mssql_source.sql" >>= ODBC.sqlFile)
liftMSSQLTx $ unitQueryE HGE.defaultMSSQLTxErrorHandler sql

View File

@ -0,0 +1,20 @@
module Hasura.Backends.MSSQL.DDL.Source.Version
( latestSourceCatalogVersion,
setSourceCatalogVersion,
)
where
import Database.MSSQL.Transaction
import Database.ODBC.SQLServer
import Database.ODBC.TH qualified as ODBC
import Hasura.Backends.MSSQL.Connection (MonadMSSQLTx (..))
import Hasura.Backends.MSSQL.SQL.Error qualified as HGE
import Hasura.Prelude
latestSourceCatalogVersion :: Int
latestSourceCatalogVersion = 1
setSourceCatalogVersion :: MonadMSSQLTx m => Int -> m ()
setSourceCatalogVersion version = liftMSSQLTx $ unitQueryE HGE.defaultMSSQLTxErrorHandler setSourceCatalogVersionQuery
where
setSourceCatalogVersionQuery = [ODBC.sql| INSERT INTO hdb_catalog.hdb_source_catalog_version(version, upgraded_on) VALUES ($version, SYSDATETIMEOFFSET()) |]

View File

@ -132,7 +132,7 @@ instance FromJSON SysForeignKeyColumn where
transformTable :: SysTable -> (TableName, DBTableMetadata 'MSSQL)
transformTable tableInfo =
let schemaName = ssName $ staJoinedSysSchema tableInfo
let schemaName = SchemaName $ ssName $ staJoinedSysSchema tableInfo
tableName = TableName (staName tableInfo) schemaName
tableOID = OID $ staObjectId tableInfo
(columns, foreignKeys) = unzip $ transformColumn <$> staJoinedSysColumn tableInfo
@ -167,7 +167,7 @@ transformColumn columnInfo =
scJoinedForeignKeyColumns columnInfo <&> \foreignKeyColumn ->
let _fkConstraint = Constraint "fk_mssql" $ OID $ sfkcConstraintObjectId foreignKeyColumn
schemaName = ssName $ sfkcJoinedReferencedSysSchema foreignKeyColumn
schemaName = SchemaName $ ssName $ sfkcJoinedReferencedSysSchema foreignKeyColumn
_fkForeignTable = TableName (sfkcJoinedReferencedTableName foreignKeyColumn) schemaName
_fkColumnMapping = HM.singleton rciName $ ColumnName $ sfkcJoinedReferencedColumnName foreignKeyColumn
in ForeignKey {..}

View File

@ -748,7 +748,7 @@ fromJsonFieldSpec =
FieldPath r f -> go r <+> ".\"" <+> fromString (T.unpack f) <+> "\""
fromTableName :: TableName -> Printer
fromTableName TableName {tableName, tableSchema} =
fromTableName (TableName tableName (SchemaName tableSchema)) =
fromNameText tableSchema <+> "." <+> fromNameText tableName
fromAliased :: Aliased Printer -> Printer

View File

@ -162,7 +162,7 @@ instance ToTxt ScalarType where
toTxt = tshow -- TODO: include schema
instance ToTxt TableName where
toTxt TableName {tableName, tableSchema} =
toTxt (TableName tableName (SchemaName tableSchema)) =
if tableSchema == "dbo"
then tableName
else tableSchema <> "." <> tableName
@ -200,7 +200,7 @@ instance ToJSON TableName where
toJSON = genericToJSON hasuraJSON
instance ToJSONKey TableName where
toJSONKey = toJSONKeyText $ \(TableName schema name) -> schema <> "." <> name
toJSONKey = toJSONKeyText $ \(TableName name (SchemaName schema)) -> schema <> "." <> name
instance ToJSONKey ScalarType

View File

@ -104,12 +104,14 @@ import Data.Aeson qualified as J
import Data.Text.Encoding (encodeUtf8)
import Database.ODBC.SQLServer qualified as ODBC
import Hasura.Base.Error
import Hasura.Incremental (Cacheable)
import Hasura.Prelude
import Hasura.RQL.Types.Common qualified as RQL
import Hasura.SQL.Backend
import Hasura.SQL.GeoJSON qualified as Geo
import Hasura.SQL.WKT qualified as WKT
import Language.GraphQL.Draft.Syntax qualified as G
import Language.Haskell.TH.Syntax (Lift)
--------------------------------------------------------------------------------
-- Phantom pretend-generic types that are actually specific
@ -467,13 +469,12 @@ data Aliased a = Aliased
aliasedAlias :: Text
}
newtype SchemaName = SchemaName
{ schemaNameParts :: [Text]
}
newtype SchemaName = SchemaName {_unSchemaName :: Text}
deriving (Show, Eq, Ord, Data, J.ToJSON, J.FromJSON, NFData, Generic, Cacheable, IsString, Hashable, Lift)
data TableName = TableName
{ tableName :: Text,
tableSchema :: Text
{ tableName :: !Text,
tableSchema :: !SchemaName
}
data FieldName = FieldName
@ -668,7 +669,7 @@ getGQLTableName tn = do
"cannot include " <> textName <> " in the GraphQL schema because it is not a valid GraphQL identifier"
snakeCaseTableName :: TableName -> Text
snakeCaseTableName TableName {tableName, tableSchema} =
snakeCaseTableName (TableName tableName (SchemaName tableSchema)) =
if tableSchema == "dbo"
then tableName
else tableSchema <> "_" <> tableName

View File

@ -45,7 +45,6 @@ import Hasura.RQL.Types.SourceCustomization
import Hasura.RQL.Types.Table
import Hasura.SQL.Backend
import Hasura.Server.Migrate.Internal
import Hasura.Server.Types (EventingMode (..), MaintenanceMode (..), ReadOnlyMode (..))
import Language.Haskell.TH.Lib qualified as TH
import Language.Haskell.TH.Syntax qualified as TH
@ -148,19 +147,13 @@ resolveDatabaseMetadata sourceConfig sourceCustomization = runExceptT do
-- | Initialise catalog tables for a source, including those required by the event delivery subsystem.
initCatalogForSource ::
forall m. MonadTx m => MaintenanceMode -> EventingMode -> ReadOnlyMode -> UTCTime -> m RecreateEventTriggers
initCatalogForSource maintenanceMode eventingMode readOnlyMode migrationTime = do
forall m. MonadTx m => UTCTime -> m RecreateEventTriggers
initCatalogForSource migrationTime = do
hdbCatalogExist <- doesSchemaExist "hdb_catalog"
eventLogTableExist <- doesTableExist "hdb_catalog" "event_log"
sourceVersionTableExist <- doesTableExist "hdb_catalog" "hdb_source_catalog_version"
if
-- when safe mode is enabled, don't perform any migrations
| readOnlyMode == ReadOnlyModeEnabled -> pure RETDoNothing
-- when eventing mode is disabled, don't perform any migrations
| eventingMode == EventingDisabled -> pure RETDoNothing
-- when maintenance mode is enabled, don't perform any migrations
| maintenanceMode == MaintenanceModeEnabled -> pure RETDoNothing
-- Fresh database
| not hdbCatalogExist -> liftTx do
Q.unitQE defaultTxErrorHandler "CREATE SCHEMA hdb_catalog" () False

View File

@ -21,7 +21,6 @@ module Hasura.RQL.DDL.Schema.Cache
where
import Control.Arrow.Extended
import Control.Concurrent.Async.Lifted.Safe qualified as LA
import Control.Lens hiding ((.=))
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Retry qualified as Retry
@ -39,8 +38,10 @@ import Data.Text.Extended
import Data.These (These (..))
import Data.Time.Clock (getCurrentTime)
import Database.PG.Query qualified as Q
import Hasura.Backends.MSSQL.Connection
import Hasura.Backends.MSSQL.DDL.Source qualified as MSSQL
import Hasura.Backends.Postgres.Connection
import Hasura.Backends.Postgres.DDL.Source (initCatalogForSource, logPGSourceCatalogMigrationLockedQueries)
import Hasura.Backends.Postgres.DDL.Source (initCatalogForSource)
import Hasura.Base.Error
import Hasura.GraphQL.Execute.Types
import Hasura.GraphQL.RemoteServer (getSchemaIntrospection)
@ -70,7 +71,8 @@ import Hasura.SQL.AnyBackend qualified as AB
import Hasura.SQL.Tag
import Hasura.SQL.Tag qualified as Tag
import Hasura.Server.Types
( MaintenanceMode (..),
( EventingMode (..),
MaintenanceMode (..),
ReadOnlyMode (..),
)
import Hasura.Session
@ -410,7 +412,8 @@ buildSchemaCacheRule logger env = proc (metadata, invalidationKeys) -> do
MonadIO m,
BackendMetadata b,
HasServerConfigCtx m,
MonadError QErr m
MonadError QErr m,
MonadBaseControl IO m
) =>
(Int, SourceConfig b) `arr` RecreateEventTriggers
initCatalogIfNeeded = Inc.cache proc (numEventTriggers, sourceConfig) -> do
@ -418,38 +421,47 @@ buildSchemaCacheRule logger env = proc (metadata, invalidationKeys) -> do
-< do
if numEventTriggers > 0
then do
case backendTag @b of
Tag.PostgresVanillaTag -> do
migrationTime <- liftIO getCurrentTime
maintenanceMode <- _sccMaintenanceMode <$> askServerConfigCtx
eventingMode <- _sccEventingMode <$> askServerConfigCtx
readOnlyMode <- _sccReadOnlyMode <$> askServerConfigCtx
liftEitherM $
liftIO $
LA.withAsync (logPGSourceCatalogMigrationLockedQueries logger sourceConfig) $
const $ do
let initCatalogAction =
runExceptT $ runTx (_pscExecCtx sourceConfig) Q.ReadWrite (initCatalogForSource maintenanceMode eventingMode readOnlyMode migrationTime)
-- The `initCatalogForSource` action is retried here because
-- in cloud there will be multiple workers (graphql-engine instances)
-- trying to migrate the source catalog, when needed. This introduces
-- a race condition as both the workers try to migrate the source catalog
-- concurrently and when one of them succeeds the other ones will fail
-- and be in an inconsistent state. To avoid the inconsistency, we retry
-- migrating the catalog on error and in the retry `initCatalogForSource`
-- will see that the catalog is already migrated, so it won't attempt the
-- migration again
Retry.retrying
( Retry.constantDelay (fromIntegral $ diffTimeToMicroSeconds $ seconds $ Seconds 10)
<> Retry.limitRetries 3
)
(const $ return . isLeft)
(const initCatalogAction)
-- TODO: When event triggers are supported on new databases,
-- the initialization of the source catalog should also return
-- if the event triggers are to be re-created or not, essentially
-- replacing the `RETDoNothing` below
_ -> pure RETDoNothing
migrationTime <- liftIO getCurrentTime
maintenanceMode <- _sccMaintenanceMode <$> askServerConfigCtx
eventingMode <- _sccEventingMode <$> askServerConfigCtx
readOnlyMode <- _sccReadOnlyMode <$> askServerConfigCtx
if
-- when safe mode is enabled, don't perform any migrations
| readOnlyMode == ReadOnlyModeEnabled -> pure RETDoNothing
-- when eventing mode is disabled, don't perform any migrations
| eventingMode == EventingDisabled -> pure RETDoNothing
-- when maintenance mode is enabled, don't perform any migrations
| maintenanceMode == MaintenanceModeEnabled -> pure RETDoNothing
| otherwise -> do
let initCatalogAction =
case backendTag @b of
Tag.PostgresVanillaTag -> do
runExceptT $ runTx (_pscExecCtx sourceConfig) Q.ReadWrite (initCatalogForSource migrationTime)
Tag.MSSQLTag -> do
runExceptT $
mssqlRunReadWrite (_mscExecCtx sourceConfig) MSSQL.initCatalogForSource
-- TODO: When event triggers are supported on new databases,
-- the initialization of the source catalog should also return
-- if the event triggers are to be re-created or not, essentially
-- replacing the `RETDoNothing` below
_ -> pure $ Right RETDoNothing
-- The `initCatalogForSource` action is retried here because
-- in cloud there will be multiple workers (graphql-engine instances)
-- trying to migrate the source catalog, when needed. This introduces
-- a race condition as both the workers try to migrate the source catalog
-- concurrently and when one of them succeeds the other ones will fail
-- and be in an inconsistent state. To avoid the inconsistency, we retry
-- migrating the catalog on error and in the retry `initCatalogForSource`
-- will see that the catalog is already migrated, so it won't attempt the
-- migration again
liftEither
=<< Retry.retrying
( Retry.constantDelay (fromIntegral $ diffTimeToMicroSeconds $ seconds $ Seconds 10)
<> Retry.limitRetries 3
)
(const $ return . isLeft)
(const initCatalogAction)
else pure RETDoNothing
buildSource ::

View File

@ -0,0 +1,4 @@
DROP TABLE IF EXISTS hdb_catalog.hdb_source_catalog_version;
DROP TABLE IF EXISTS hdb_catalog.event_invocation_logs;
DROP TABLE IF EXISTS hdb_catalog.event_log;
DROP SCHEMA IF EXISTS hdb_catalog;

View File

@ -0,0 +1,38 @@
CREATE TABLE hdb_catalog.hdb_source_catalog_version (
version INTEGER NOT NULL PRIMARY KEY,
upgraded_on DATETIME2(7) NOT NULL
);
CREATE TABLE hdb_catalog.event_log
(
id UNIQUEIDENTIFIER DEFAULT newid() PRIMARY KEY,
schema_name NVARCHAR(MAX) NOT NULL,
table_name NVARCHAR(MAX) NOT NULL,
trigger_name NVARCHAR(MAX) NOT NULL,
payload NVARCHAR(MAX) NOT NULL,
delivered BIT NOT NULL DEFAULT 0,
error BIT NOT NULL DEFAULT 0,
tries INTEGER NOT NULL DEFAULT 0,
created_at DATETIMEOFFSET(7) NOT NULL DEFAULT SYSDATETIMEOFFSET(),
locked DATETIMEOFFSET(7),
next_retry_at DATETIMEOFFSET(7),
archived BIT NOT NULL DEFAULT 0
);
/* This index powers `fetchEvents` */
CREATE INDEX event_log_fetch_events
ON hdb_catalog.event_log (created_at)
WHERE delivered = 0
AND error = 0
AND archived = 0;
CREATE TABLE hdb_catalog.event_invocation_logs (
id UNIQUEIDENTIFIER NOT NULL DEFAULT newid(),
event_id UNIQUEIDENTIFIER NOT NULL,
status INTEGER,
request NVARCHAR(MAX),
response NVARCHAR(MAX),
created_at DATETIMEOFFSET(7) NOT NULL DEFAULT SYSDATETIMEOFFSET(),
FOREIGN KEY (event_id) REFERENCES hdb_catalog.event_log(id)
);

View File

@ -47,7 +47,7 @@ CREATE INDEX ON hdb_catalog.event_log (trigger_name);
/* This index powers `fetchEvents` */
CREATE INDEX event_log_fetch_events
ON hdb_catalog.event_log (locked NULLS FIRST, next_retry_at NULLS FIRST, created_at)
WHERE delivered = 'f'
WHERE delivered = 'f'
and error = 'f'
and archived = 'f'
;