server: restore proper batching behavior in event trigger processing (#1237)

This essentially restores the original code from c425b554b8
(https://github.com/hasura/graphql-engine/pull/4013). Prior to this
commit we would once again slurp messages from the database as fast as
possible (one of the things c425b55 had originally fixed).

Another thing broken as a consequence of the same logic was the
`removeEventFromLockedEvents` logic, which unlocks in-flight events
(breaking at-least-once delivery).

Some archeology, post-c425b55:

- cc8e2ccc erroneously attempted to refactor using `bracket`, resulting
  in the same slurp-all-events behavior (since we don't ever wait for
  processEvent to complete)
- at some point event processing within a batch was made serial; this was
  reported as a bug. See: https://github.com/hasura/graphql-engine/issues/5189
- in 0ef52292b5 (which I approved...) an `async` is added, again
  causing the same issue...
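
To make the restored behavior concrete, below is a minimal sketch of the
dispatch pattern being reinstated, written in plain IO rather than the
engine's monad stack (the name `launchBounded` and its signature are
illustrative, not actual engine code): claim a slot from a TVar-backed
capacity counter before forking each handler, and release the slot inside
the forked thread, so the dispatch loop blocks once
HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE workers are busy instead of slurping
the whole queue.

    import Control.Concurrent.Async (async, link)
    import Control.Concurrent.STM (TVar, atomically, check, modifyTVar', readTVar)
    import Control.Exception (finally)
    import Control.Monad (forM_)

    -- Fork one worker per event, never exceeding the capacity counter.
    launchBounded :: TVar Int -> [ev] -> (ev -> IO ()) -> IO ()
    launchBounded capacity events handler =
      forM_ events $ \ev -> do
        -- block until a worker slot is free, then claim it
        atomically $ do
          n <- readTVar capacity
          check (n > 0)
          modifyTVar' capacity (subtract 1)
        -- release the slot in the forked thread (not in this loop); that is
        -- what gives the loop backpressure rather than fire-and-forget asyncs
        t <- async $ handler ev `finally` atomically (modifyTVar' capacity (+ 1))
        link t

The diff to Events.hs below does the same thing with
_eeCtxEventThreadsCapacity, wrapping the claim-and-fork step in mask_ so an
asynchronous exception can't leak a claimed slot.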

GitOrigin-RevId: d8cbaab385267a4c3f1f173e268a385265980fb1
Brandon Simmons 2021-04-29 00:01:06 -04:00 committed by hasura-bot
parent 507d3aac2c
commit 9c9bb43a53
9 changed files with 179 additions and 39 deletions


@@ -8,6 +8,7 @@
(Add entries below in the order of: server, console, cli, docs, others)
- server: aggregation fields are now supported on mssql
- server: accept a new server config flag `--events-fetch-batch-size` to configure the number of rows being fetched from the events log table in a single batch
- server: restore proper batching behavior in event trigger processing so that at most 200 events are checked out at a time
- server: fix regression: `on_conflict` was missing in the schema for inserts in tables where the current user has no columns listed in their update permissions (fix #6804)
- server: fix one-to-one relationship bug (introduced in #459) which prevented adding one-to-one relationships which didn't have the same column name for target and source
- console: fix Postgres table creation when table has a non-lowercase name and a comment (#6760)


@@ -428,8 +428,11 @@ elif [ "$MODE" = "test" ]; then
fi
if [ "$RUN_HLINT" = true ]; then
cd "$PROJECT_ROOT/server"
hlint src-*
if command -v hlint >/dev/null; then
(cd "$PROJECT_ROOT/server" && hlint src-*)
else
echo_warn "hlint is not installed: skipping"
fi
fi
if [ "$RUN_INTEGRATION_TESTS" = true ]; then


@@ -58,7 +58,7 @@ import qualified System.Metrics.Gauge as EKG.Gauge
import Control.Concurrent.Extended (sleep)
import Control.Concurrent.STM.TVar
import Control.Monad.Catch (MonadMask, bracket_)
import Control.Monad.Catch (MonadMask, bracket_, finally, mask_)
import Control.Monad.STM
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson
@@ -203,6 +203,10 @@ initEventEngineCtx maxT _eeCtxFetchInterval _eeCtxFetchSize = do
_eeCtxEventThreadsCapacity <- newTVar maxT
return $ EventEngineCtx{..}
-- | The event payload processed by 'processEvent'
--
-- The 'Time.UTCTime' represents the time when the event was fetched from DB.
-- Used to calculate Event Lock time
type EventWithSource b = (Event, SourceConfig b, Time.UTCTime)
-- | Service events from our in-DB queue.
@@ -232,7 +236,7 @@
-> ServerMetrics
-> MaintenanceMode
-> m void
processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..} LockedEventsCtx{leEvents} serverMetrics maintenanceMode = do
processEventQueue logger logenv httpMgr getSchemaCache EventEngineCtx{..} LockedEventsCtx{leEvents} serverMetrics maintenanceMode = do
events0 <- popEventsBatch
-- Track number of events fetched in EKG
_ <- liftIO $ EKG.Distribution.add (smNumEventsFetched serverMetrics) (fromIntegral $ length events0)
@@ -279,6 +283,11 @@ processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..}
saveLockedEvents (map eId events) leEvents
return $ map (, sourceConfig, eventsFetchedTime) events
-- !!! CAREFUL !!!
-- The logic here in particular is subtle and has been fixed, broken,
-- and fixed again in several different ways, several times.
-- !!! CAREFUL !!!
--
-- work on this batch of events while prefetching the next. Recurse after we've forked workers
-- for each in the batch, minding the requested pool size.
go :: [EventWithSource ('Postgres 'Vanilla)] -> Int -> Bool -> m void
@@ -291,15 +300,25 @@ processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..}
-- worth the effort for something more fine-tuned
eventsNext <- LA.withAsync popEventsBatch $ \eventsNextA -> do
-- process approximately in order, minding HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE:
forM_ events $ \(event, sourceConfig, eventFetchedTime) -> do
t <- processEvent event sourceConfig eventFetchedTime
& withEventEngineCtx eeCtx
& flip runReaderT (logger, httpMgr)
& LA.async
-- removing an event from the _eeCtxLockedEvents after the event has
-- been processed
removeEventFromLockedEvents (eId event) leEvents
LA.link t
forM_ events $ \eventWithSource ->
-- NOTE: we implement a logical bracket pattern here with the
-- increment and decrement of _eeCtxEventThreadsCapacity which
-- depends on not putting anything that can throw in the body here:
mask_ $ do
liftIO $ atomically $ do -- block until < HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE threads:
capacity <- readTVar _eeCtxEventThreadsCapacity
check $ capacity > 0
writeTVar _eeCtxEventThreadsCapacity (capacity - 1)
-- since there is some capacity in our worker threads, we can launch another:
let restoreCapacity = liftIO $ atomically $
modifyTVar' _eeCtxEventThreadsCapacity (+ 1)
t <- LA.async $ flip runReaderT (logger, httpMgr) $
processEvent eventWithSource `finally`
-- NOTE!: this needs to happen IN THE FORKED THREAD:
restoreCapacity
LA.link t
-- return when next batch ready; some 'processEvent' threads may be running.
LA.wait eventsNextA
let lenEvents = length events
@@ -331,12 +350,9 @@ processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..}
, Tracing.HasReporter io
, MonadMask io
)
=> Event
-> SourceConfig ('Postgres 'Vanilla)
-> Time.UTCTime
-- ^ Time when the event was fetched from DB. Used to calculate Event Lock time
=> EventWithSource ('Postgres 'Vanilla)
-> io ()
processEvent e sourceConfig eventFetchedTime = do
processEvent (e, sourceConfig, eventFetchedTime) = do
-- Track Lock Time of Event
-- Lock Time = Time when the event was fetched from DB - Time when the event is being processed
eventProcessTime <- liftIO getCurrentTime
@@ -398,23 +414,8 @@ processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..}
(processError sourceConfig e retryConf decodedHeaders ep maintenanceModeVersion)
(processSuccess sourceConfig e decodedHeaders ep maintenanceModeVersion) res
>>= flip onLeft logQErr
withEventEngineCtx ::
( MonadIO m
, MonadMask m
)
=> EventEngineCtx -> m () -> m ()
withEventEngineCtx eeCtx = bracket_ (decrementThreadCount eeCtx) (incrementThreadCount eeCtx)
incrementThreadCount :: MonadIO m => EventEngineCtx -> m ()
incrementThreadCount (EventEngineCtx c _ _) = liftIO $ atomically $ modifyTVar' c (+1)
decrementThreadCount :: MonadIO m => EventEngineCtx -> m ()
decrementThreadCount (EventEngineCtx c _ _) = liftIO $ atomically $ do
countThreads <- readTVar c
if countThreads > 0
then modifyTVar' c (\v -> v - 1)
else retry
-- removing an event from the _eeCtxLockedEvents after the event has been processed:
removeEventFromLockedEvents (eId e) leEvents
createEventPayload :: RetryConf -> Event -> EventPayload
createEventPayload retryConf e = EventPayload


@@ -428,7 +428,7 @@ serveCmdFooter =
eventsHttpPoolSizeEnv :: (String, String)
eventsHttpPoolSizeEnv =
( "HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE"
, "Max event threads"
, "Max event processing threads (default: 100)"
)
eventsFetchIntervalEnv :: (String, String)


@@ -371,6 +371,18 @@ class EvtsWebhookHandler(http.server.BaseHTTPRequestHandler):
time.sleep(2)
self.send_response(HTTPStatus.NO_CONTENT)
self.end_headers()
# This is like a sleep endpoint above, but allowing us to decide
# externally when the webhook can return, with unblock()
elif req_path == "/block":
if not self.server.unblocked:
self.server.blocked_count += 1
with self.server.unblocked_wait:
# We expect this timeout never to be reached, but if
# something goes wrong the main thread will block forever:
self.server.unblocked_wait.wait(timeout=60)
self.server.blocked_count -= 1
self.send_response(HTTPStatus.NO_CONTENT)
self.end_headers()
else:
self.send_response(HTTPStatus.NO_CONTENT)
self.end_headers()
@@ -390,8 +402,23 @@ class EvtsWebhookServer(ThreadedHTTPServer):
def __init__(self, server_address):
# Data received from hasura by our web hook, pushed after it returns to the client:
self.resp_queue = queue.Queue()
# We use these two vars to coordinate unblocking in the /block route
self.unblocked = False
self.unblocked_wait = threading.Condition()
# ...and this for bookkeeping open blocked requests; this becomes
# meaningless after the first call to unblock()
self.blocked_count = 0
super().__init__(server_address, EvtsWebhookHandler)
# Unblock all webhook requests to /block. Idempotent.
def unblock(self):
self.unblocked = True
with self.unblocked_wait:
# NOTE: this only affects currently wait()-ing threads, future
# wait()s will block again (hence the simple self.unblocked flag)
self.unblocked_wait.notify_all()
def server_bind(self):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(self.server_address)


@@ -0,0 +1,36 @@
# This is a fork of tests-py/queries/event_triggers/basic
# but using the /block endpoint as webhook
type: bulk
args:
- type: run_sql
args:
sql: |
create table hge_tests.test_flood(
c1 int,
c2 text
);
- type: track_table
args:
schema: hge_tests
name: test_flood
- type: create_event_trigger
args:
name: flood_all
table:
schema: hge_tests
name: test_flood
insert:
columns: '*'
update:
columns: '*'
delete:
columns: '*'
webhook: http://127.0.0.1:5592/block
retry_conf:
# We will manually unblock() before this is reached:
timeout_sec: 60
num_retries: 0
interval_sec: 1


@@ -0,0 +1,9 @@
type: bulk
args:
- type: delete_event_trigger
args:
name: flood_all
- type: run_sql
args:
sql: |
drop table hge_tests.test_flood;


@@ -3,6 +3,7 @@
import pytest
import queue
import time
import utils
from validate import check_query_f, check_event
usefixtures = pytest.mark.usefixtures
@@ -84,22 +85,65 @@ class TestCreateAndDelete:
def dir(cls):
return 'queries/event_triggers/create-delete'
# Smoke test for handling a backlog of events
# Generates a backlog of events, then:
# - checks that we're processing with the concurrency and backpressure
# characteristics we expect
# - ensures all events are successfully processed
#
# NOTE: this expects:
# HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE=8
# HASURA_GRAPHQL_EVENTS_FETCH_BATCH_SIZE=100 (the default)
@usefixtures("per_method_tests_db_state")
class TestEventFlood(object):
@classmethod
def dir(cls):
return 'queries/event_triggers/basic'
return 'queries/event_triggers/flood'
def test_flood(self, hge_ctx, evts_webhook):
table = {"schema": "hge_tests", "name": "test_t1"}
table = {"schema": "hge_tests", "name": "test_flood"}
# Trigger a bunch of events; hasura will begin processing but block on /block
payload = range(1,1001)
rows = list(map(lambda x: {"c1": x, "c2": "hello"}, payload))
st_code, resp = insert_many(hge_ctx, table, rows)
assert st_code == 200, resp
def check_backpressure():
# Expect that HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE webhooks are pending:
assert evts_webhook.blocked_count == 8
# ...Great, so presumably:
# - event handlers are run concurrently
# - with concurrency limited by HASURA_GRAPHQL_EVENTS_HTTP_POOL_SIZE
locked_counts = {
"type":"run_sql",
"args":{
"sql":'''
select
(select count(*) from hdb_catalog.event_log where locked IS NOT NULL) as num_locked,
count(*) as total
from hdb_catalog.event_log
where table_name = 'test_flood'
'''
}
}
st, resp = hge_ctx.v1q(locked_counts)
assert st == 200, resp
# Make sure we have 2*HASURA_GRAPHQL_EVENTS_FETCH_BATCH_SIZE events checked out:
# - 100 prefetched
# - 100 being processed right now (but blocked on HTTP_POOL capacity)
assert resp['result'][1] == ['200', '1000']
# Rather than sleep arbitrarily, loop until assertions pass:
utils.until_asserts_pass(30, check_backpressure)
# ...then make sure we're truly stable:
time.sleep(3)
check_backpressure()
# unblock open and future requests to /block; check all events processed
evts_webhook.unblock()
def get_evt():
# TODO ThreadedHTTPServer helps locally (I only need a timeout of
# 10 here), but we still need a bit of a long timeout here for CI

server/tests-py/utils.py (new file)

@@ -0,0 +1,19 @@
# Various testing utility functions
import time
# Loop a function 'tries' times until all of its assertions pass, pausing 0.3
# seconds after each failure. Re-raises AssertionError if we run out of tries.
def until_asserts_pass(tries, func):
for x in range(0, tries):
print(x)
if x == tries-1:
# last time; raise any assertions in caller:
func()
else:
try:
func()
break
except AssertionError:
time.sleep(0.3)
pass