mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-15 09:22:43 +03:00
server: source initialization fix
GitOrigin-RevId: fcb94ca743a99ee3ffe30d40717bb1f0a13cf751
This commit is contained in:
parent
148d3756d4
commit
f1f56ccf75
@ -133,6 +133,7 @@ library
|
|||||||
, postgresql-libpq
|
, postgresql-libpq
|
||||||
, process
|
, process
|
||||||
, profunctors
|
, profunctors
|
||||||
|
, retry
|
||||||
, safe-exceptions
|
, safe-exceptions
|
||||||
, scientific
|
, scientific
|
||||||
, semialign
|
, semialign
|
||||||
|
@ -61,6 +61,7 @@ module Data.Time.Clock.Units
|
|||||||
-- - a 'DiffTime' or 'NominalDiffTime' may be negative
|
-- - a 'DiffTime' or 'NominalDiffTime' may be negative
|
||||||
-- - 'addUTCTime' and 'diffUTCTime' do not attempt to handle leap seconds
|
-- - 'addUTCTime' and 'diffUTCTime' do not attempt to handle leap seconds
|
||||||
, DiffTime
|
, DiffTime
|
||||||
|
, diffTimeToMicroSeconds
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Prelude
|
import Prelude
|
||||||
@ -182,3 +183,6 @@ instance Duration NominalDiffTime where
|
|||||||
-- | Safe conversion between duration units.
|
-- | Safe conversion between duration units.
|
||||||
convertDuration :: (Duration x, Duration y) => x -> y
|
convertDuration :: (Duration x, Duration y) => x -> y
|
||||||
convertDuration = fromDiffTime . toDiffTime
|
convertDuration = fromDiffTime . toDiffTime
|
||||||
|
|
||||||
|
diffTimeToMicroSeconds :: DiffTime -> Integer
|
||||||
|
diffTimeToMicroSeconds = (`div` 1000000) . diffTimeToPicoseconds
|
||||||
|
@ -18,6 +18,8 @@ import qualified Language.Haskell.TH.Syntax as TH
|
|||||||
|
|
||||||
import Control.Monad.Trans.Control (MonadBaseControl)
|
import Control.Monad.Trans.Control (MonadBaseControl)
|
||||||
import Data.FileEmbed (makeRelativeToProject)
|
import Data.FileEmbed (makeRelativeToProject)
|
||||||
|
import Data.Time.Clock (UTCTime)
|
||||||
|
|
||||||
|
|
||||||
import Hasura.Backends.Postgres.Connection
|
import Hasura.Backends.Postgres.Connection
|
||||||
import Hasura.Backends.Postgres.SQL.Types
|
import Hasura.Backends.Postgres.SQL.Types
|
||||||
@ -64,8 +66,8 @@ resolveDatabaseMetadata sourceConfig = runExceptT do
|
|||||||
pure $ ResolvedSource sourceConfig tablesMeta functionsMeta pgScalars
|
pure $ ResolvedSource sourceConfig tablesMeta functionsMeta pgScalars
|
||||||
|
|
||||||
-- | Initialise catalog tables for a source, including those required by the event delivery subsystem.
|
-- | Initialise catalog tables for a source, including those required by the event delivery subsystem.
|
||||||
initCatalogForSource :: MonadTx m => MaintenanceMode -> m ()
|
initCatalogForSource :: forall m . MonadTx m => MaintenanceMode -> UTCTime -> m ()
|
||||||
initCatalogForSource maintenanceMode = do
|
initCatalogForSource maintenanceMode migrationTime = do
|
||||||
hdbCatalogExist <- doesSchemaExist "hdb_catalog"
|
hdbCatalogExist <- doesSchemaExist "hdb_catalog"
|
||||||
eventLogTableExist <- doesTableExist "hdb_catalog" "event_log"
|
eventLogTableExist <- doesTableExist "hdb_catalog" "event_log"
|
||||||
sourceVersionTableExist <- doesTableExist "hdb_catalog" "hdb_source_catalog_version"
|
sourceVersionTableExist <- doesTableExist "hdb_catalog" "hdb_source_catalog_version"
|
||||||
@ -84,7 +86,10 @@ initCatalogForSource maintenanceMode = do
|
|||||||
-- Update the Source Catalog to v43 to include the new migration
|
-- Update the Source Catalog to v43 to include the new migration
|
||||||
-- changes. Skipping this step will result in errors.
|
-- changes. Skipping this step will result in errors.
|
||||||
currCatalogVersion <- liftTx getCatalogVersion
|
currCatalogVersion <- liftTx getCatalogVersion
|
||||||
|
-- we migrate to the 43 version, which is the migration where
|
||||||
|
-- metadata separation is introduced
|
||||||
migrateTo43 currCatalogVersion
|
migrateTo43 currCatalogVersion
|
||||||
|
setCatalogVersion "43" migrationTime
|
||||||
liftTx createVersionTable
|
liftTx createVersionTable
|
||||||
| otherwise -> migrateSourceCatalog
|
| otherwise -> migrateSourceCatalog
|
||||||
where
|
where
|
||||||
|
@ -86,6 +86,7 @@ import Hasura.Server.Migrate.Version (latestCatalogVersionStr
|
|||||||
import Hasura.Server.Types
|
import Hasura.Server.Types
|
||||||
import Hasura.Server.Version (HasVersion)
|
import Hasura.Server.Version (HasVersion)
|
||||||
|
|
||||||
|
|
||||||
data TriggerMetadata
|
data TriggerMetadata
|
||||||
= TriggerMetadata { tmName :: TriggerName }
|
= TriggerMetadata { tmName :: TriggerName }
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
@ -706,8 +707,14 @@ unlockEvents eventIds =
|
|||||||
getMaintenanceModeVersion :: Q.TxE QErr MaintenanceModeVersion
|
getMaintenanceModeVersion :: Q.TxE QErr MaintenanceModeVersion
|
||||||
getMaintenanceModeVersion = liftTx $ do
|
getMaintenanceModeVersion = liftTx $ do
|
||||||
catalogVersion <- getCatalogVersion -- From the user's DB
|
catalogVersion <- getCatalogVersion -- From the user's DB
|
||||||
|
-- the previous version and the current version will change depending
|
||||||
|
-- upon between which versions we need to support maintenance mode
|
||||||
if | catalogVersion == "40" -> pure PreviousMMVersion
|
if | catalogVersion == "40" -> pure PreviousMMVersion
|
||||||
|
-- The catalog is migrated to the 43rd version for a source
|
||||||
|
-- which was initialised by a v1 graphql-engine instance (See @initSource@).
|
||||||
|
| catalogVersion == "43" -> pure CurrentMMVersion
|
||||||
| catalogVersion == latestCatalogVersionString -> pure CurrentMMVersion
|
| catalogVersion == latestCatalogVersionString -> pure CurrentMMVersion
|
||||||
| otherwise ->
|
| otherwise ->
|
||||||
throw500 $
|
throw500 $
|
||||||
"Maintenance mode is only supported with catalog versions: 40 and " <> latestCatalogVersionString
|
"Maintenance mode is only supported with catalog versions: 40, 43 and "
|
||||||
|
<> tshow latestCatalogVersionString
|
||||||
|
@ -20,6 +20,7 @@ module Hasura.RQL.DDL.Schema.Cache
|
|||||||
|
|
||||||
import Hasura.Prelude
|
import Hasura.Prelude
|
||||||
|
|
||||||
|
import qualified Control.Retry as Retry
|
||||||
import qualified Data.Dependent.Map as DMap
|
import qualified Data.Dependent.Map as DMap
|
||||||
import qualified Data.Environment as Env
|
import qualified Data.Environment as Env
|
||||||
import qualified Data.HashMap.Strict.Extended as M
|
import qualified Data.HashMap.Strict.Extended as M
|
||||||
@ -35,8 +36,10 @@ import Control.Lens hiding ((.=))
|
|||||||
import Control.Monad.Trans.Control (MonadBaseControl)
|
import Control.Monad.Trans.Control (MonadBaseControl)
|
||||||
import Control.Monad.Unique
|
import Control.Monad.Unique
|
||||||
import Data.Aeson
|
import Data.Aeson
|
||||||
|
import Data.Either (isLeft)
|
||||||
import Data.Proxy
|
import Data.Proxy
|
||||||
import Data.Text.Extended
|
import Data.Text.Extended
|
||||||
|
import Data.Time.Clock (getCurrentTime)
|
||||||
import Network.HTTP.Client.Extended hiding (Proxy)
|
import Network.HTTP.Client.Extended hiding (Proxy)
|
||||||
|
|
||||||
import qualified Hasura.Incremental as Inc
|
import qualified Hasura.Incremental as Inc
|
||||||
@ -314,8 +317,25 @@ buildSchemaCacheRule env = proc (metadata, invalidationKeys) -> do
|
|||||||
when (numEventTriggers > 0) do
|
when (numEventTriggers > 0) do
|
||||||
case backendTag @b of
|
case backendTag @b of
|
||||||
Tag.PostgresVanillaTag -> do
|
Tag.PostgresVanillaTag -> do
|
||||||
|
migrationTime <- liftIO getCurrentTime
|
||||||
maintenanceMode <- _sccMaintenanceMode <$> askServerConfigCtx
|
maintenanceMode <- _sccMaintenanceMode <$> askServerConfigCtx
|
||||||
liftEither =<< runExceptT do runLazyTx (_pscExecCtx sc) Q.ReadWrite (initCatalogForSource maintenanceMode)
|
let
|
||||||
|
initCatalogAction =
|
||||||
|
runExceptT $ runLazyTx (_pscExecCtx sc) Q.ReadWrite (initCatalogForSource maintenanceMode migrationTime)
|
||||||
|
-- The `initCatalogForSource` action is retried here because
|
||||||
|
-- in cloud there will be multiple workers (graphql-engine instances)
|
||||||
|
-- trying to migrate the source catalog, when needed. This introduces
|
||||||
|
-- a race condition as both the workers try to migrate the source catalog
|
||||||
|
-- concurrently and when one of them succeeds the other ones will fail
|
||||||
|
-- and be in an inconsistent state. To avoid the inconsistency, we retry
|
||||||
|
-- migrating the catalog on error and in the retry `initCatalogForSource`
|
||||||
|
-- will see that the catalog is already migrated, so it won't attempt the
|
||||||
|
-- migration again
|
||||||
|
liftEither =<< Retry.retrying
|
||||||
|
(Retry.constantDelay (fromIntegral $ diffTimeToMicroSeconds $ seconds $ Seconds 10)
|
||||||
|
<> Retry.limitRetries 3)
|
||||||
|
(const $ return . isLeft)
|
||||||
|
(const initCatalogAction)
|
||||||
_ -> pure ()
|
_ -> pure ()
|
||||||
|
|
||||||
buildSource
|
buildSource
|
||||||
|
@ -224,13 +224,6 @@ downgradeCatalog defaultSourceConfig opts time = do
|
|||||||
| x == upper = Right [y]
|
| x == upper = Right [y]
|
||||||
| otherwise = (y:) <$> dropOlderDowngrades xs
|
| otherwise = (y:) <$> dropOlderDowngrades xs
|
||||||
|
|
||||||
setCatalogVersion :: MonadTx m => Text -> UTCTime -> m ()
|
|
||||||
setCatalogVersion ver time = liftTx $ Q.unitQE defaultTxErrorHandler [Q.sql|
|
|
||||||
INSERT INTO hdb_catalog.hdb_version (version, upgraded_on) VALUES ($1, $2)
|
|
||||||
ON CONFLICT ((version IS NOT NULL))
|
|
||||||
DO UPDATE SET version = $1, upgraded_on = $2
|
|
||||||
|] (ver, time) False
|
|
||||||
|
|
||||||
migrations
|
migrations
|
||||||
:: forall m. (MonadIO m, MonadTx m)
|
:: forall m. (MonadIO m, MonadTx m)
|
||||||
=> Maybe (SourceConnConfiguration ('Postgres 'Vanilla)) -> Bool -> MaintenanceMode -> [(Text, MigrationPair m)]
|
=> Maybe (SourceConnConfiguration ('Postgres 'Vanilla)) -> Bool -> MaintenanceMode -> [(Text, MigrationPair m)]
|
||||||
|
@ -2,10 +2,13 @@ module Hasura.Server.Migrate.Internal
|
|||||||
( runTx
|
( runTx
|
||||||
, getCatalogVersion
|
, getCatalogVersion
|
||||||
, from3To4
|
, from3To4
|
||||||
|
, setCatalogVersion
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Hasura.Prelude
|
import Hasura.Prelude
|
||||||
|
|
||||||
|
import Data.Time.Clock (UTCTime)
|
||||||
|
|
||||||
import qualified Data.Aeson as A
|
import qualified Data.Aeson as A
|
||||||
import qualified Database.PG.Query as Q
|
import qualified Database.PG.Query as Q
|
||||||
|
|
||||||
@ -49,3 +52,10 @@ from3To4 = liftTx $ Q.catchE defaultTxErrorHandler $ do
|
|||||||
configuration = $1
|
configuration = $1
|
||||||
WHERE name = $2
|
WHERE name = $2
|
||||||
|] (Q.AltJ $ A.toJSON etc, name) True
|
|] (Q.AltJ $ A.toJSON etc, name) True
|
||||||
|
|
||||||
|
setCatalogVersion :: MonadTx m => Text -> UTCTime -> m ()
|
||||||
|
setCatalogVersion ver time = liftTx $ Q.unitQE defaultTxErrorHandler [Q.sql|
|
||||||
|
INSERT INTO hdb_catalog.hdb_version (version, upgraded_on) VALUES ($1, $2)
|
||||||
|
ON CONFLICT ((version IS NOT NULL))
|
||||||
|
DO UPDATE SET version = $1, upgraded_on = $2
|
||||||
|
|] (ver, time) False
|
||||||
|
Loading…
Reference in New Issue
Block a user