mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-11-10 10:29:12 +03:00
server: fix asymptotics of event_log batch fetching
Co-authored-by: Rakesh Emmadi <12475069+rakeshkky@users.noreply.github.com> GitOrigin-RevId: 9b8afb502e94dd371a8596ccde97d023bd2344a9
This commit is contained in:
parent
fc17132930
commit
01d8a37706
@ -5,6 +5,7 @@
|
||||
### Bug fixes and improvements
|
||||
(Add entries below in the order of server, console, cli, docs, others)
|
||||
|
||||
- server: fix asymptotic performance of fetching from the event_log table
|
||||
- console: add `pool_timeout`, `connection_lifetime` and `isolation_level` connection params to connect database form
|
||||
- console: add check constraints and comments to MS SQL Server tables' read-only modify page
|
||||
- console: add create table functionality for MS SQL Server
|
||||
|
@ -442,7 +442,12 @@ elif [ "$MODE" = "test" ]; then
|
||||
# It's better UX to build first (possibly failing) before trying to launch
|
||||
# PG, but make sure that new-run uses the exact same build plan, else we risk
|
||||
# rebuilding twice... ugh
|
||||
cabal new-build --project-file=cabal.project.dev-sh exe:graphql-engine test:graphql-engine-tests
|
||||
# Formerly this was a `cabal build` but mixing cabal build and cabal run
|
||||
# seems to conflict now, causing re-linking, haddock runs, etc. Instead do a
|
||||
# `graphql-engine version` to trigger build
|
||||
cabal new-run --project-file=cabal.project.dev-sh -- exe:graphql-engine \
|
||||
--metadata-database-url="$PG_DB_URL" version
|
||||
|
||||
if [ "$RUN_INTEGRATION_TESTS" = true ]; then
|
||||
start_dbs
|
||||
else
|
||||
|
@ -335,6 +335,7 @@ library
|
||||
, Hasura.Backends.Postgres.DDL.Function
|
||||
, Hasura.Backends.Postgres.DDL.RunSQL
|
||||
, Hasura.Backends.Postgres.DDL.Source
|
||||
, Hasura.Backends.Postgres.DDL.Source.Version
|
||||
, Hasura.Backends.Postgres.DDL.Table
|
||||
, Hasura.Backends.Postgres.Execute.LiveQuery
|
||||
, Hasura.Backends.Postgres.Execute.Mutation
|
||||
|
@ -11,17 +11,17 @@ module Hasura.Backends.Postgres.DDL.Source
|
||||
|
||||
import Hasura.Prelude
|
||||
|
||||
import qualified Data.HashMap.Strict as Map
|
||||
import qualified Database.PG.Query as Q
|
||||
import qualified Language.Haskell.TH.Lib as TH
|
||||
import qualified Language.Haskell.TH.Syntax as TH
|
||||
|
||||
import Control.Monad.Trans.Control (MonadBaseControl)
|
||||
import Data.FileEmbed (makeRelativeToProject)
|
||||
import Data.Time.Clock (UTCTime)
|
||||
import qualified Data.HashMap.Strict as Map
|
||||
import qualified Database.PG.Query as Q
|
||||
import qualified Language.Haskell.TH.Lib as TH
|
||||
import qualified Language.Haskell.TH.Syntax as TH
|
||||
|
||||
import Control.Monad.Trans.Control (MonadBaseControl)
|
||||
import Data.FileEmbed (makeRelativeToProject)
|
||||
import Data.Time.Clock (UTCTime)
|
||||
|
||||
import Hasura.Backends.Postgres.Connection
|
||||
import Hasura.Backends.Postgres.DDL.Source.Version
|
||||
import Hasura.Backends.Postgres.SQL.Types
|
||||
import Hasura.Base.Error
|
||||
import Hasura.RQL.Types.Backend
|
||||
@ -31,7 +31,7 @@ import Hasura.RQL.Types.Source
|
||||
import Hasura.RQL.Types.Table
|
||||
import Hasura.SQL.Backend
|
||||
import Hasura.Server.Migrate.Internal
|
||||
import Hasura.Server.Types (MaintenanceMode (..))
|
||||
import Hasura.Server.Types (MaintenanceMode (..))
|
||||
|
||||
|
||||
-- | We differentiate the handling of metadata between Citus and Vanilla
|
||||
@ -85,12 +85,14 @@ initCatalogForSource maintenanceMode migrationTime = do
|
||||
| not sourceVersionTableExist && eventLogTableExist -> do
|
||||
-- Update the Source Catalog to v43 to include the new migration
|
||||
-- changes. Skipping this step will result in errors.
|
||||
currCatalogVersion <- liftTx getCatalogVersion
|
||||
currMetadataCatalogVersion <- liftTx getCatalogVersion
|
||||
-- we migrate to the 43 version, which is the migration where
|
||||
-- metadata separation is introduced
|
||||
migrateTo43 currCatalogVersion
|
||||
migrateTo43MetadataCatalog currMetadataCatalogVersion
|
||||
setCatalogVersion "43" migrationTime
|
||||
liftTx createVersionTable
|
||||
-- Migrate the catalog from initial version i.e '1'
|
||||
migrateSourceCatalogFrom "1"
|
||||
| otherwise -> migrateSourceCatalog
|
||||
where
|
||||
initPgSourceCatalog = do
|
||||
@ -108,19 +110,61 @@ initCatalogForSource maintenanceMode migrationTime = do
|
||||
CREATE UNIQUE INDEX hdb_source_catalog_version_one_row
|
||||
ON hdb_catalog.hdb_source_catalog_version((version IS NOT NULL));
|
||||
|]
|
||||
setSourceCatalogVersion
|
||||
pure ()
|
||||
|
||||
migrateSourceCatalog = do
|
||||
version <- getSourceCatalogVersion
|
||||
case version of
|
||||
"1" -> pure ()
|
||||
_ -> throw500 $ "unexpected source catalog version: " <> version
|
||||
|
||||
migrateTo43 prevVersion = do
|
||||
migrateTo43MetadataCatalog prevVersion = do
|
||||
let neededMigrations = dropWhile ((/= prevVersion) . fst) upMigrationsUntil43
|
||||
traverse_ snd neededMigrations
|
||||
|
||||
-- Upgrade the hdb_catalog schema to v43
|
||||
-- NOTE (rakesh):
|
||||
-- Down migrations for postgres sources is not supported in this PR. We need an
|
||||
-- exhaustive discussion to make a call as I think, as of now, it is not
|
||||
-- trivial. For metadata catalog migrations, we have a separate downgrade
|
||||
-- command in the graphql-engine exe.
|
||||
--
|
||||
-- I can think of two ways:
|
||||
--
|
||||
-- - Just like downgrade, we need to have a new command path for downgrading
|
||||
-- pg sources (command design should support other backends too,
|
||||
-- graphql-engine source-downgrade postgres --to-catalog-version 1 --
|
||||
-- downgrade all available pg sources to 1)
|
||||
-- - Have an online documentation with necessary SQLs to help users to
|
||||
-- downgrade pg sources themselves. Improve error message by referring the URL
|
||||
-- to the documentation.
|
||||
|
||||
migrateSourceCatalog :: MonadTx m => m ()
|
||||
migrateSourceCatalog =
|
||||
getSourceCatalogVersion >>= migrateSourceCatalogFrom
|
||||
|
||||
migrateSourceCatalogFrom :: (MonadTx m) => Text -> m ()
|
||||
migrateSourceCatalogFrom prevVersion
|
||||
| prevVersion == latestSourceCatalogVersionText = pure ()
|
||||
| [] <- neededMigrations =
|
||||
throw400 NotSupported $
|
||||
"Expected source catalog version <= "
|
||||
<> latestSourceCatalogVersionText
|
||||
<> ", but the current version is " <> prevVersion
|
||||
| otherwise = do
|
||||
traverse_ snd neededMigrations
|
||||
setSourceCatalogVersion
|
||||
where
|
||||
neededMigrations =
|
||||
dropWhile ((/= prevVersion) . fst) sourceMigrations
|
||||
|
||||
sourceMigrations :: (MonadTx m) => [(Text, m ())]
|
||||
sourceMigrations =
|
||||
$(let migrationFromFile from =
|
||||
let to = from + 1
|
||||
path = "src-rsr/pg_source_migrations/" <> show from <> "_to_" <> show to <> ".sql"
|
||||
in [| runTx $(makeRelativeToProject path >>= Q.sqlFromFile) |]
|
||||
|
||||
migrationsFromFile = map $ \(from :: Integer) ->
|
||||
[| ($(TH.lift $ tshow from), $(migrationFromFile from)) |]
|
||||
|
||||
in TH.listE $ migrationsFromFile [1..(latestSourceCatalogVersion - 1)]
|
||||
)
|
||||
|
||||
-- Upgrade the hdb_catalog schema to v43 (Metadata catalog)
|
||||
upMigrationsUntil43 :: MonadTx m => [(Text, m ())]
|
||||
upMigrationsUntil43 =
|
||||
$(let migrationFromFile from to =
|
||||
@ -140,21 +184,6 @@ upMigrationsUntil43 =
|
||||
: migrationsFromFile [5..43]
|
||||
)
|
||||
|
||||
currentSourceCatalogVersion :: Text
|
||||
currentSourceCatalogVersion = "1"
|
||||
|
||||
setSourceCatalogVersion :: MonadTx m => m ()
|
||||
setSourceCatalogVersion = liftTx $ Q.unitQE defaultTxErrorHandler [Q.sql|
|
||||
INSERT INTO hdb_catalog.hdb_source_catalog_version(version, upgraded_on)
|
||||
VALUES ($1, NOW())
|
||||
ON CONFLICT ((version IS NOT NULL))
|
||||
DO UPDATE SET version = $1, upgraded_on = NOW()
|
||||
|] (Identity currentSourceCatalogVersion) False
|
||||
|
||||
getSourceCatalogVersion :: MonadTx m => m Text
|
||||
getSourceCatalogVersion = liftTx $ runIdentity . Q.getRow <$> Q.withQE defaultTxErrorHandler
|
||||
[Q.sql| SELECT version FROM hdb_catalog.hdb_source_catalog_version |] () False
|
||||
|
||||
-- | Fetch Postgres metadata of all user tables
|
||||
fetchTableMetadata
|
||||
:: forall pgKind m
|
||||
|
@ -0,0 +1,26 @@
|
||||
module Hasura.Backends.Postgres.DDL.Source.Version where
|
||||
|
||||
import Hasura.Prelude
|
||||
|
||||
import qualified Database.PG.Query as Q
|
||||
|
||||
import Hasura.Backends.Postgres.Connection
|
||||
|
||||
|
||||
latestSourceCatalogVersion :: Integer
|
||||
latestSourceCatalogVersion = 2
|
||||
|
||||
latestSourceCatalogVersionText :: Text
|
||||
latestSourceCatalogVersionText = tshow latestSourceCatalogVersion
|
||||
|
||||
setSourceCatalogVersion :: MonadTx m => m ()
|
||||
setSourceCatalogVersion = liftTx $ Q.unitQE defaultTxErrorHandler [Q.sql|
|
||||
INSERT INTO hdb_catalog.hdb_source_catalog_version(version, upgraded_on)
|
||||
VALUES ($1, NOW())
|
||||
ON CONFLICT ((version IS NOT NULL))
|
||||
DO UPDATE SET version = $1, upgraded_on = NOW()
|
||||
|] (Identity latestSourceCatalogVersionText) False
|
||||
|
||||
getSourceCatalogVersion :: MonadTx m => m Text
|
||||
getSourceCatalogVersion = liftTx $ runIdentity . Q.getRow <$> Q.withQE defaultTxErrorHandler
|
||||
[Q.sql| SELECT version FROM hdb_catalog.hdb_source_catalog_version |] () False
|
@ -567,7 +567,9 @@ fetchEvents source limitI =
|
||||
and (l.locked IS NULL or l.locked < (NOW() - interval '30 minute'))
|
||||
and (l.next_retry_at is NULL or l.next_retry_at <= now())
|
||||
and l.archived = 'f'
|
||||
ORDER BY created_at
|
||||
/* NB: this ordering is important for our index `event_log_fetch_events` */
|
||||
/* (see `init_pg_source.sql`) */
|
||||
ORDER BY locked NULLS FIRST, next_retry_at NULLS FIRST, created_at
|
||||
LIMIT $1
|
||||
FOR UPDATE SKIP LOCKED )
|
||||
RETURNING id, schema_name, table_name, trigger_name, payload::json, tries, created_at
|
||||
|
@ -34,10 +34,16 @@ CREATE TABLE hdb_catalog.event_log
|
||||
archived BOOLEAN NOT NULL DEFAULT FALSE
|
||||
);
|
||||
|
||||
/* This powers `archiveEvents` */
|
||||
CREATE INDEX ON hdb_catalog.event_log (trigger_name);
|
||||
CREATE INDEX ON hdb_catalog.event_log (locked);
|
||||
CREATE INDEX ON hdb_catalog.event_log (delivered);
|
||||
CREATE INDEX ON hdb_catalog.event_log (created_at);
|
||||
/* This index powers `fetchEvents` */
|
||||
CREATE INDEX event_log_fetch_events
|
||||
ON hdb_catalog.event_log (locked NULLS FIRST, next_retry_at NULLS FIRST, created_at)
|
||||
WHERE delivered = 'f'
|
||||
and error = 'f'
|
||||
and archived = 'f'
|
||||
;
|
||||
|
||||
|
||||
CREATE TABLE hdb_catalog.event_invocation_logs
|
||||
(
|
||||
|
11
server/src-rsr/pg_source_migrations/1_to_2.sql
Normal file
11
server/src-rsr/pg_source_migrations/1_to_2.sql
Normal file
@ -0,0 +1,11 @@
|
||||
DROP INDEX hdb_catalog.event_log_locked_idx;
|
||||
DROP INDEX hdb_catalog.event_log_delivered_idx;
|
||||
DROP INDEX hdb_catalog.event_log_created_at_idx;
|
||||
|
||||
/* This index powers `fetchEvents` */
|
||||
CREATE INDEX event_log_fetch_events
|
||||
ON hdb_catalog.event_log (locked NULLS FIRST, next_retry_at NULLS FIRST, created_at)
|
||||
WHERE delivered = 'f'
|
||||
and error = 'f'
|
||||
and archived = 'f'
|
||||
;
|
Loading…
Reference in New Issue
Block a user