server: accept new env var HASURA_GRAPHQL_EVENTS_FETCH_BATCH_SIZE

GitOrigin-RevId: 62c463d3ee754ce9f05ba09afa8cd74ca807a96c
This commit is contained in:
Karthikeyan Chinnakonda 2021-04-27 22:52:54 +05:30 committed by hasura-bot
parent f50f61ab6a
commit bc1e131717
6 changed files with 60 additions and 17 deletions

View File

@ -7,6 +7,7 @@
(Add entries below in the order of: server, console, cli, docs, others)
- server: accept a new server config flag `--events-fetch-batch-size` to configure the number of rows being fetched from the events log table in a single batch
- server: fix regression: `on_conflict` was missing in the schema for inserts in tables where the current user has no columns listed in their update permissions (fix #6804)
- server: fix one-to-one relationship bug (introduced in #459) which prevented adding one-to-one relationships which didn't have the same column name for target and source
- console: fix Postgres table creation when table has a non-lowercase name and a comment (#6760)

View File

@ -160,6 +160,10 @@ For the ``serve`` sub-command these are the available flags and ENV variables:
- Interval in milliseconds to sleep before trying to fetch events again after a fetch
returned no events from postgres
* - ``--events-fetch-batch-size``
- ``HASURA_GRAPHQL_EVENTS_FETCH_BATCH_SIZE``
- Maximum number of events to be fetched from the DB in a single batch (default: 100)
* - ``--async-actions-fetch-interval``
- ``HASURA_GRAPHQL_ASYNC_ACTIONS_FETCH_INTERVAL``
- Interval in milliseconds to sleep before trying to fetch async actions again after a fetch

View File

@ -525,11 +525,13 @@ runHGEServer setupHook env ServeOptions{..} ServeCtx{..} initTime postPollHook s
shutdownEvents allPgSources
(\a b -> hoist lowerIO (unlockScheduledEvents a b)) logger lockedEventsCtx)
-- prepare event triggers data
eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI
unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers"
_eventQueueThread <- C.forkManagedT "processEventQueue" logger $
unless (getNonNegativeInt soEventsFetchBatchSize == 0 || soEventsFetchInterval == Just 0) $ do
-- Don't start the events poller thread when fetchBatchSize or fetchInterval is 0
-- prepare event triggers data
eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI soEventsFetchBatchSize
unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers"
void $ C.forkManagedT "processEventQueue" logger $
processEventQueue logger
logEnvHeaders
_scHttpManager

View File

@ -35,6 +35,7 @@ module Hasura.Eventing.EventTrigger
, processEventQueue
, defaultMaxEventThreads
, defaultFetchInterval
, defaultFetchBatchSize
, Event(..)
, unlockEvents
, EventEngineCtx(..)
@ -155,6 +156,7 @@ data EventEngineCtx
= EventEngineCtx
{ _eeCtxEventThreadsCapacity :: TVar Int
, _eeCtxFetchInterval :: DiffTime
, _eeCtxFetchSize :: NonNegativeInt
}
data DeliveryInfo
@ -193,8 +195,11 @@ defaultMaxEventThreads = 100
defaultFetchInterval :: DiffTime
defaultFetchInterval = seconds 1
initEventEngineCtx :: Int -> DiffTime -> STM EventEngineCtx
initEventEngineCtx maxT _eeCtxFetchInterval = do
defaultFetchBatchSize :: NonNegativeInt
defaultFetchBatchSize = unsafeNonNegativeInt 100
initEventEngineCtx :: Int -> DiffTime -> NonNegativeInt -> STM EventEngineCtx
initEventEngineCtx maxT _eeCtxFetchInterval _eeCtxFetchSize = do
_eeCtxEventThreadsCapacity <- newTVar maxT
return $ EventEngineCtx{..}
@ -233,7 +238,7 @@ processEventQueue logger logenv httpMgr getSchemaCache eeCtx@EventEngineCtx{..}
_ <- liftIO $ EKG.Distribution.add (smNumEventsFetched serverMetrics) (fromIntegral $ length events0)
go events0 0 False
where
fetchBatchSize = 100
fetchBatchSize = getNonNegativeInt _eeCtxFetchSize
popEventsBatch :: m [EventWithSource ('Postgres 'Vanilla)]
popEventsBatch = do
@ -402,10 +407,10 @@ withEventEngineCtx ::
withEventEngineCtx eeCtx = bracket_ (decrementThreadCount eeCtx) (incrementThreadCount eeCtx)
incrementThreadCount :: MonadIO m => EventEngineCtx -> m ()
incrementThreadCount (EventEngineCtx c _) = liftIO $ atomically $ modifyTVar' c (+1)
incrementThreadCount (EventEngineCtx c _ _) = liftIO $ atomically $ modifyTVar' c (+1)
decrementThreadCount :: MonadIO m => EventEngineCtx -> m ()
decrementThreadCount (EventEngineCtx c _) = liftIO $ atomically $ do
decrementThreadCount (EventEngineCtx c _ _) = liftIO $ atomically $ do
countThreads <- readTVar c
if countThreads > 0
then modifyTVar' c (\v -> v - 1)

View File

@ -28,6 +28,7 @@ import qualified Hasura.GraphQL.Execute.Plan as E
import qualified Hasura.Logging as L
import Hasura.Backends.Postgres.Connection
import Hasura.Eventing.EventTrigger (defaultFetchBatchSize)
import Hasura.Prelude
import Hasura.RQL.Types
import Hasura.Server.Auth
@ -210,6 +211,10 @@ mkServeOptions rso = do
bool MaintenanceModeDisabled MaintenanceModeEnabled
<$> withEnvBool (rsoEnableMaintenanceMode rso) (fst maintenanceModeEnv)
eventsFetchBatchSize <-
fromMaybe defaultFetchBatchSize
<$> withEnv (rsoEventsFetchBatchSize rso) (fst eventsFetchBatchSizeEnv)
pure $ ServeOptions
port
host
@ -243,6 +248,7 @@ mkServeOptions rso = do
maintenanceMode
schemaPollInterval
experimentalFeatures
eventsFetchBatchSize
where
#ifdef DeveloperAPIs
defaultAPIs = [METADATA,GRAPHQL,PGDUMP,CONFIG,DEVELOPER]
@ -434,6 +440,12 @@ eventsFetchIntervalEnv =
, "Interval in milliseconds to sleep before trying to fetch events again after a fetch returned no events from postgres."
)
eventsFetchBatchSizeEnv :: (String, String)
eventsFetchBatchSizeEnv =
( "HASURA_GRAPHQL_EVENTS_FETCH_BATCH_SIZE"
, "The maximum number of events to be fetched from the events table in a single batch. Default 100"
)
asyncActionsFetchIntervalEnv :: (String, String)
asyncActionsFetchIntervalEnv =
( "HASURA_GRAPHQL_ASYNC_ACTIONS_FETCH_INTERVAL"
@ -998,6 +1010,14 @@ parseGraphqlEventsFetchInterval = optional $
help (snd eventsFetchIntervalEnv)
)
parseEventsFetchBatchSize :: Parser (Maybe NonNegativeInt)
parseEventsFetchBatchSize = optional $
option (eitherReader readNonNegativeInt)
( long "events-fetch-batch-size" <>
metavar (fst eventsFetchBatchSizeEnv) <>
help (snd eventsFetchBatchSizeEnv)
)
parseGraphqlAsyncActionsFetchInterval :: Parser (Maybe Milliseconds)
parseGraphqlAsyncActionsFetchInterval = optional $
option (eitherReader readEither)
@ -1155,6 +1175,7 @@ serveOptsToLog so =
, "infer_function_permissions" J..= soInferFunctionPermissions so
, "enable_maintenance_mode" J..= soEnableMaintenanceMode so
, "experimental_features" J..= soExperimentalFeatures so
, "events_fetch_batch_size" J..= soEventsFetchBatchSize so
]
mkGenericStrLog :: L.LogLevel -> Text -> String -> StartupLog
@ -1209,6 +1230,7 @@ serveOptionsParser =
<*> parseEnableMaintenanceMode
<*> parseSchemaPollInterval
<*> parseExperimentalFeatures
<*> parseEventsFetchBatchSize
-- | This implements the mapping between application versions
-- and catalog schema versions.

View File

@ -104,6 +104,7 @@ data RawServeOptions impl
, rsoEnableMaintenanceMode :: !Bool
, rsoSchemaPollInterval :: !(Maybe Milliseconds)
, rsoExperimentalFeatures :: !(Maybe [ExperimentalFeature])
, rsoEventsFetchBatchSize :: !(Maybe NonNegativeInt)
}
-- | @'ResponseInternalErrorsConfig' represents the encoding of the internal
@ -160,6 +161,7 @@ data ServeOptions impl
, soEnableMaintenanceMode :: !MaintenanceMode
, soSchemaPollInterval :: !OptionalInterval
, soExperimentalFeatures :: !(Set.HashSet ExperimentalFeature)
, soEventsFetchBatchSize :: !NonNegativeInt
}
data DowngradeOptions
@ -270,6 +272,10 @@ readIsoLevel isoS =
"serializable" -> return Q.Serializable
_ -> Left "Only expecting read-committed / repeatable-read / serializable"
readNonNegativeInt :: String -> Either String NonNegativeInt
readNonNegativeInt s =
onNothing (mkNonNegativeInt =<< readMaybe s) $ Left "Only expecting a non negative integer"
readAPIs :: String -> Either String [API]
readAPIs = mapM readAPI . T.splitOn "," . T.pack
where readAPI si = case T.toUpper $ T.strip si of
@ -371,6 +377,9 @@ instance FromEnv Cache.CacheSize where
instance FromEnv URLTemplate where
fromEnv = parseURLTemplate . T.pack
instance FromEnv NonNegativeInt where
fromEnv = readNonNegativeInt
type WithEnv a = ReaderT Env (ExceptT String Identity) a
runWithEnv :: Env -> WithEnv a -> Either String a