diff --git a/server/src-lib/Hasura/App.hs b/server/src-lib/Hasura/App.hs index 2297833aece..954f4e3869b 100644 --- a/server/src-lib/Hasura/App.hs +++ b/server/src-lib/Hasura/App.hs @@ -530,8 +530,14 @@ runHGEServer setupHook env ServeOptions{..} ServeCtx{..} initTime postPollHook s unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers" _eventQueueThread <- C.forkManagedT "processEventQueue" logger $ - processEventQueue logger logEnvHeaders - _scHttpManager (getSCFromRef cacheRef) eventEngineCtx lockedEventsCtx serverMetrics + processEventQueue logger + logEnvHeaders + _scHttpManager + (getSCFromRef cacheRef) + eventEngineCtx + lockedEventsCtx + serverMetrics + soEnableMaintenanceMode -- start a backgroud thread to handle async actions case soAsyncActionsFetchInterval of diff --git a/server/src-lib/Hasura/Backends/BigQuery/DDL/Source.hs b/server/src-lib/Hasura/Backends/BigQuery/DDL/Source.hs index 6b5e32749d4..69b16f6af81 100644 --- a/server/src-lib/Hasura/Backends/BigQuery/DDL/Source.hs +++ b/server/src-lib/Hasura/Backends/BigQuery/DDL/Source.hs @@ -27,6 +27,7 @@ import Hasura.RQL.Types.Error import Hasura.RQL.Types.Source import Hasura.RQL.Types.Table import Hasura.SQL.Backend +import Hasura.Server.Types (MaintenanceMode) resolveSourceConfig :: @@ -52,8 +53,9 @@ resolveSourceConfig _name BigQueryConnSourceConfig{..} = runExceptT $ do resolveSource :: (MonadIO m) => BigQuerySourceConfig + -> MaintenanceMode -> m (Either QErr (ResolvedSource 'BigQuery)) -resolveSource sourceConfig = +resolveSource sourceConfig _maintenanceMode = runExceptT $ do result <- getTables sourceConfig case result of diff --git a/server/src-lib/Hasura/Backends/MSSQL/DDL/Source.hs b/server/src-lib/Hasura/Backends/MSSQL/DDL/Source.hs index 3b09ec935ec..099437a2a72 100644 --- a/server/src-lib/Hasura/Backends/MSSQL/DDL/Source.hs +++ b/server/src-lib/Hasura/Backends/MSSQL/DDL/Source.hs @@ -13,6 +13,7 @@ import Hasura.RQL.Types.Common import Hasura.RQL.Types.Error import Hasura.RQL.Types.Source import Hasura.SQL.Backend +import Hasura.Server.Types (MaintenanceMode) resolveSourceConfig :: (MonadIO m) @@ -26,8 +27,9 @@ resolveSourceConfig _name (MSSQLConnConfiguration connInfo) = runExceptT do resolveDatabaseMetadata :: (MonadIO m) => MSSQLSourceConfig + -> MaintenanceMode -> m (Either QErr (ResolvedSource 'MSSQL)) -resolveDatabaseMetadata config = runExceptT do +resolveDatabaseMetadata config _maintenanceMode = runExceptT do dbTablesMetadata <- loadDBMetadata pool pure $ ResolvedSource config dbTablesMetadata mempty mempty where diff --git a/server/src-lib/Hasura/Backends/Postgres/DDL/Source.hs b/server/src-lib/Hasura/Backends/Postgres/DDL/Source.hs index 2a744e8b7cf..452372885df 100644 --- a/server/src-lib/Hasura/Backends/Postgres/DDL/Source.hs +++ b/server/src-lib/Hasura/Backends/Postgres/DDL/Source.hs @@ -24,6 +24,7 @@ import Hasura.RQL.Types.Source import Hasura.RQL.Types.Table import Hasura.SQL.Backend import Hasura.Server.Migrate.Internal +import Hasura.Server.Types (MaintenanceMode (..)) resolveSourceConfig :: (MonadIO m, MonadResolveSource m) @@ -34,23 +35,25 @@ resolveSourceConfig name config = runExceptT do resolveDatabaseMetadata :: (MonadIO m, MonadBaseControl IO m) - => SourceConfig 'Postgres -> m (Either QErr (ResolvedSource 'Postgres)) -resolveDatabaseMetadata sourceConfig = runExceptT do + => SourceConfig 'Postgres -> MaintenanceMode -> m (Either QErr (ResolvedSource 'Postgres)) +resolveDatabaseMetadata sourceConfig maintenanceMode = runExceptT do (tablesMeta, functionsMeta, 
pgScalars) <- runLazyTx (_pscExecCtx sourceConfig) Q.ReadWrite $ do
-    initSource
+    initSource maintenanceMode
     tablesMeta <- fetchTableMetadata
     functionsMeta <- fetchFunctionMetadata
     pgScalars <- fetchPgScalars
     pure (tablesMeta, functionsMeta, pgScalars)
   pure $ ResolvedSource sourceConfig tablesMeta functionsMeta pgScalars

-initSource :: MonadTx m => m ()
-initSource = do
+initSource :: MonadTx m => MaintenanceMode -> m ()
+initSource maintenanceMode = do
   hdbCatalogExist <- doesSchemaExist "hdb_catalog"
   eventLogTableExist <- doesTableExist "hdb_catalog" "event_log"
   sourceVersionTableExist <- doesTableExist "hdb_catalog" "hdb_source_catalog_version"
+  -- when maintenance mode is enabled, don't perform any migrations
+  if | maintenanceMode == MaintenanceModeEnabled -> pure ()
      -- Fresh database
-  if | not hdbCatalogExist -> liftTx do
+     | not hdbCatalogExist -> liftTx do
       Q.unitQE defaultTxErrorHandler "CREATE SCHEMA hdb_catalog" () False
       enablePgcryptoExtension
       initPgSourceCatalog
@@ -61,7 +64,7 @@ initSource = do
     | not sourceVersionTableExist && eventLogTableExist -> do
       -- Update the Source Catalog to v43 to include the new migration
       -- changes. Skipping this step will result in errors.
-      currCatalogVersion <- getCatalogVersion
+      currCatalogVersion <- liftTx getCatalogVersion
       migrateTo43 currCatalogVersion
       liftTx createVersionTable
     | otherwise -> migrateSourceCatalog
diff --git a/server/src-lib/Hasura/Eventing/EventTrigger.hs b/server/src-lib/Hasura/Eventing/EventTrigger.hs
index 2bd701a4da1..f251f69fb2d 100644
--- a/server/src-lib/Hasura/Eventing/EventTrigger.hs
+++ b/server/src-lib/Hasura/Eventing/EventTrigger.hs
@@ -33,7 +33,6 @@ failed requests at a regular (user-configurable) interval.
 module Hasura.Eventing.EventTrigger
   ( initEventEngineCtx
   , processEventQueue
-  , unlockAllEvents
   , defaultMaxEventThreads
   , defaultFetchInterval
   , Event(..)
@@ -81,6 +80,9 @@ import Hasura.HTTP
 import Hasura.RQL.DDL.Headers
 import Hasura.RQL.Types
 import Hasura.Server.Init.Config
+import Hasura.Server.Migrate.Internal (getCatalogVersion)
+import Hasura.Server.Migrate.Version (latestCatalogVersionString)
+import Hasura.Server.Types
 import Hasura.Server.Version (HasVersion)

 data TriggerMetadata
@@ -96,6 +98,43 @@ newtype EventInternalErr
 instance L.ToEngineLog EventInternalErr L.Hasura where
   toEngineLog (EventInternalErr qerr) = (L.LevelError, L.eventTriggerLogType, toJSON qerr)

+{- Note [Maintenance mode]
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Maintenance mode is a mode in which users can upgrade their graphql-engine
+without any downtime. More on maintenance mode can be found here:
+https://github.com/hasura/graphql-engine-mono/issues/431.
+
+Maintenance mode boils down to a few main things:
+
+1. No operation that may change the metadata will be allowed.
+2. Migrations are not applied when the graphql-engine is started, so the
+   catalog schema will be in the older version.
+3. Event triggers should continue working in the new code with the older
+   catalog schema, i.e. they should work even if there are any schema changes
+   to the `hdb_catalog.event_log` table.
+
+#1 and #2 are fairly self-explanatory. For #3, we need to support fetching
+events depending upon the catalog version. So, fetching events now works as
+follows:
+
+1. Check if maintenance mode is enabled.
+2. If maintenance mode is enabled, read the catalog version from the DB and
+   fire the appropriate query against the event log table.
+   When maintenance mode is disabled, we query the event log table according
+   to the latest catalog; we do not read the catalog version for this.
+-}
+
+-- | See Note [Maintenance mode]
+--
+data MaintenanceModeVersion
+  = PreviousMMVersion
+  -- ^ should correspond to the catalog version from which the user
+  -- is migrating
+  | CurrentMMVersion
+  -- ^ should correspond to the latest catalog version
+  deriving (Show, Eq)
+
 -- | Change data for a particular row
 --
 -- https://docs.hasura.io/1.0/graphql/manual/event-triggers/payload.html
@@ -186,14 +225,17 @@ processEventQueue
   -> EventEngineCtx
   -> LockedEventsCtx
   -> ServerMetrics
+  -> MaintenanceMode
   -> m void
-processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..} LockedEventsCtx{leEvents} serverMetrics = do
+processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..} LockedEventsCtx{leEvents} serverMetrics maintenanceMode = do
   events0 <- popEventsBatch
   -- Track number of events fetched in EKG
   _ <- liftIO $ EKG.Distribution.add (smNumEventsFetched serverMetrics) (fromIntegral $ length events0)
   go events0 0 False
   where
     fetchBatchSize = 100
+
+    popEventsBatch :: m [EventWithSource]
     popEventsBatch = do
       {- SELECT FOR UPDATE .. SKIP LOCKED can throw serialization errors in RepeatableRead: https://stackoverflow.com/a/53289263/1911889
@@ -206,19 +248,31 @@ processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..}
          (delivered=t or error=t or archived=t) after a fixed number of tries (assuming it begins with locked='f').
       -}
       pgSources <- scSources <$> liftIO getSchemaCache
-      fmap concat $ forM (M.toList pgSources) $ \(sourceName, sourceCache) ->
+      liftIO $ fmap concat $ forM (M.toList pgSources) $ \(sourceName, sourceCache) ->
         case unsafeSourceConfiguration @'Postgres sourceCache of
           Nothing -> pure []
-          Just sourceConfig ->
-            liftIO $ runPgSourceWriteTx sourceConfig (fetchEvents sourceName fetchBatchSize) >>= \case
-              Left err -> do
-                liftIO $ L.unLogger logger $ EventInternalErr err
-                return []
-              Right events -> do
-                -- The time when the events were fetched. This is used to calculate the average lock time of an event.
-                eventsFetchedTime <- liftIO getCurrentTime
-                saveLockedEvents (map eId events) leEvents
-                return $ map (, sourceConfig, eventsFetchedTime) events
+          Just sourceConfig -> do
+            fetchEventsTxE <-
+              case maintenanceMode of
+                MaintenanceModeEnabled -> do
+                  maintenanceModeVersion <- runPgSourceReadTx sourceConfig getMaintenanceModeVersion
+                  pure $ fmap (fetchEventsMaintenanceMode sourceName fetchBatchSize) maintenanceModeVersion
+                MaintenanceModeDisabled -> return $ Right $ fetchEvents sourceName fetchBatchSize
+            liftIO $ do
+              case fetchEventsTxE of
+                Left err -> do
+                  liftIO $ L.unLogger logger $ EventInternalErr err
+                  return []
+                Right fetchEventsTx ->
+                  runPgSourceWriteTx sourceConfig fetchEventsTx >>= \case
+                    Left err -> do
+                      liftIO $ L.unLogger logger $ EventInternalErr err
+                      return []
+                    Right events -> do
+                      -- The time when the events were fetched. This is used to calculate the average lock time of an event.
+                      eventsFetchedTime <- liftIO getCurrentTime
+                      saveLockedEvents (map eId events) leEvents
+                      return $ map (, sourceConfig, eventsFetchedTime) events

     -- work on this batch of events while prefetching the next. Recurse after we've forked workers
     -- for each in the batch, minding the requested pool size.
@@ -255,7 +309,6 @@ processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..}
                 "or we're working on a backlog of events. 
Consider increasing " <> "HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE" go eventsNext (fullFetchCount+1) (alreadyWarned || clearlyBehind) - | otherwise -> do when (lenEvents /= fetchBatchSize && alreadyWarned) $ -- emit as warning in case users are only logging warning severity and saw above @@ -294,42 +347,52 @@ processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..} Tracing.runTraceTInContext tracingCtx - case getEventTriggerInfoFromEvent cache e of - Left err -> do - -- This rare error can happen in the following known cases: - -- i) schema cache is not up-to-date (due to some bug, say during schema syncing across multiple instances) - -- ii) the event trigger is dropped when this event was just fetched - logQErr $ err500 Unexpected err - liftIO $ runPgSourceWriteTx sourceConfig $ do - currentTime <- liftIO getCurrentTime - -- For such an event, we unlock the event and retry after a minute - setRetry e (addUTCTime 60 currentTime) - >>= flip onLeft logQErr - Right eti -> runTraceT (spanName eti) do - let webhook = T.unpack $ wciCachedValue $ etiWebhookInfo eti - retryConf = etiRetryConf eti - timeoutSeconds = fromMaybe defaultTimeoutSeconds (rcTimeoutSec retryConf) - responseTimeout = HTTP.responseTimeoutMicro (timeoutSeconds * 1000000) - headerInfos = etiHeaders eti - etHeaders = map encodeHeader headerInfos - headers = addDefaultHeaders etHeaders - ep = createEventPayload retryConf e - payload = encode $ toJSON ep - extraLogCtx = ExtraLogContext Nothing (epId ep) -- avoiding getting current time here to avoid another IO call with each event call - requestDetails = RequestDetails $ LBS.length payload + maintenanceModeVersionEither :: Either QErr (Maybe MaintenanceModeVersion) <- + case maintenanceMode of + MaintenanceModeEnabled -> do + maintenanceModeVersion <- + liftIO $ runPgSourceReadTx sourceConfig getMaintenanceModeVersion + return $ Just <$> maintenanceModeVersion + MaintenanceModeDisabled -> return $ Right Nothing - -- Track the number of active HTTP workers using EKG. 
- res <- bracket_ - (liftIO $ EKG.Gauge.inc $ smNumEventHTTPWorkers serverMetrics) - (liftIO $ EKG.Gauge.dec $ smNumEventHTTPWorkers serverMetrics) - (runExceptT $ tryWebhook headers responseTimeout payload webhook) + case maintenanceModeVersionEither of + Left maintenanceModeVersionErr -> logQErr maintenanceModeVersionErr + Right maintenanceModeVersion -> + case getEventTriggerInfoFromEvent cache e of + Left err -> do + -- This rare error can happen in the following known cases: + -- i) schema cache is not up-to-date (due to some bug, say during schema syncing across multiple instances) + -- ii) the event trigger is dropped when this event was just fetched + logQErr $ err500 Unexpected err + liftIO (runPgSourceWriteTx sourceConfig $ do + currentTime <- liftIO getCurrentTime + -- For such an event, we unlock the event and retry after a minute + setRetry e (addUTCTime 60 currentTime) maintenanceModeVersion) + >>= flip onLeft logQErr + Right eti -> runTraceT (spanName eti) do + let webhook = T.unpack $ wciCachedValue $ etiWebhookInfo eti + retryConf = etiRetryConf eti + timeoutSeconds = fromMaybe defaultTimeoutSeconds (rcTimeoutSec retryConf) + responseTimeout = HTTP.responseTimeoutMicro (timeoutSeconds * 1000000) + headerInfos = etiHeaders eti + etHeaders = map encodeHeader headerInfos + headers = addDefaultHeaders etHeaders + ep = createEventPayload retryConf e + payload = encode $ toJSON ep + extraLogCtx = ExtraLogContext Nothing (epId ep) -- avoiding getting current time here to avoid another IO call with each event call + requestDetails = RequestDetails $ LBS.length payload - logHTTPForET res extraLogCtx requestDetails - let decodedHeaders = map (decodeHeader logenv headerInfos) headers - either - (processError sourceConfig e retryConf decodedHeaders ep) - (processSuccess sourceConfig e decodedHeaders ep) res - >>= flip onLeft logQErr + -- Track the number of active HTTP workers using EKG. 
+ res <- bracket_ + (liftIO $ EKG.Gauge.inc $ smNumEventHTTPWorkers serverMetrics) + (liftIO $ EKG.Gauge.dec $ smNumEventHTTPWorkers serverMetrics) + (runExceptT $ tryWebhook headers responseTimeout payload webhook) + logHTTPForET res extraLogCtx requestDetails + let decodedHeaders = map (decodeHeader logenv headerInfos) headers + either + (processError sourceConfig e retryConf decodedHeaders ep maintenanceModeVersion) + (processSuccess sourceConfig e decodedHeaders ep maintenanceModeVersion) res + >>= flip onLeft logQErr withEventEngineCtx :: ( MonadIO m @@ -363,22 +426,33 @@ createEventPayload retryConf e = EventPayload processSuccess :: ( MonadIO m ) - => SourceConfig 'Postgres -> Event -> [HeaderConf] -> EventPayload -> HTTPResp a + => SourceConfig 'Postgres + -> Event + -> [HeaderConf] + -> EventPayload + -> Maybe MaintenanceModeVersion + -> HTTPResp a -> m (Either QErr ()) -processSuccess sourceConfig e decodedHeaders ep resp = do +processSuccess sourceConfig e decodedHeaders ep maintenanceModeVersion resp = do let respBody = hrsBody resp respHeaders = hrsHeaders resp respStatus = hrsStatus resp invocation = mkInvocation ep respStatus decodedHeaders respBody respHeaders liftIO $ runPgSourceWriteTx sourceConfig $ do insertInvocation invocation - setSuccess e + setSuccess e maintenanceModeVersion processError :: ( MonadIO m ) - => SourceConfig 'Postgres -> Event -> RetryConf -> [HeaderConf] -> EventPayload -> HTTPErr a + => SourceConfig 'Postgres + -> Event + -> RetryConf + -> [HeaderConf] + -> EventPayload + -> Maybe MaintenanceModeVersion + -> HTTPErr a -> m (Either QErr ()) -processError sourceConfig e retryConf decodedHeaders ep err = do +processError sourceConfig e retryConf decodedHeaders ep maintenanceModeVersion err = do let invocation = case err of HClient excp -> do let errMsg = TBS.fromLBS $ encode $ show excp @@ -396,10 +470,15 @@ processError sourceConfig e retryConf decodedHeaders ep err = do mkInvocation ep 500 decodedHeaders errMsg [] liftIO $ runPgSourceWriteTx sourceConfig $ do insertInvocation invocation - retryOrSetError e retryConf err + retryOrSetError e retryConf maintenanceModeVersion err -retryOrSetError :: Event -> RetryConf -> HTTPErr a -> Q.TxE QErr () -retryOrSetError e retryConf err = do +retryOrSetError + :: Event + -> RetryConf + -> Maybe MaintenanceModeVersion + -> HTTPErr a + -> Q.TxE QErr () +retryOrSetError e retryConf maintenanceModeVersion err = do let mretryHeader = getRetryAfterHeaderFromError err tries = eTries e mretryHeaderSeconds = mretryHeader >>= parseRetryHeader @@ -408,13 +487,13 @@ retryOrSetError e retryConf err = do -- current_try = tries + 1 , allowed_total_tries = rcNumRetries retryConf + 1 if triesExhausted && noRetryHeader then do - setError e + setError e maintenanceModeVersion else do currentTime <- liftIO getCurrentTime let delay = fromMaybe (rcIntervalSec retryConf) mretryHeaderSeconds diff = fromIntegral delay retryTime = addUTCTime diff currentTime - setRetry e retryTime + setRetry e retryTime maintenanceModeVersion where getRetryAfterHeaderFromError (HStatus resp) = getRetryAfterHeaderFromResp resp getRetryAfterHeaderFromError _ = Nothing @@ -489,6 +568,35 @@ fetchEvents source limitI = } limit = fromIntegral limitI :: Word64 +fetchEventsMaintenanceMode :: SourceName -> Int -> MaintenanceModeVersion -> Q.TxE QErr [Event] +fetchEventsMaintenanceMode sourceName limitI = \case + PreviousMMVersion -> + map uncurryEvent <$> Q.listQE defaultTxErrorHandler [Q.sql| + UPDATE hdb_catalog.event_log + SET locked = 't' + WHERE id IN ( 
SELECT l.id + FROM hdb_catalog.event_log l + WHERE l.delivered = 'f' and l.error = 'f' and l.locked = 'f' + and (l.next_retry_at is NULL or l.next_retry_at <= now()) + and l.archived = 'f' + ORDER BY created_at + LIMIT $1 + FOR UPDATE SKIP LOCKED ) + RETURNING id, schema_name, table_name, trigger_name, payload::json, tries, created_at + |] (Identity limit) True + where uncurryEvent (id', sn, tn, trn, Q.AltJ payload, tries, created) = + Event + { eId = id' + , eSource = SNDefault -- in v1, there'll only be the default source + , eTable = QualifiedObject sn tn + , eTrigger = TriggerMetadata trn + , eEvent = payload + , eTries = tries + , eCreatedAt = created + } + limit = fromIntegral limitI :: Word64 + CurrentMMVersion -> fetchEvents sourceName limitI + insertInvocation :: Invocation 'EventType -> Q.TxE QErr () insertInvocation invo = do Q.unitQE defaultTxErrorHandler [Q.sql| @@ -500,39 +608,64 @@ insertInvocation invo = do , Q.AltJ $ toJSON $ iResponse invo) True Q.unitQE defaultTxErrorHandler [Q.sql| UPDATE hdb_catalog.event_log + SET tries = tries + 1 WHERE id = $1 |] (Identity $ iEventId invo) True -setSuccess :: Event -> Q.TxE QErr () -setSuccess e = Q.unitQE defaultTxErrorHandler [Q.sql| - UPDATE hdb_catalog.event_log - SET delivered = 't', next_retry_at = NULL, locked = NULL - WHERE id = $1 - |] (Identity $ eId e) True +setSuccess :: Event -> Maybe MaintenanceModeVersion -> Q.TxE QErr () +setSuccess e = \case + Just PreviousMMVersion -> + Q.unitQE defaultTxErrorHandler [Q.sql| + UPDATE hdb_catalog.event_log + SET delivered = 't', next_retry_at = NULL, locked = 'f' + WHERE id = $1 + |] (Identity $ eId e) True + Just CurrentMMVersion -> latestVersionSetSuccess + Nothing -> latestVersionSetSuccess + where + latestVersionSetSuccess = + Q.unitQE defaultTxErrorHandler [Q.sql| + UPDATE hdb_catalog.event_log + SET delivered = 't', next_retry_at = NULL, locked = NULL + WHERE id = $1 + |] (Identity $ eId e) True -setError :: Event -> Q.TxE QErr () -setError e = Q.unitQE defaultTxErrorHandler [Q.sql| - UPDATE hdb_catalog.event_log - SET error = 't', next_retry_at = NULL, locked = NULL - WHERE id = $1 - |] (Identity $ eId e) True +setError :: Event -> Maybe MaintenanceModeVersion -> Q.TxE QErr () +setError e = \case + Just PreviousMMVersion -> + Q.unitQE defaultTxErrorHandler [Q.sql| + UPDATE hdb_catalog.event_log + SET error = 't', next_retry_at = NULL, locked = 'f' + WHERE id = $1 + |] (Identity $ eId e) True + Just CurrentMMVersion -> latestVersionSetError + Nothing -> latestVersionSetError + where + latestVersionSetError = + Q.unitQE defaultTxErrorHandler [Q.sql| + UPDATE hdb_catalog.event_log + SET error = 't', next_retry_at = NULL, locked = NULL + WHERE id = $1 + |] (Identity $ eId e) True -setRetry :: Event -> UTCTime -> Q.TxE QErr () -setRetry e time = - Q.unitQE defaultTxErrorHandler [Q.sql| - UPDATE hdb_catalog.event_log - SET next_retry_at = $1, locked = NULL - WHERE id = $2 - |] (time, eId e) True - -unlockAllEvents :: Q.TxE QErr () -unlockAllEvents = - Q.unitQE defaultTxErrorHandler [Q.sql| - UPDATE hdb_catalog.event_log - SET locked = NULL - WHERE locked IS NOT NULL - |] () True +setRetry :: Event -> UTCTime -> Maybe MaintenanceModeVersion -> Q.TxE QErr () +setRetry e time = \case + Just PreviousMMVersion -> + Q.unitQE defaultTxErrorHandler [Q.sql| + UPDATE hdb_catalog.event_log + SET next_retry_at = $1, locked = 'f' + WHERE id = $2 + |] (time, eId e) True + Just CurrentMMVersion -> latestVersionSetRetry + Nothing -> latestVersionSetRetry + where + latestVersionSetRetry = + 
Q.unitQE defaultTxErrorHandler [Q.sql| + UPDATE hdb_catalog.event_log + SET next_retry_at = $1, locked = NULL + WHERE id = $2 + |] (time, eId e) True toInt64 :: (Integral a) => a -> Int64 toInt64 = fromIntegral @@ -563,3 +696,12 @@ unlockEvents eventIds = RETURNING *) SELECT count(*) FROM "cte" |] (Identity $ EventIdArray eventIds) True + +getMaintenanceModeVersion :: Q.TxE QErr MaintenanceModeVersion +getMaintenanceModeVersion = liftTx $ do + catalogVersion <- getCatalogVersion -- From the user's DB + if | catalogVersion == "40" -> pure PreviousMMVersion + | catalogVersion == latestCatalogVersionString -> pure CurrentMMVersion + | otherwise -> + throw500 $ + "Maintenance mode is only supported with catalog versions: 40 and " <> latestCatalogVersionString diff --git a/server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs b/server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs index 92f4aab7e7c..10e40ac66b3 100644 --- a/server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs +++ b/server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs @@ -262,6 +262,7 @@ buildSchemaCacheRule env = proc (metadata, invalidationKeys) -> do , MonadIO m, MonadBaseControl IO m , MonadResolveSource m , BackendMetadata b + , HasServerConfigCtx m ) => ( Inc.Dependency (HashMap SourceName Inc.InvalidationKey) , SourceMetadata b @@ -269,12 +270,13 @@ buildSchemaCacheRule env = proc (metadata, invalidationKeys) -> do resolveSourceIfNeeded = Inc.cache proc (invalidationKeys, sourceMetadata) -> do let sourceName = _smName sourceMetadata metadataObj = MetadataObject (MOSource sourceName) $ toJSON sourceName + maintenanceMode <- bindA -< _sccMaintenanceMode <$> askServerConfigCtx maybeSourceConfig <- getSourceConfigIfNeeded -< (invalidationKeys, sourceName, _smConfiguration sourceMetadata) case maybeSourceConfig of Nothing -> returnA -< Nothing Just sourceConfig -> (| withRecordInconsistency ( - liftEitherA <<< bindA -< resolveDatabaseMetadata sourceConfig) + liftEitherA <<< bindA -< resolveDatabaseMetadata sourceConfig maintenanceMode) |) metadataObj buildSource diff --git a/server/src-lib/Hasura/RQL/Types/Metadata/Backend.hs b/server/src-lib/Hasura/RQL/Types/Metadata/Backend.hs index 7ff95792a13..5ea354a06e7 100644 --- a/server/src-lib/Hasura/RQL/Types/Metadata/Backend.hs +++ b/server/src-lib/Hasura/RQL/Types/Metadata/Backend.hs @@ -63,6 +63,7 @@ class (Backend b) => BackendMetadata (b :: BackendType) where resolveDatabaseMetadata :: (MonadIO m, MonadBaseControl IO m, MonadResolveSource m) => SourceConfig b + -> MaintenanceMode -> m (Either QErr (ResolvedSource b)) createTableEventTrigger diff --git a/server/src-lib/Hasura/Server/Migrate.hs b/server/src-lib/Hasura/Server/Migrate.hs index 2448a3d25fe..9b700683916 100644 --- a/server/src-lib/Hasura/Server/Migrate.hs +++ b/server/src-lib/Hasura/Server/Migrate.hs @@ -105,19 +105,23 @@ migrateCatalog migrateCatalog maybeDefaultSourceConfig maintenanceMode migrationTime = do catalogSchemaExists <- doesSchemaExist (SchemaName "hdb_catalog") versionTableExists <- doesTableExist (SchemaName "hdb_catalog") (TableName "hdb_version") + metadataTableExists <- doesTableExist (SchemaName "hdb_catalog") (TableName "hdb_metadata") migrationResult <- if | maintenanceMode == MaintenanceModeEnabled -> do if | not catalogSchemaExists -> throw500 "unexpected: hdb_catalog schema not found in maintenance mode" | not versionTableExists -> throw500 "unexpected: hdb_catalog.hdb_version table not found in maintenance mode" - -- TODO: should we also have a check for the catalog version? 
+         | not metadataTableExists ->
+           throw500 $
+             "the \"hdb_catalog.hdb_metadata\" table is expected to exist and contain" <>
+             " the metadata of the graphql-engine"
          | otherwise -> pure MRMaintanenceMode
     | otherwise -> case catalogSchemaExists of
       False -> initialize True
       True  -> case versionTableExists of
         False -> initialize False
-        True  -> migrateFrom =<< getCatalogVersion
+        True  -> migrateFrom =<< liftTx getCatalogVersion
   metadata <- liftTx fetchMetadataFromCatalog
   pure (migrationResult, metadata)
   where
@@ -166,7 +170,7 @@ downgradeCatalog
   => Maybe (SourceConnConfiguration 'Postgres) -> DowngradeOptions -> UTCTime -> m MigrationResult
 downgradeCatalog defaultSourceConfig opts time = do
-  downgradeFrom =<< getCatalogVersion
+  downgradeFrom =<< liftTx getCatalogVersion
   where
     -- downgrades an existing catalog to the specified version
     downgradeFrom :: Text -> m MigrationResult
diff --git a/server/src-lib/Hasura/Server/Migrate/Internal.hs b/server/src-lib/Hasura/Server/Migrate/Internal.hs
index b93367ce1b6..d2334a6af5e 100644
--- a/server/src-lib/Hasura/Server/Migrate/Internal.hs
+++ b/server/src-lib/Hasura/Server/Migrate/Internal.hs
@@ -6,6 +6,7 @@ module Hasura.Server.Migrate.Internal where

 import Hasura.Backends.Postgres.Connection
 import Hasura.Prelude
+import Hasura.RQL.Types.Error
 import Hasura.RQL.Types.EventTrigger

 import qualified Data.Aeson as A
@@ -16,8 +17,8 @@ runTx = liftTx . Q.multiQE defaultTxErrorHandler

 -- | The old 0.8 catalog version is non-integral, so we store it in the database as a
 -- string.
-getCatalogVersion :: MonadTx m => m Text
-getCatalogVersion = liftTx $ runIdentity . Q.getRow <$> Q.withQE defaultTxErrorHandler
+getCatalogVersion :: Q.TxE QErr Text
+getCatalogVersion = runIdentity . Q.getRow <$> Q.withQE defaultTxErrorHandler
   [Q.sql| SELECT version FROM hdb_catalog.hdb_version |] () False

 from3To4 :: MonadTx m => m ()
diff --git a/server/src-rsr/migrations/42_to_43.sql b/server/src-rsr/migrations/42_to_43.sql
index cbdc8d4b355..e985882ae2a 100644
--- a/server/src-rsr/migrations/42_to_43.sql
+++ b/server/src-rsr/migrations/42_to_43.sql
@@ -39,7 +39,13 @@ DROP VIEW hdb_catalog.hdb_cron_events_stats;
 DROP TABLE hdb_catalog.hdb_cron_triggers;

 -- Create table which stores metadata JSON blob
-CREATE TABLE hdb_catalog.hdb_metadata
+-- The "IF NOT EXISTS" is added due to the introduction of maintenance mode,
+-- in which migrations are not applied on startup but the 'hdb_catalog.hdb_metadata'
+-- table is expected to exist and contain the metadata of the graphql-engine. When
+-- the graphql-engine is later run in normal mode (with maintenance mode disabled),
+-- this migration file will be run; since the table already exists, the
+-- "IF NOT EXISTS" clause avoids a migration error.
+CREATE TABLE IF NOT EXISTS hdb_catalog.hdb_metadata
 (
   id INTEGER PRIMARY KEY,
   metadata JSON NOT NULL
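
The change that recurs throughout this patch is a per-catalog-version dispatch for every statement against hdb_catalog.event_log (fetching a batch, setSuccess, setError, setRetry): when maintenance mode is on, getMaintenanceModeVersion reads the stored catalog version (accepting only "40" or the latest version), and each statement then runs the SQL variant that matches that schema; the v40 variants reset the locked column to 'f', while the current ones clear it with NULL. Below is a minimal, self-contained Haskell sketch of that dispatch; the module name, the setSuccessSql helper and its String return type are simplified stand-ins for illustration (the real code builds Q.TxE transactions), and the comments about the locked column are inferred from the SQL in this patch.

{-# LANGUAGE LambdaCase #-}
module MaintenanceModeSketch where

-- Same constructors as the patch; everything else in this module is a stand-in.
data MaintenanceModeVersion
  = PreviousMMVersion  -- ^ catalog version "40", the schema being migrated from
  | CurrentMMVersion   -- ^ the latest catalog version
  deriving (Show, Eq)

-- Pick the statement that matches the catalog actually present in the database.
-- 'Nothing' means maintenance mode is off, so the latest schema is assumed.
setSuccessSql :: Maybe MaintenanceModeVersion -> String
setSuccessSql = \case
  Just PreviousMMVersion ->
    -- v40 schema: the locked flag is reset to false
    "UPDATE hdb_catalog.event_log SET delivered = 't', next_retry_at = NULL, locked = 'f' WHERE id = $1"
  _ ->
    -- latest schema (or maintenance mode disabled): locked is cleared with NULL
    "UPDATE hdb_catalog.event_log SET delivered = 't', next_retry_at = NULL, locked = NULL WHERE id = $1"

main :: IO ()
main = mapM_ (putStrLn . setSuccessSql) [Just PreviousMMVersion, Just CurrentMMVersion, Nothing]

Dispatching on Maybe MaintenanceModeVersion rather than on MaintenanceMode itself lets the normal (non-maintenance) path share the latest-schema branch without ever reading the catalog version, which is how setSuccess, setError and setRetry in the patch treat the Nothing case.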