From c425b554b8ecf2ef8897be70127d445bc8ac9326 Mon Sep 17 00:00:00 2001
From: Brandon Simmons
Date: Wed, 11 Mar 2020 02:27:31 -0400
Subject: [PATCH] server(events): utilize proper backpressure scheme (close #3839) (#4013)

* Test working through a backlog of change events

* Use a slightly more performant threaded http server in eventing pytests

This helped locally but not on CI it seems...

* Rework event processing for backpressure. Closes #3839

With too low `HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL` and/or slow webhooks
and/or too small `HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE` we could previously
check out events from the DB faster than we could service them, leading to
space leaks, weirdness, etc.

Other changes:
- avoid fetch interval sleep latency when we previously did a non-empty fetch
- prefetch event batch while http pool is working
- warn when it appears we can't keep up with events being generated
- make some effort to process events in creation order so we don't starve
  older ones.

ALSO NOTE: HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL changes semantics slightly,
since it only comes into play after an empty fetch. The old semantics weren't
documented in detail, so I think this is fine.
---
 CHANGELOG.md                                  |   2 +
 .../graphql-engine-flags/reference.rst        |   3 +-
 docs/graphql/manual/event-triggers/index.rst  |   5 +
 server/src-lib/Hasura/App.hs                  |   4 +-
 server/src-lib/Hasura/Events/Lib.hs           | 181 ++++++++++--------
 .../src-lib/Hasura/RQL/Types/EventTrigger.hs  |   2 +
 server/src-lib/Hasura/RQL/Types/Table.hs      |   6 +
 server/src-lib/Hasura/Server/Init.hs          |   3 +-
 server/src-rsr/catalog_version.txt            |   2 +-
 server/src-rsr/migrations/32_to_33.sql        |   1 +
 server/src-rsr/migrations/33_to_32.sql        |   1 +
 server/tests-py/context.py                    |  10 +-
 server/tests-py/test_events.py                |  33 +++-
 13 files changed, 171 insertions(+), 82 deletions(-)
 create mode 100644 server/src-rsr/migrations/32_to_33.sql
 create mode 100644 server/src-rsr/migrations/33_to_32.sql

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c13dc096c8d..f5601457b7f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -64,3 +64,5 @@
 - fix casting citext column type (fix #2818) (#3861)
 - Add downgrade command (close #1156) (#3760)
 - persist mix files only when coverage is enabled (#3844)
+- `HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL` changes semantics slightly: we only sleep for the interval
+  when there were previously no events to process. Potential space leak fixed. (#3839)

diff --git a/docs/graphql/manual/deployment/graphql-engine-flags/reference.rst b/docs/graphql/manual/deployment/graphql-engine-flags/reference.rst
index 5f2661ce9ba..8e81b7d1266 100644
--- a/docs/graphql/manual/deployment/graphql-engine-flags/reference.rst
+++ b/docs/graphql/manual/deployment/graphql-engine-flags/reference.rst
@@ -132,7 +132,8 @@ For the ``serve`` sub-command these are the available flags and ENV variables:

    * - N/A
      - ``HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL``
-     - Postgres events polling interval
+     - Interval in milliseconds to sleep before trying to fetch events again after a fetch
+       returned no events from postgres

    * - ``-s, --stripes <NO_OF_STRIPES>``
      - ``HASURA_GRAPHQL_PG_STRIPES``

diff --git a/docs/graphql/manual/event-triggers/index.rst b/docs/graphql/manual/event-triggers/index.rst
index c66469adfeb..8338b5aa09d 100644
--- a/docs/graphql/manual/event-triggers/index.rst
+++ b/docs/graphql/manual/event-triggers/index.rst
@@ -24,6 +24,11 @@ Events can be of the following types:
 - DELETE: When a row is deleted from a table
 - MANUAL: Using the console or API, an event can be triggered manually on a row

+.. 
note:: + + Event webhook notifications will be delivered at least once, and may arrive out of order with + respect to the underlying event. + **See:** .. toctree:: diff --git a/server/src-lib/Hasura/App.hs b/server/src-lib/Hasura/App.hs index be286e8d6ad..c210d94c645 100644 --- a/server/src-lib/Hasura/App.hs +++ b/server/src-lib/Hasura/App.hs @@ -252,8 +252,8 @@ runHGEServer ServeOptions{..} InitCtx{..} initTime = do prepareEvents _icPgPool logger eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers" - (_pushEventsThread, _consumeEventsThread) <- liftIO $ - forkEventQueueProcessors logger logEnvHeaders + _eventQueueThread <- C.forkImmortal "processEventQueue" logger $ liftIO $ + processEventQueue logger logEnvHeaders _icHttpManager _icPgPool (getSCFromRef cacheRef) eventEngineCtx -- start a backgroud thread to handle async actions diff --git a/server/src-lib/Hasura/Events/Lib.hs b/server/src-lib/Hasura/Events/Lib.hs index 116efa38348..d87f454678c 100644 --- a/server/src-lib/Hasura/Events/Lib.hs +++ b/server/src-lib/Hasura/Events/Lib.hs @@ -1,17 +1,18 @@ +{-# LANGUAGE StrictData #-} -- TODO project-wide, maybe. See #3941 +{-# LANGUAGE RecordWildCards #-} module Hasura.Events.Lib ( initEventEngineCtx - , forkEventQueueProcessors + , processEventQueue , unlockAllEvents , defaultMaxEventThreads , defaultFetchIntervalMilliSec , Event(..) ) where -import Control.Concurrent.Extended (sleep, forkImmortal) -import Control.Concurrent.Async (async, link) +import Control.Concurrent.Extended (sleep) +import Control.Concurrent.Async (wait, withAsync, async, link) import Control.Concurrent.STM.TVar -import Control.Exception.Lifted (mask_, try, bracket_) -import Control.Monad.Trans.Control (MonadBaseControl) +import Control.Exception.Lifted (finally, mask_, try) import Control.Monad.STM import Data.Aeson import Data.Aeson.Casing @@ -20,6 +21,7 @@ import Data.Has import Data.Int (Int64) import Data.String import Data.Time.Clock +import Data.Word import Hasura.Events.HTTP import Hasura.HTTP import Hasura.Prelude @@ -28,8 +30,6 @@ import Hasura.RQL.Types import Hasura.Server.Version (HasVersion) import Hasura.SQL.Types -import qualified Control.Concurrent.STM.TQueue as TQ -import qualified Control.Immortal as Immortal import qualified Data.ByteString as BS import qualified Data.CaseInsensitive as CI import qualified Data.HashMap.Strict as M @@ -72,6 +72,9 @@ data DeliveryInfo $(deriveJSON (aesonDrop 2 snakeCase){omitNothingFields=True} ''DeliveryInfo) +-- | Change data for a particular row +-- +-- https://docs.hasura.io/1.0/graphql/manual/event-triggers/payload.html data Event = Event { eId :: EventId @@ -94,6 +97,7 @@ instance ToJSON QualifiedTableStrict where , "name" .= tn ] +-- | See 'Event'. 
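+--
+-- This is the JSON body we POST to the webhook. Per the note added to the
+-- event-triggers docs in this patch, delivery is at-least-once and may be
+-- out of order, so a consumer that needs exactly-once behaviour will have
+-- to deduplicate on 'epId' itself (a suggested consumer-side contract, not
+-- something enforced here).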
data EventPayload = EventPayload { epId :: EventId @@ -149,9 +153,7 @@ data Invocation data EventEngineCtx = EventEngineCtx - { _eeCtxEventQueue :: TQ.TQueue Event - , _eeCtxEventThreads :: TVar Int - , _eeCtxMaxEventThreads :: Int + { _eeCtxEventThreadsCapacity :: TVar Int , _eeCtxFetchInterval :: DiffTime } @@ -165,50 +167,91 @@ retryAfterHeader :: CI.CI T.Text retryAfterHeader = "Retry-After" initEventEngineCtx :: Int -> DiffTime -> STM EventEngineCtx -initEventEngineCtx maxT fetchI = do - q <- TQ.newTQueue - c <- newTVar 0 - return $ EventEngineCtx q c maxT fetchI +initEventEngineCtx maxT _eeCtxFetchInterval = do + _eeCtxEventThreadsCapacity <- newTVar maxT + return $ EventEngineCtx{..} -forkEventQueueProcessors +-- | Service events from our in-DB queue. +-- +-- There are a few competing concerns and constraints here; we want to... +-- - fetch events in batches for lower DB pressure +-- - don't fetch more than N at a time (since that can mean: space leak, less +-- effective scale out, possible double sends for events we've checked out +-- on exit (TODO clean shutdown procedure)) +-- - try not to cause webhook workers to stall waiting on DB fetch +-- - limit webhook HTTP concurrency per HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE +processEventQueue :: (HasVersion) => L.Logger L.Hasura -> LogEnvHeaders -> HTTP.Manager-> Q.PGPool -> IO SchemaCache -> EventEngineCtx - -> IO (Immortal.Thread, Immortal.Thread) - -- ^ returns: (pushEvents handle, consumeEvents handle) -forkEventQueueProcessors logger logenv httpMgr pool getSchemaCache eectx = do - (,) <$> forkImmortal "pushEvents" logger pushEvents - <*> forkImmortal "consumeEvents" logger consumeEvents + -> IO void +processEventQueue logger logenv httpMgr pool getSchemaCache EventEngineCtx{..} = do + events0 <- popEventsBatch + go events0 0 False where - -- FIXME proper backpressure. See: #3839 - pushEvents = forever $ do - let EventEngineCtx q _ _ fetchI = eectx - eventsOrError <- runExceptT $ Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) fetchEvents - case eventsOrError of - Left err -> L.unLogger logger $ EventInternalErr err - Right events -> atomically $ mapM_ (TQ.writeTQueue q) events - sleep fetchI + fetchBatchSize = 100 + popEventsBatch = do + let run = runExceptT . Q.runTx pool (Q.RepeatableRead, Just Q.ReadWrite) + run (fetchEvents fetchBatchSize) >>= \case + Left err -> do + L.unLogger logger $ EventInternalErr err + return [] + Right events -> + return events - -- TODO this has all events race. How do we know this is correct? Document. - consumeEvents = forever $ - -- ensure async exceptions from link only raised between iterations of forever block: - mask_ $ do - event <- atomically $ do - let EventEngineCtx q _ _ _ = eectx - TQ.readTQueue q - -- FIXME proper backpressure. See: #3839 - t <- async $ runReaderT (processEvent event) (logger, httpMgr, eectx) - -- Make sure any stray exceptions are at least logged via 'forkImmortal': - link t + -- work on this batch of events while prefetching the next. Recurse after we've forked workers + -- for each in the batch, minding the requested pool size. + go :: [Event] -> Int -> Bool -> IO void + go events !fullFetchCount !alreadyWarned = do + -- process events ASAP until we've caught up; only then can we sleep + when (null events) $ sleep _eeCtxFetchInterval + + -- Prefetch next events payload while concurrently working through our current batch. 
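+      -- The shape of what follows is roughly (a sketch; 'processOne' stands
+      -- in for the masked fork-with-capacity-check below):
+      --
+      -- >  eventsNext <- withAsync popEventsBatch $ \eventsNextA -> do
+      -- >    forM_ events processOne  -- HTTP work, bounded by the pool size
+      -- >    wait eventsNextA         -- next batch is usually ready by now
+      --
+      -- i.e. the next DB fetch overlaps the webhook HTTP work.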
+ -- NOTE: we probably don't need to prefetch so early, but probably not + -- worth the effort for something more fine-tuned + eventsNext <- withAsync popEventsBatch $ \eventsNextA -> do + -- process approximately in order, minding HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE: + forM_ events $ \event -> + mask_ $ do + atomically $ do -- block until < HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE threads: + capacity <- readTVar _eeCtxEventThreadsCapacity + check $ capacity > 0 + writeTVar _eeCtxEventThreadsCapacity (capacity - 1) + -- since there is some capacity in our worker threads, we can launch another: + let restoreCapacity = liftIO $ atomically $ + modifyTVar' _eeCtxEventThreadsCapacity (+ 1) + t <- async $ flip runReaderT (logger, httpMgr) $ + processEvent event `finally` restoreCapacity + link t + + -- return when next batch ready; some 'processEvent' threads may be running. + wait eventsNextA + + let lenEvents = length events + if | lenEvents == fetchBatchSize -> do + -- If we've seen N fetches in a row from the DB come back full (i.e. only limited + -- by our LIMIT clause), then we say we're clearly falling behind: + let clearlyBehind = fullFetchCount >= 3 + unless alreadyWarned $ + when clearlyBehind $ + L.unLogger logger $ L.UnstructuredLog L.LevelWarn $ fromString $ + "Events processor may not be keeping up with events generated in postgres, " <> + "or we're working on a backlog of events. Consider increasing " <> + "HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE" + go eventsNext (fullFetchCount+1) (alreadyWarned || clearlyBehind) + + | otherwise -> do + when (lenEvents /= fetchBatchSize && alreadyWarned) $ + -- emit as warning in case users are only logging warning severity and saw above + L.unLogger logger $ L.UnstructuredLog L.LevelWarn $ fromString $ + "It looks like the events processor is keeping up again." + go eventsNext 0 False - -- NOTE: Blocks in tryWebhook if >= _eeCtxMaxEventThreads invocations active. processEvent :: ( HasVersion , MonadReader r m , Has HTTP.Manager r , Has (L.Logger L.Hasura) r - , Has EventEngineCtx r , MonadIO m - , MonadBaseControl IO m ) => Event -> m () processEvent e = do @@ -228,10 +271,10 @@ forkEventQueueProcessors logger logenv httpMgr pool getSchemaCache eectx = do ep = createEventPayload retryConf e res <- runExceptT $ tryWebhook headers responseTimeout ep webhook let decodedHeaders = map (decodeHeader logenv headerInfos) headers - finally <- either + either (processError pool e retryConf decodedHeaders ep) (processSuccess pool e decodedHeaders ep) res - either logQErr return finally + >>= flip onLeft logQErr createEventPayload :: RetryConf -> Event -> EventPayload createEventPayload retryConf e = EventPayload @@ -389,21 +432,17 @@ logHTTPErr err = do logger :: L.Logger L.Hasura <- asks getter L.unLogger logger $ err --- NOTE: Blocks if >= _eeCtxMaxEventThreads invocations active, though we --- expect this to be bounded by responseTimeout. 
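+-- The capacity accounting that used to live here has moved up into
+-- 'processEventQueue'; the pattern there is a TVar used as a semaphore,
+-- roughly:
+--
+-- >  atomically $ do                 -- block until a worker slot frees up
+-- >    capacity <- readTVar _eeCtxEventThreadsCapacity
+-- >    check (capacity > 0)
+-- >    writeTVar _eeCtxEventThreadsCapacity (capacity - 1)
+-- >  ... `finally` restoreCapacity   -- the slot can't leak on exceptions
+--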
+-- These run concurrently on their respective EventPayloads
 tryWebhook
   :: ( Has (L.Logger L.Hasura) r
      , Has HTTP.Manager r
-     , Has EventEngineCtx r
      , MonadReader r m
-     , MonadBaseControl IO m
      , MonadIO m
      , MonadError HTTPErr m
      )
   => [HTTP.Header] -> HTTP.ResponseTimeout -> EventPayload -> String
   -> m HTTPResp
 tryWebhook headers responseTimeout ep webhook = do
-  logger :: L.Logger L.Hasura <- asks getter
   let context = ExtraContext (epCreatedAt ep) (epId ep)
   initReqE <- liftIO $ try $ HTTP.parseRequest webhook
   case initReqE of
@@ -415,36 +454,26 @@ tryWebhook headers responseTimeout ep webhook = do
           , HTTP.requestBody = HTTP.RequestBodyLBS (encode ep)
           , HTTP.responseTimeout = responseTimeout
           }
-      EventEngineCtx _ c maxT _ <- asks getter
-      -- wait for counter and then increment beforing making http request
-      let haveCapacity = do
-            countThreads <- readTVar c
-            pure $ countThreads < maxT
-          waitForCapacity = do
-            haveCapacity >>= check
-            modifyTVar' c (+1)
-          release = modifyTVar' c (subtract 1)
-      -- we could also log after we block, but that's actually even more awkward:
-      likelyHaveCapacity <- liftIO $ atomically haveCapacity
-      unless likelyHaveCapacity $ do
-        L.unLogger logger $ L.UnstructuredLog L.LevelWarn $
-          fromString $ "In event queue webhook: exceeded HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE " <>
-          "and likely about to block for: "<> show context
-
-      -- ensure we don't leak capacity and become totally broken in the
-      -- presence of unexpected exceptions:
-      bracket_ (liftIO $ atomically waitForCapacity) (liftIO $ atomically release) $ do
-        eitherResp <- runHTTP req (Just context)
-        onLeft eitherResp throwError
+      eitherResp <- runHTTP req (Just context)
+      onLeft eitherResp throwError
 
 getEventTriggerInfoFromEvent :: SchemaCache -> Event -> Maybe EventTriggerInfo
 getEventTriggerInfoFromEvent sc e = let table = eTable e
                                         tableInfo = M.lookup table $ scTables sc
                                     in M.lookup ( tmName $ eTrigger e) =<< (_tiEventTriggerInfoMap <$> tableInfo)
 
-fetchEvents :: Q.TxE QErr [Event]
-fetchEvents =
+---- DATABASE QUERIES ---------------------
+--
+-- The API for our in-database work queue:
+-------------------------------------------
+
+-- | Lock and return events not yet being processed or completed, up to some
+-- limit. Process events approximately in created_at order, but we make no
+-- ordering guarantees; events can and will race. Nevertheless we want to
+-- ensure newer change events don't starve older ones.
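+--
+-- The @FOR UPDATE SKIP LOCKED@ clause in the query below makes competing
+-- fetchers skip rows that another transaction has already locked, rather
+-- than block on them (e.g. when several fetch transactions, or several
+-- graphql-engine instances sharing a database, race on the queue).
+--
+-- Usage sketch: @fetchEvents 100@ marks up to 100 of the oldest undelivered,
+-- unlocked events as locked and returns them.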
+fetchEvents :: Int -> Q.TxE QErr [Event] +fetchEvents limitI = map uncurryEvent <$> Q.listQE defaultTxErrorHandler [Q.sql| UPDATE hdb_catalog.event_log SET locked = 't' @@ -453,10 +482,11 @@ fetchEvents = WHERE l.delivered = 'f' and l.error = 'f' and l.locked = 'f' and (l.next_retry_at is NULL or l.next_retry_at <= now()) and l.archived = 'f' - FOR UPDATE SKIP LOCKED - LIMIT 100 ) + ORDER BY created_at + LIMIT $1 + FOR UPDATE SKIP LOCKED ) RETURNING id, schema_name, table_name, trigger_name, payload::json, tries, created_at - |] () True + |] (Identity limit) True where uncurryEvent (id', sn, tn, trn, Q.AltJ payload, tries, created) = Event { eId = id' @@ -466,6 +496,7 @@ fetchEvents = , eTries = tries , eCreatedAt = created } + limit = fromIntegral limitI :: Word64 insertInvocation :: Invocation -> Q.TxE QErr () insertInvocation invo = do diff --git a/server/src-lib/Hasura/RQL/Types/EventTrigger.hs b/server/src-lib/Hasura/RQL/Types/EventTrigger.hs index df838d322f9..7ff0d244bc6 100644 --- a/server/src-lib/Hasura/RQL/Types/EventTrigger.hs +++ b/server/src-lib/Hasura/RQL/Types/EventTrigger.hs @@ -38,6 +38,7 @@ import qualified Data.Text as T import qualified Database.PG.Query as Q import qualified Text.Regex.TDFA as TDFA +-- | Unique name for event trigger. newtype TriggerName = TriggerName { unTriggerName :: NonEmptyText } deriving (Show, Eq, Hashable, Lift, DQuote, FromJSON, ToJSON, ToJSONKey, Q.FromCol, Q.ToPrepArg, Generic, Arbitrary, NFData, Cacheable) @@ -172,6 +173,7 @@ instance FromJSON CreateEventTriggerQuery where $(deriveToJSON (aesonDrop 4 snakeCase){omitNothingFields=True} ''CreateEventTriggerQuery) +-- | The table operations on which the event trigger will be invoked. data TriggerOpsDef = TriggerOpsDef { tdInsert :: !(Maybe SubscribeOpSpec) diff --git a/server/src-lib/Hasura/RQL/Types/Table.hs b/server/src-lib/Hasura/RQL/Types/Table.hs index 2443f7fe5b4..9525f1e3fa6 100644 --- a/server/src-lib/Hasura/RQL/Types/Table.hs +++ b/server/src-lib/Hasura/RQL/Types/Table.hs @@ -274,7 +274,13 @@ data EventTriggerInfo , etiOpsDef :: !TriggerOpsDef , etiRetryConf :: !RetryConf , etiWebhookInfo :: !WebhookConfInfo + -- ^ The HTTP(s) URL which will be called with the event payload on configured operation. + -- Must be a POST handler. This URL can be entered manually or can be picked up from an + -- environment variable (the environment variable needs to be set before using it for + -- this configuration). , etiHeaders :: ![EventHeaderInfo] + -- ^ Custom headers can be added to an event trigger. Each webhook request will have these + -- headers added. } deriving (Show, Eq, Generic) instance NFData EventTriggerInfo $(deriveToJSON (aesonDrop 3 snakeCase) ''EventTriggerInfo) diff --git a/server/src-lib/Hasura/Server/Init.hs b/server/src-lib/Hasura/Server/Init.hs index 2500661ba50..f01c39e5a3a 100644 --- a/server/src-lib/Hasura/Server/Init.hs +++ b/server/src-lib/Hasura/Server/Init.hs @@ -487,7 +487,8 @@ serveCmdFooter = , "Max event threads" ) , ( "HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL" - , "Postgres events polling interval in milliseconds" + , "Interval in milliseconds to sleep before trying to fetch events again after a " + <> "fetch returned no events from postgres." 
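+        -- E.g. HASURA_GRAPHQL_EVENTS_FETCH_INTERVAL=1000 sleeps for one
+        -- second after a fetch that returned no events; after a non-empty
+        -- fetch we go back to the queue immediately.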
) ] diff --git a/server/src-rsr/catalog_version.txt b/server/src-rsr/catalog_version.txt index 1758dddccea..bb95160cb6e 100644 --- a/server/src-rsr/catalog_version.txt +++ b/server/src-rsr/catalog_version.txt @@ -1 +1 @@ -32 \ No newline at end of file +33 diff --git a/server/src-rsr/migrations/32_to_33.sql b/server/src-rsr/migrations/32_to_33.sql new file mode 100644 index 00000000000..e5287b6d5b3 --- /dev/null +++ b/server/src-rsr/migrations/32_to_33.sql @@ -0,0 +1 @@ +CREATE INDEX event_log_created_at_idx ON hdb_catalog.event_log (created_at); diff --git a/server/src-rsr/migrations/33_to_32.sql b/server/src-rsr/migrations/33_to_32.sql new file mode 100644 index 00000000000..77180012264 --- /dev/null +++ b/server/src-rsr/migrations/33_to_32.sql @@ -0,0 +1 @@ +DROP INDEX IF EXISTS hdb_catalog."event_log_created_at_idx"; diff --git a/server/tests-py/context.py b/server/tests-py/context.py index aa2aff5fbd0..cd6c1892392 100644 --- a/server/tests-py/context.py +++ b/server/tests-py/context.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 from http import HTTPStatus +from socketserver import ThreadingMixIn from urllib.parse import urlparse from ruamel.yaml.comments import CommentedMap as OrderedDict # to avoid '!!omap' in yaml import threading @@ -331,7 +332,14 @@ class EvtsWebhookHandler(http.server.BaseHTTPRequestHandler): "body": req_json, "headers": req_headers}) -class EvtsWebhookServer(http.server.HTTPServer): +# A very slightly more sane/performant http server. +# See: https://stackoverflow.com/a/14089457/176841 +# +# TODO use this elsewhere, or better yet: use e.g. bottle + waitress +class ThreadedHTTPServer(ThreadingMixIn, http.server.HTTPServer): + """Handle requests in a separate thread.""" + +class EvtsWebhookServer(ThreadedHTTPServer): def __init__(self, server_address): self.resp_queue = queue.Queue(maxsize=1) self.error_queue = queue.Queue() diff --git a/server/tests-py/test_events.py b/server/tests-py/test_events.py index 54f6c885cfc..f0e36887203 100755 --- a/server/tests-py/test_events.py +++ b/server/tests-py/test_events.py @@ -27,11 +27,14 @@ def select_last_event_fromdb(hge_ctx): def insert(hge_ctx, table, row, returning=[], headers = {}): + return insert_many(hge_ctx, table, [row], returning, headers) + +def insert_many(hge_ctx, table, rows, returning=[], headers = {}): q = { "type": "insert", "args": { "table": table, - "objects": [row], + "objects": rows, "returning": returning } } @@ -81,6 +84,34 @@ class TestCreateAndDelete: def dir(cls): return 'queries/event_triggers/create-delete' +# Smoke test for handling a backlog of events +@usefixtures("per_method_tests_db_state") +class TestEventFlood(object): + + @classmethod + def dir(cls): + return 'queries/event_triggers/basic' + + def test_flood(self, hge_ctx, evts_webhook): + table = {"schema": "hge_tests", "name": "test_t1"} + + payload = range(1,1001) + rows = list(map(lambda x: {"c1": x, "c2": "hello"}, payload)) + st_code, resp = insert_many(hge_ctx, table, rows) + assert st_code == 200, resp + + def get_evt(): + # TODO ThreadedHTTPServer helps locally (I only need a timeout of + # 10 here), but we still need a bit of a long timeout here for CI + # it seems, since webhook can't keep up there: + ev_full = evts_webhook.get_event(600) + return ev_full['body']['event']['data']['new']['c1'] + # Make sure we got all payloads (probably out of order): + ns = list(map(lambda _: get_evt(), payload)) + ns.sort() + assert ns == list(payload) + + @usefixtures("per_method_tests_db_state") class TestCreateEvtQuery(object):