server: V2 maintenance mode fixes

GitOrigin-RevId: dcd69fb217a4e976523f1bc8e7b2693111091002
This commit is contained in:
Karthikeyan Chinnakonda 2021-04-21 16:25:18 +05:30 committed by hasura-bot
parent cc24e50515
commit d09b00040f
10 changed files with 272 additions and 103 deletions

View File

@ -530,8 +530,14 @@ runHGEServer setupHook env ServeOptions{..} ServeCtx{..} initTime postPollHook s
unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers" unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers"
_eventQueueThread <- C.forkManagedT "processEventQueue" logger $ _eventQueueThread <- C.forkManagedT "processEventQueue" logger $
processEventQueue logger logEnvHeaders processEventQueue logger
_scHttpManager (getSCFromRef cacheRef) eventEngineCtx lockedEventsCtx serverMetrics logEnvHeaders
_scHttpManager
(getSCFromRef cacheRef)
eventEngineCtx
lockedEventsCtx
serverMetrics
soEnableMaintenanceMode
-- start a backgroud thread to handle async actions -- start a backgroud thread to handle async actions
case soAsyncActionsFetchInterval of case soAsyncActionsFetchInterval of

View File

@ -27,6 +27,7 @@ import Hasura.RQL.Types.Error
import Hasura.RQL.Types.Source import Hasura.RQL.Types.Source
import Hasura.RQL.Types.Table import Hasura.RQL.Types.Table
import Hasura.SQL.Backend import Hasura.SQL.Backend
import Hasura.Server.Types (MaintenanceMode)
resolveSourceConfig :: resolveSourceConfig ::
@ -52,8 +53,9 @@ resolveSourceConfig _name BigQueryConnSourceConfig{..} = runExceptT $ do
resolveSource resolveSource
:: (MonadIO m) :: (MonadIO m)
=> BigQuerySourceConfig => BigQuerySourceConfig
-> MaintenanceMode
-> m (Either QErr (ResolvedSource 'BigQuery)) -> m (Either QErr (ResolvedSource 'BigQuery))
resolveSource sourceConfig = resolveSource sourceConfig _maintenanceMode =
runExceptT $ do runExceptT $ do
result <- getTables sourceConfig result <- getTables sourceConfig
case result of case result of

View File

@ -13,6 +13,7 @@ import Hasura.RQL.Types.Common
import Hasura.RQL.Types.Error import Hasura.RQL.Types.Error
import Hasura.RQL.Types.Source import Hasura.RQL.Types.Source
import Hasura.SQL.Backend import Hasura.SQL.Backend
import Hasura.Server.Types (MaintenanceMode)
resolveSourceConfig resolveSourceConfig
:: (MonadIO m) :: (MonadIO m)
@ -26,8 +27,9 @@ resolveSourceConfig _name (MSSQLConnConfiguration connInfo) = runExceptT do
resolveDatabaseMetadata resolveDatabaseMetadata
:: (MonadIO m) :: (MonadIO m)
=> MSSQLSourceConfig => MSSQLSourceConfig
-> MaintenanceMode
-> m (Either QErr (ResolvedSource 'MSSQL)) -> m (Either QErr (ResolvedSource 'MSSQL))
resolveDatabaseMetadata config = runExceptT do resolveDatabaseMetadata config _maintenanceMode = runExceptT do
dbTablesMetadata <- loadDBMetadata pool dbTablesMetadata <- loadDBMetadata pool
pure $ ResolvedSource config dbTablesMetadata mempty mempty pure $ ResolvedSource config dbTablesMetadata mempty mempty
where where

View File

@ -24,6 +24,7 @@ import Hasura.RQL.Types.Source
import Hasura.RQL.Types.Table import Hasura.RQL.Types.Table
import Hasura.SQL.Backend import Hasura.SQL.Backend
import Hasura.Server.Migrate.Internal import Hasura.Server.Migrate.Internal
import Hasura.Server.Types (MaintenanceMode (..))
resolveSourceConfig resolveSourceConfig
:: (MonadIO m, MonadResolveSource m) :: (MonadIO m, MonadResolveSource m)
@ -34,23 +35,25 @@ resolveSourceConfig name config = runExceptT do
resolveDatabaseMetadata resolveDatabaseMetadata
:: (MonadIO m, MonadBaseControl IO m) :: (MonadIO m, MonadBaseControl IO m)
=> SourceConfig 'Postgres -> m (Either QErr (ResolvedSource 'Postgres)) => SourceConfig 'Postgres -> MaintenanceMode -> m (Either QErr (ResolvedSource 'Postgres))
resolveDatabaseMetadata sourceConfig = runExceptT do resolveDatabaseMetadata sourceConfig maintenanceMode = runExceptT do
(tablesMeta, functionsMeta, pgScalars) <- runLazyTx (_pscExecCtx sourceConfig) Q.ReadWrite $ do (tablesMeta, functionsMeta, pgScalars) <- runLazyTx (_pscExecCtx sourceConfig) Q.ReadWrite $ do
initSource initSource maintenanceMode
tablesMeta <- fetchTableMetadata tablesMeta <- fetchTableMetadata
functionsMeta <- fetchFunctionMetadata functionsMeta <- fetchFunctionMetadata
pgScalars <- fetchPgScalars pgScalars <- fetchPgScalars
pure (tablesMeta, functionsMeta, pgScalars) pure (tablesMeta, functionsMeta, pgScalars)
pure $ ResolvedSource sourceConfig tablesMeta functionsMeta pgScalars pure $ ResolvedSource sourceConfig tablesMeta functionsMeta pgScalars
initSource :: MonadTx m => m () initSource :: MonadTx m => MaintenanceMode -> m ()
initSource = do initSource maintenanceMode = do
hdbCatalogExist <- doesSchemaExist "hdb_catalog" hdbCatalogExist <- doesSchemaExist "hdb_catalog"
eventLogTableExist <- doesTableExist "hdb_catalog" "event_log" eventLogTableExist <- doesTableExist "hdb_catalog" "event_log"
sourceVersionTableExist <- doesTableExist "hdb_catalog" "hdb_source_catalog_version" sourceVersionTableExist <- doesTableExist "hdb_catalog" "hdb_source_catalog_version"
-- when maintenance mode is enabled, don't perform any migrations
if | maintenanceMode == MaintenanceModeEnabled -> pure ()
-- Fresh database -- Fresh database
if | not hdbCatalogExist -> liftTx do | not hdbCatalogExist -> liftTx do
Q.unitQE defaultTxErrorHandler "CREATE SCHEMA hdb_catalog" () False Q.unitQE defaultTxErrorHandler "CREATE SCHEMA hdb_catalog" () False
enablePgcryptoExtension enablePgcryptoExtension
initPgSourceCatalog initPgSourceCatalog
@ -61,7 +64,7 @@ initSource = do
| not sourceVersionTableExist && eventLogTableExist -> do | not sourceVersionTableExist && eventLogTableExist -> do
-- Update the Source Catalog to v43 to include the new migration -- Update the Source Catalog to v43 to include the new migration
-- changes. Skipping this step will result in errors. -- changes. Skipping this step will result in errors.
currCatalogVersion <- getCatalogVersion currCatalogVersion <- liftTx getCatalogVersion
migrateTo43 currCatalogVersion migrateTo43 currCatalogVersion
liftTx createVersionTable liftTx createVersionTable
| otherwise -> migrateSourceCatalog | otherwise -> migrateSourceCatalog

View File

@ -33,7 +33,6 @@ failed requests at a regular (user-configurable) interval.
module Hasura.Eventing.EventTrigger module Hasura.Eventing.EventTrigger
( initEventEngineCtx ( initEventEngineCtx
, processEventQueue , processEventQueue
, unlockAllEvents
, defaultMaxEventThreads , defaultMaxEventThreads
, defaultFetchInterval , defaultFetchInterval
, Event(..) , Event(..)
@ -81,6 +80,9 @@ import Hasura.HTTP
import Hasura.RQL.DDL.Headers import Hasura.RQL.DDL.Headers
import Hasura.RQL.Types import Hasura.RQL.Types
import Hasura.Server.Init.Config import Hasura.Server.Init.Config
import Hasura.Server.Migrate.Internal (getCatalogVersion)
import Hasura.Server.Migrate.Version (latestCatalogVersionString)
import Hasura.Server.Types
import Hasura.Server.Version (HasVersion) import Hasura.Server.Version (HasVersion)
data TriggerMetadata data TriggerMetadata
@ -96,6 +98,43 @@ newtype EventInternalErr
instance L.ToEngineLog EventInternalErr L.Hasura where instance L.ToEngineLog EventInternalErr L.Hasura where
toEngineLog (EventInternalErr qerr) = (L.LevelError, L.eventTriggerLogType, toJSON qerr) toEngineLog (EventInternalErr qerr) = (L.LevelError, L.eventTriggerLogType, toJSON qerr)
{- Note [Maintenance mode]
~~~~~~~~~~~~~~~~~~~~~~~~~~
Maintenance mode is a mode in which users can upgrade their graphql-engine
without any down time. More on maintenance mode can be found here:
https://github.com/hasura/graphql-engine-mono/issues/431.
Basically, there are a few main things that maintenance mode boils down to:
1. No operation that may change the metadata will be allowed.
2. Migrations are not applied when the graphql-engine is started, so the
catalog schema will be in the older version.
3. Event triggers should continue working in the new code with the older
catalog schema i.e it should work even if there are any schema changes
to the `hdb_catalog.event_log` table.
#1 and #2 are fairly self-explanatory. For #3, we need to support fetching
events depending upon the catalog version. So, fetch events works in the
following way now:
1. Check if maintenance mode is enabled
2. If maintenance mode is enabled then read the catalog version from the DB
and accordingly fire the appropriate query to the events log table.
When maintenance mode is disabled, we query the events log table according
to the latest catalog, we do not read the catalog version for this.
-}
-- | See Note [Maintenance Mode]
--
data MaintenanceModeVersion
= PreviousMMVersion
-- ^ should correspond to the catalog version from which the user
-- is migrating from
| CurrentMMVersion
-- ^ should correspond to the latest catalog version
deriving (Show, Eq)
-- | Change data for a particular row -- | Change data for a particular row
-- --
-- https://docs.hasura.io/1.0/graphql/manual/event-triggers/payload.html -- https://docs.hasura.io/1.0/graphql/manual/event-triggers/payload.html
@ -186,14 +225,17 @@ processEventQueue
-> EventEngineCtx -> EventEngineCtx
-> LockedEventsCtx -> LockedEventsCtx
-> ServerMetrics -> ServerMetrics
-> MaintenanceMode
-> m void -> m void
processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..} LockedEventsCtx{leEvents} serverMetrics = do processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..} LockedEventsCtx{leEvents} serverMetrics maintenanceMode = do
events0 <- popEventsBatch events0 <- popEventsBatch
-- Track number of events fetched in EKG -- Track number of events fetched in EKG
_ <- liftIO $ EKG.Distribution.add (smNumEventsFetched serverMetrics) (fromIntegral $ length events0) _ <- liftIO $ EKG.Distribution.add (smNumEventsFetched serverMetrics) (fromIntegral $ length events0)
go events0 0 False go events0 0 False
where where
fetchBatchSize = 100 fetchBatchSize = 100
popEventsBatch :: m [EventWithSource]
popEventsBatch = do popEventsBatch = do
{- {-
SELECT FOR UPDATE .. SKIP LOCKED can throw serialization errors in RepeatableRead: https://stackoverflow.com/a/53289263/1911889 SELECT FOR UPDATE .. SKIP LOCKED can throw serialization errors in RepeatableRead: https://stackoverflow.com/a/53289263/1911889
@ -206,19 +248,31 @@ processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..}
(delivered=t or error=t or archived=t) after a fixed number of tries (assuming it begins with locked='f'). (delivered=t or error=t or archived=t) after a fixed number of tries (assuming it begins with locked='f').
-} -}
pgSources <- scSources <$> liftIO getSchemaCache pgSources <- scSources <$> liftIO getSchemaCache
fmap concat $ forM (M.toList pgSources) $ \(sourceName, sourceCache) -> liftIO $ fmap concat $ forM (M.toList pgSources) $ \(sourceName, sourceCache) ->
case unsafeSourceConfiguration @'Postgres sourceCache of case unsafeSourceConfiguration @'Postgres sourceCache of
Nothing -> pure [] Nothing -> pure []
Just sourceConfig -> Just sourceConfig -> do
liftIO $ runPgSourceWriteTx sourceConfig (fetchEvents sourceName fetchBatchSize) >>= \case fetchEventsTxE <-
Left err -> do case maintenanceMode of
liftIO $ L.unLogger logger $ EventInternalErr err MaintenanceModeEnabled -> do
return [] maintenanceModeVersion <- runPgSourceReadTx sourceConfig getMaintenanceModeVersion
Right events -> do pure $ fmap (fetchEventsMaintenanceMode sourceName fetchBatchSize) maintenanceModeVersion
-- The time when the events were fetched. This is used to calculate the average lock time of an event. MaintenanceModeDisabled -> return $ Right $ fetchEvents sourceName fetchBatchSize
eventsFetchedTime <- liftIO getCurrentTime liftIO $ do
saveLockedEvents (map eId events) leEvents case fetchEventsTxE of
return $ map (, sourceConfig, eventsFetchedTime) events Left err -> do
liftIO $ L.unLogger logger $ EventInternalErr err
return []
Right fetchEventsTx ->
runPgSourceWriteTx sourceConfig fetchEventsTx >>= \case
Left err -> do
liftIO $ L.unLogger logger $ EventInternalErr err
return []
Right events -> do
-- The time when the events were fetched. This is used to calculate the average lock time of an event.
eventsFetchedTime <- liftIO getCurrentTime
saveLockedEvents (map eId events) leEvents
return $ map (, sourceConfig, eventsFetchedTime) events
-- work on this batch of events while prefetching the next. Recurse after we've forked workers -- work on this batch of events while prefetching the next. Recurse after we've forked workers
-- for each in the batch, minding the requested pool size. -- for each in the batch, minding the requested pool size.
@ -255,7 +309,6 @@ processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..}
"or we're working on a backlog of events. Consider increasing " <> "or we're working on a backlog of events. Consider increasing " <>
"HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE" "HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE"
go eventsNext (fullFetchCount+1) (alreadyWarned || clearlyBehind) go eventsNext (fullFetchCount+1) (alreadyWarned || clearlyBehind)
| otherwise -> do | otherwise -> do
when (lenEvents /= fetchBatchSize && alreadyWarned) $ when (lenEvents /= fetchBatchSize && alreadyWarned) $
-- emit as warning in case users are only logging warning severity and saw above -- emit as warning in case users are only logging warning severity and saw above
@ -294,42 +347,52 @@ processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..}
Tracing.runTraceTInContext Tracing.runTraceTInContext
tracingCtx tracingCtx
case getEventTriggerInfoFromEvent cache e of maintenanceModeVersionEither :: Either QErr (Maybe MaintenanceModeVersion) <-
Left err -> do case maintenanceMode of
-- This rare error can happen in the following known cases: MaintenanceModeEnabled -> do
-- i) schema cache is not up-to-date (due to some bug, say during schema syncing across multiple instances) maintenanceModeVersion <-
-- ii) the event trigger is dropped when this event was just fetched liftIO $ runPgSourceReadTx sourceConfig getMaintenanceModeVersion
logQErr $ err500 Unexpected err return $ Just <$> maintenanceModeVersion
liftIO $ runPgSourceWriteTx sourceConfig $ do MaintenanceModeDisabled -> return $ Right Nothing
currentTime <- liftIO getCurrentTime
-- For such an event, we unlock the event and retry after a minute
setRetry e (addUTCTime 60 currentTime)
>>= flip onLeft logQErr
Right eti -> runTraceT (spanName eti) do
let webhook = T.unpack $ wciCachedValue $ etiWebhookInfo eti
retryConf = etiRetryConf eti
timeoutSeconds = fromMaybe defaultTimeoutSeconds (rcTimeoutSec retryConf)
responseTimeout = HTTP.responseTimeoutMicro (timeoutSeconds * 1000000)
headerInfos = etiHeaders eti
etHeaders = map encodeHeader headerInfos
headers = addDefaultHeaders etHeaders
ep = createEventPayload retryConf e
payload = encode $ toJSON ep
extraLogCtx = ExtraLogContext Nothing (epId ep) -- avoiding getting current time here to avoid another IO call with each event call
requestDetails = RequestDetails $ LBS.length payload
-- Track the number of active HTTP workers using EKG. case maintenanceModeVersionEither of
res <- bracket_ Left maintenanceModeVersionErr -> logQErr maintenanceModeVersionErr
(liftIO $ EKG.Gauge.inc $ smNumEventHTTPWorkers serverMetrics) Right maintenanceModeVersion ->
(liftIO $ EKG.Gauge.dec $ smNumEventHTTPWorkers serverMetrics) case getEventTriggerInfoFromEvent cache e of
(runExceptT $ tryWebhook headers responseTimeout payload webhook) Left err -> do
-- This rare error can happen in the following known cases:
-- i) schema cache is not up-to-date (due to some bug, say during schema syncing across multiple instances)
-- ii) the event trigger is dropped when this event was just fetched
logQErr $ err500 Unexpected err
liftIO (runPgSourceWriteTx sourceConfig $ do
currentTime <- liftIO getCurrentTime
-- For such an event, we unlock the event and retry after a minute
setRetry e (addUTCTime 60 currentTime) maintenanceModeVersion)
>>= flip onLeft logQErr
Right eti -> runTraceT (spanName eti) do
let webhook = T.unpack $ wciCachedValue $ etiWebhookInfo eti
retryConf = etiRetryConf eti
timeoutSeconds = fromMaybe defaultTimeoutSeconds (rcTimeoutSec retryConf)
responseTimeout = HTTP.responseTimeoutMicro (timeoutSeconds * 1000000)
headerInfos = etiHeaders eti
etHeaders = map encodeHeader headerInfos
headers = addDefaultHeaders etHeaders
ep = createEventPayload retryConf e
payload = encode $ toJSON ep
extraLogCtx = ExtraLogContext Nothing (epId ep) -- avoiding getting current time here to avoid another IO call with each event call
requestDetails = RequestDetails $ LBS.length payload
logHTTPForET res extraLogCtx requestDetails -- Track the number of active HTTP workers using EKG.
let decodedHeaders = map (decodeHeader logenv headerInfos) headers res <- bracket_
either (liftIO $ EKG.Gauge.inc $ smNumEventHTTPWorkers serverMetrics)
(processError sourceConfig e retryConf decodedHeaders ep) (liftIO $ EKG.Gauge.dec $ smNumEventHTTPWorkers serverMetrics)
(processSuccess sourceConfig e decodedHeaders ep) res (runExceptT $ tryWebhook headers responseTimeout payload webhook)
>>= flip onLeft logQErr logHTTPForET res extraLogCtx requestDetails
let decodedHeaders = map (decodeHeader logenv headerInfos) headers
either
(processError sourceConfig e retryConf decodedHeaders ep maintenanceModeVersion)
(processSuccess sourceConfig e decodedHeaders ep maintenanceModeVersion) res
>>= flip onLeft logQErr
withEventEngineCtx :: withEventEngineCtx ::
( MonadIO m ( MonadIO m
@ -363,22 +426,33 @@ createEventPayload retryConf e = EventPayload
processSuccess processSuccess
:: ( MonadIO m ) :: ( MonadIO m )
=> SourceConfig 'Postgres -> Event -> [HeaderConf] -> EventPayload -> HTTPResp a => SourceConfig 'Postgres
-> Event
-> [HeaderConf]
-> EventPayload
-> Maybe MaintenanceModeVersion
-> HTTPResp a
-> m (Either QErr ()) -> m (Either QErr ())
processSuccess sourceConfig e decodedHeaders ep resp = do processSuccess sourceConfig e decodedHeaders ep maintenanceModeVersion resp = do
let respBody = hrsBody resp let respBody = hrsBody resp
respHeaders = hrsHeaders resp respHeaders = hrsHeaders resp
respStatus = hrsStatus resp respStatus = hrsStatus resp
invocation = mkInvocation ep respStatus decodedHeaders respBody respHeaders invocation = mkInvocation ep respStatus decodedHeaders respBody respHeaders
liftIO $ runPgSourceWriteTx sourceConfig $ do liftIO $ runPgSourceWriteTx sourceConfig $ do
insertInvocation invocation insertInvocation invocation
setSuccess e setSuccess e maintenanceModeVersion
processError processError
:: ( MonadIO m ) :: ( MonadIO m )
=> SourceConfig 'Postgres -> Event -> RetryConf -> [HeaderConf] -> EventPayload -> HTTPErr a => SourceConfig 'Postgres
-> Event
-> RetryConf
-> [HeaderConf]
-> EventPayload
-> Maybe MaintenanceModeVersion
-> HTTPErr a
-> m (Either QErr ()) -> m (Either QErr ())
processError sourceConfig e retryConf decodedHeaders ep err = do processError sourceConfig e retryConf decodedHeaders ep maintenanceModeVersion err = do
let invocation = case err of let invocation = case err of
HClient excp -> do HClient excp -> do
let errMsg = TBS.fromLBS $ encode $ show excp let errMsg = TBS.fromLBS $ encode $ show excp
@ -396,10 +470,15 @@ processError sourceConfig e retryConf decodedHeaders ep err = do
mkInvocation ep 500 decodedHeaders errMsg [] mkInvocation ep 500 decodedHeaders errMsg []
liftIO $ runPgSourceWriteTx sourceConfig $ do liftIO $ runPgSourceWriteTx sourceConfig $ do
insertInvocation invocation insertInvocation invocation
retryOrSetError e retryConf err retryOrSetError e retryConf maintenanceModeVersion err
retryOrSetError :: Event -> RetryConf -> HTTPErr a -> Q.TxE QErr () retryOrSetError
retryOrSetError e retryConf err = do :: Event
-> RetryConf
-> Maybe MaintenanceModeVersion
-> HTTPErr a
-> Q.TxE QErr ()
retryOrSetError e retryConf maintenanceModeVersion err = do
let mretryHeader = getRetryAfterHeaderFromError err let mretryHeader = getRetryAfterHeaderFromError err
tries = eTries e tries = eTries e
mretryHeaderSeconds = mretryHeader >>= parseRetryHeader mretryHeaderSeconds = mretryHeader >>= parseRetryHeader
@ -408,13 +487,13 @@ retryOrSetError e retryConf err = do
-- current_try = tries + 1 , allowed_total_tries = rcNumRetries retryConf + 1 -- current_try = tries + 1 , allowed_total_tries = rcNumRetries retryConf + 1
if triesExhausted && noRetryHeader if triesExhausted && noRetryHeader
then do then do
setError e setError e maintenanceModeVersion
else do else do
currentTime <- liftIO getCurrentTime currentTime <- liftIO getCurrentTime
let delay = fromMaybe (rcIntervalSec retryConf) mretryHeaderSeconds let delay = fromMaybe (rcIntervalSec retryConf) mretryHeaderSeconds
diff = fromIntegral delay diff = fromIntegral delay
retryTime = addUTCTime diff currentTime retryTime = addUTCTime diff currentTime
setRetry e retryTime setRetry e retryTime maintenanceModeVersion
where where
getRetryAfterHeaderFromError (HStatus resp) = getRetryAfterHeaderFromResp resp getRetryAfterHeaderFromError (HStatus resp) = getRetryAfterHeaderFromResp resp
getRetryAfterHeaderFromError _ = Nothing getRetryAfterHeaderFromError _ = Nothing
@ -489,6 +568,35 @@ fetchEvents source limitI =
} }
limit = fromIntegral limitI :: Word64 limit = fromIntegral limitI :: Word64
fetchEventsMaintenanceMode :: SourceName -> Int -> MaintenanceModeVersion -> Q.TxE QErr [Event]
fetchEventsMaintenanceMode sourceName limitI = \case
PreviousMMVersion ->
map uncurryEvent <$> Q.listQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET locked = 't'
WHERE id IN ( SELECT l.id
FROM hdb_catalog.event_log l
WHERE l.delivered = 'f' and l.error = 'f' and l.locked = 'f'
and (l.next_retry_at is NULL or l.next_retry_at <= now())
and l.archived = 'f'
ORDER BY created_at
LIMIT $1
FOR UPDATE SKIP LOCKED )
RETURNING id, schema_name, table_name, trigger_name, payload::json, tries, created_at
|] (Identity limit) True
where uncurryEvent (id', sn, tn, trn, Q.AltJ payload, tries, created) =
Event
{ eId = id'
, eSource = SNDefault -- in v1, there'll only be the default source
, eTable = QualifiedObject sn tn
, eTrigger = TriggerMetadata trn
, eEvent = payload
, eTries = tries
, eCreatedAt = created
}
limit = fromIntegral limitI :: Word64
CurrentMMVersion -> fetchEvents sourceName limitI
insertInvocation :: Invocation 'EventType -> Q.TxE QErr () insertInvocation :: Invocation 'EventType -> Q.TxE QErr ()
insertInvocation invo = do insertInvocation invo = do
Q.unitQE defaultTxErrorHandler [Q.sql| Q.unitQE defaultTxErrorHandler [Q.sql|
@ -500,39 +608,64 @@ insertInvocation invo = do
, Q.AltJ $ toJSON $ iResponse invo) True , Q.AltJ $ toJSON $ iResponse invo) True
Q.unitQE defaultTxErrorHandler [Q.sql| Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log UPDATE hdb_catalog.event_log
SET tries = tries + 1 SET tries = tries + 1
WHERE id = $1 WHERE id = $1
|] (Identity $ iEventId invo) True |] (Identity $ iEventId invo) True
setSuccess :: Event -> Q.TxE QErr () setSuccess :: Event -> Maybe MaintenanceModeVersion -> Q.TxE QErr ()
setSuccess e = Q.unitQE defaultTxErrorHandler [Q.sql| setSuccess e = \case
UPDATE hdb_catalog.event_log Just PreviousMMVersion ->
SET delivered = 't', next_retry_at = NULL, locked = NULL Q.unitQE defaultTxErrorHandler [Q.sql|
WHERE id = $1 UPDATE hdb_catalog.event_log
|] (Identity $ eId e) True SET delivered = 't', next_retry_at = NULL, locked = 'f'
WHERE id = $1
|] (Identity $ eId e) True
Just CurrentMMVersion -> latestVersionSetSuccess
Nothing -> latestVersionSetSuccess
where
latestVersionSetSuccess =
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET delivered = 't', next_retry_at = NULL, locked = NULL
WHERE id = $1
|] (Identity $ eId e) True
setError :: Event -> Q.TxE QErr () setError :: Event -> Maybe MaintenanceModeVersion -> Q.TxE QErr ()
setError e = Q.unitQE defaultTxErrorHandler [Q.sql| setError e = \case
UPDATE hdb_catalog.event_log Just PreviousMMVersion ->
SET error = 't', next_retry_at = NULL, locked = NULL Q.unitQE defaultTxErrorHandler [Q.sql|
WHERE id = $1 UPDATE hdb_catalog.event_log
|] (Identity $ eId e) True SET error = 't', next_retry_at = NULL, locked = 'f'
WHERE id = $1
|] (Identity $ eId e) True
Just CurrentMMVersion -> latestVersionSetError
Nothing -> latestVersionSetError
where
latestVersionSetError =
Q.unitQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.event_log
SET error = 't', next_retry_at = NULL, locked = NULL
WHERE id = $1
|] (Identity $ eId e) True
setRetry :: Event -> UTCTime -> Q.TxE QErr () setRetry :: Event -> UTCTime -> Maybe MaintenanceModeVersion -> Q.TxE QErr ()
setRetry e time = setRetry e time = \case
Q.unitQE defaultTxErrorHandler [Q.sql| Just PreviousMMVersion ->
UPDATE hdb_catalog.event_log Q.unitQE defaultTxErrorHandler [Q.sql|
SET next_retry_at = $1, locked = NULL UPDATE hdb_catalog.event_log
WHERE id = $2 SET next_retry_at = $1, locked = 'f'
|] (time, eId e) True WHERE id = $2
|] (time, eId e) True
unlockAllEvents :: Q.TxE QErr () Just CurrentMMVersion -> latestVersionSetRetry
unlockAllEvents = Nothing -> latestVersionSetRetry
Q.unitQE defaultTxErrorHandler [Q.sql| where
UPDATE hdb_catalog.event_log latestVersionSetRetry =
SET locked = NULL Q.unitQE defaultTxErrorHandler [Q.sql|
WHERE locked IS NOT NULL UPDATE hdb_catalog.event_log
|] () True SET next_retry_at = $1, locked = NULL
WHERE id = $2
|] (time, eId e) True
toInt64 :: (Integral a) => a -> Int64 toInt64 :: (Integral a) => a -> Int64
toInt64 = fromIntegral toInt64 = fromIntegral
@ -563,3 +696,12 @@ unlockEvents eventIds =
RETURNING *) RETURNING *)
SELECT count(*) FROM "cte" SELECT count(*) FROM "cte"
|] (Identity $ EventIdArray eventIds) True |] (Identity $ EventIdArray eventIds) True
getMaintenanceModeVersion :: Q.TxE QErr MaintenanceModeVersion
getMaintenanceModeVersion = liftTx $ do
catalogVersion <- getCatalogVersion -- From the user's DB
if | catalogVersion == "40" -> pure PreviousMMVersion
| catalogVersion == latestCatalogVersionString -> pure CurrentMMVersion
| otherwise ->
throw500 $
"Maintenance mode is only supported with catalog versions: 40 and " <> latestCatalogVersionString

View File

@ -262,6 +262,7 @@ buildSchemaCacheRule env = proc (metadata, invalidationKeys) -> do
, MonadIO m, MonadBaseControl IO m , MonadIO m, MonadBaseControl IO m
, MonadResolveSource m , MonadResolveSource m
, BackendMetadata b , BackendMetadata b
, HasServerConfigCtx m
) )
=> ( Inc.Dependency (HashMap SourceName Inc.InvalidationKey) => ( Inc.Dependency (HashMap SourceName Inc.InvalidationKey)
, SourceMetadata b , SourceMetadata b
@ -269,12 +270,13 @@ buildSchemaCacheRule env = proc (metadata, invalidationKeys) -> do
resolveSourceIfNeeded = Inc.cache proc (invalidationKeys, sourceMetadata) -> do resolveSourceIfNeeded = Inc.cache proc (invalidationKeys, sourceMetadata) -> do
let sourceName = _smName sourceMetadata let sourceName = _smName sourceMetadata
metadataObj = MetadataObject (MOSource sourceName) $ toJSON sourceName metadataObj = MetadataObject (MOSource sourceName) $ toJSON sourceName
maintenanceMode <- bindA -< _sccMaintenanceMode <$> askServerConfigCtx
maybeSourceConfig <- getSourceConfigIfNeeded -< (invalidationKeys, sourceName, _smConfiguration sourceMetadata) maybeSourceConfig <- getSourceConfigIfNeeded -< (invalidationKeys, sourceName, _smConfiguration sourceMetadata)
case maybeSourceConfig of case maybeSourceConfig of
Nothing -> returnA -< Nothing Nothing -> returnA -< Nothing
Just sourceConfig -> Just sourceConfig ->
(| withRecordInconsistency ( (| withRecordInconsistency (
liftEitherA <<< bindA -< resolveDatabaseMetadata sourceConfig) liftEitherA <<< bindA -< resolveDatabaseMetadata sourceConfig maintenanceMode)
|) metadataObj |) metadataObj
buildSource buildSource

View File

@ -63,6 +63,7 @@ class (Backend b) => BackendMetadata (b :: BackendType) where
resolveDatabaseMetadata resolveDatabaseMetadata
:: (MonadIO m, MonadBaseControl IO m, MonadResolveSource m) :: (MonadIO m, MonadBaseControl IO m, MonadResolveSource m)
=> SourceConfig b => SourceConfig b
-> MaintenanceMode
-> m (Either QErr (ResolvedSource b)) -> m (Either QErr (ResolvedSource b))
createTableEventTrigger createTableEventTrigger

View File

@ -105,19 +105,23 @@ migrateCatalog
migrateCatalog maybeDefaultSourceConfig maintenanceMode migrationTime = do migrateCatalog maybeDefaultSourceConfig maintenanceMode migrationTime = do
catalogSchemaExists <- doesSchemaExist (SchemaName "hdb_catalog") catalogSchemaExists <- doesSchemaExist (SchemaName "hdb_catalog")
versionTableExists <- doesTableExist (SchemaName "hdb_catalog") (TableName "hdb_version") versionTableExists <- doesTableExist (SchemaName "hdb_catalog") (TableName "hdb_version")
metadataTableExists <- doesTableExist (SchemaName "hdb_catalog") (TableName "hdb_metadata")
migrationResult <- migrationResult <-
if | maintenanceMode == MaintenanceModeEnabled -> do if | maintenanceMode == MaintenanceModeEnabled -> do
if | not catalogSchemaExists -> if | not catalogSchemaExists ->
throw500 "unexpected: hdb_catalog schema not found in maintenance mode" throw500 "unexpected: hdb_catalog schema not found in maintenance mode"
| not versionTableExists -> | not versionTableExists ->
throw500 "unexpected: hdb_catalog.hdb_version table not found in maintenance mode" throw500 "unexpected: hdb_catalog.hdb_version table not found in maintenance mode"
-- TODO: should we also have a check for the catalog version? | not metadataTableExists ->
throw500 $
"the \"hdb_catalog.hdb_metadata\" table is expected to exist and contain" <>
" the metadata of the graphql-engine"
| otherwise -> pure MRMaintanenceMode | otherwise -> pure MRMaintanenceMode
| otherwise -> case catalogSchemaExists of | otherwise -> case catalogSchemaExists of
False -> initialize True False -> initialize True
True -> case versionTableExists of True -> case versionTableExists of
False -> initialize False False -> initialize False
True -> migrateFrom =<< getCatalogVersion True -> migrateFrom =<< liftTx getCatalogVersion
metadata <- liftTx fetchMetadataFromCatalog metadata <- liftTx fetchMetadataFromCatalog
pure (migrationResult, metadata) pure (migrationResult, metadata)
where where
@ -166,7 +170,7 @@ downgradeCatalog
=> Maybe (SourceConnConfiguration 'Postgres) => Maybe (SourceConnConfiguration 'Postgres)
-> DowngradeOptions -> UTCTime -> m MigrationResult -> DowngradeOptions -> UTCTime -> m MigrationResult
downgradeCatalog defaultSourceConfig opts time = do downgradeCatalog defaultSourceConfig opts time = do
downgradeFrom =<< getCatalogVersion downgradeFrom =<< liftTx getCatalogVersion
where where
-- downgrades an existing catalog to the specified version -- downgrades an existing catalog to the specified version
downgradeFrom :: Text -> m MigrationResult downgradeFrom :: Text -> m MigrationResult

View File

@ -6,6 +6,7 @@ module Hasura.Server.Migrate.Internal
where where
import Hasura.Backends.Postgres.Connection import Hasura.Backends.Postgres.Connection
import Hasura.Prelude import Hasura.Prelude
import Hasura.RQL.Types.Error
import Hasura.RQL.Types.EventTrigger import Hasura.RQL.Types.EventTrigger
import qualified Data.Aeson as A import qualified Data.Aeson as A
@ -16,8 +17,8 @@ runTx = liftTx . Q.multiQE defaultTxErrorHandler
-- | The old 0.8 catalog version is non-integral, so we store it in the database as a -- | The old 0.8 catalog version is non-integral, so we store it in the database as a
-- string. -- string.
getCatalogVersion :: MonadTx m => m Text getCatalogVersion :: Q.TxE QErr Text
getCatalogVersion = liftTx $ runIdentity . Q.getRow <$> Q.withQE defaultTxErrorHandler getCatalogVersion = runIdentity . Q.getRow <$> Q.withQE defaultTxErrorHandler
[Q.sql| SELECT version FROM hdb_catalog.hdb_version |] () False [Q.sql| SELECT version FROM hdb_catalog.hdb_version |] () False
from3To4 :: MonadTx m => m () from3To4 :: MonadTx m => m ()

View File

@ -39,7 +39,13 @@ DROP VIEW hdb_catalog.hdb_cron_events_stats;
DROP TABLE hdb_catalog.hdb_cron_triggers; DROP TABLE hdb_catalog.hdb_cron_triggers;
-- Create table which stores metadata JSON blob -- Create table which stores metadata JSON blob
CREATE TABLE hdb_catalog.hdb_metadata -- The "IF NOT EXISTS" is added due to the introduction of maintenance mode
-- in which migrations are not applied on startup but the 'hdb_catalog.hdb_table'
-- is expected to exist and contain the metadata of the graphql-engine. Now, when
-- the graphql-engine is run in normal mode (with maintenance mode disabled) this
-- migration file will be run and since this table already exists, we should add
-- the "IF NOT EXISTS" clause to avoid a migration error
CREATE TABLE IF NOT EXISTS hdb_catalog.hdb_metadata
( (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
metadata JSON NOT NULL metadata JSON NOT NULL