graphql-engine/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs

{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE QuasiQuotes #-}
-- |
-- = Scheduled Triggers
--
-- This module implements the functionality of invoking webhooks at specified
-- times, aka scheduled events. A scheduled event is either generated by the
-- graphql-engine from a cron trigger, or created directly by the user for a
-- specific time along with the payload, webhook, headers and retry
-- configuration. Scheduled events are modeled using rows in Postgres
-- with a @timestamp@ column.
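--
-- As an illustration of that model (not an authoritative schema; these are
-- just the columns that the queries in this module reference), cron events
-- live in @hdb_catalog.hdb_cron_events@ with columns such as @id@,
-- @trigger_name@, @scheduled_time@, @status@, @tries@ and @next_retry_at@,
-- while one-off events live in @hdb_catalog.hdb_scheduled_events@ with a
-- similar set plus @webhook_conf@, @payload@, @retry_conf@, @header_conf@
-- and @comment@.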
--
-- This module implements scheduling and delivery of scheduled
-- events:
--
-- 1. Scheduling a cron event involves creating new cron events. New cron
-- events are created based on the cron schedule and the number of scheduled
-- events that are already present in the scheduled events buffer.
-- The graphql-engine computes the new scheduled events and writes them to
-- the database. (Generator)
--
-- 2. Delivering a scheduled event involves reading undelivered scheduled events
-- from the database and delivering them to the webhook server. (Processor)
--
-- The rationale behind separating the event scheduling and event delivery
-- mechanisms into two different threads is that scheduling and delivering
-- scheduled events are not directly dependent on each other. The generator
-- will almost always create scheduled events which are supposed to be
-- delivered in the future (timestamp > current_timestamp), while the processor
-- will fetch scheduled events of the past (timestamp < current_timestamp). So
-- the set of scheduled events written by the generator and the set fetched by
-- the processor will never be the same; since they are not correlated with
-- each other, they can be split into different threads for better performance.
--
-- == Implementation
--
-- The scheduled triggers eventing is implemented on top of the metadata
-- storage. All functions that interact with the storage system are abstracted
-- behind the 'MonadMetadataStorage' class.
--
-- During the startup, two threads are started:
--
-- 1. Generator: Fetches the list of cron triggers from the schema cache and
-- generates the upcoming scheduled events for them (a sketch follows this
-- list).
--
-- - Additional events will be generated for a trigger only if it has fewer
-- than 100 upcoming scheduled events.
--
-- - The timestamps of the upcoming events are computed using:
--
-- - the cron schedule of the trigger
--
-- - the max timestamp of the scheduled events that already exist, or
-- current_timestamp (when no scheduled events exist)
--
-- - The timestamps of the scheduled events are stored with timezone because
-- `SELECT NOW()` returns a timestamp with timezone, so it's good to
-- compare two things of the same type.
--
-- This effectively corresponds to doing an INSERT with values containing
-- specific timestamps.
--
-- 2. Processor: Fetches the undelivered cron events and one-off scheduled
-- events which have a timestamp earlier than the current timestamp from the
-- database, and processes them.
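--
-- As a rough, self-contained sketch of the generation step (illustrative
-- only; the names below are not this module's actual definitions, and it
-- assumes the @cron@ package's 'System.Cron.nextMatch'):
--
-- > import Data.List (unfoldr)
-- > import Data.Time.Clock (UTCTime)
-- > import System.Cron (CronSchedule, nextMatch)
-- >
-- > -- Next n firing times strictly after 'from' (the max existing
-- > -- scheduled time, or the current time when no events exist yet).
-- > upcomingTimesSketch :: UTCTime -> Int -> CronSchedule -> [UTCTime]
-- > upcomingTimesSketch from n schedule =
-- >   take n $ unfoldr (\t -> (\t' -> (t', t')) <$> nextMatch schedule t) from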
--
-- TODO
-- - Consider and document ordering guarantees
-- - do we have any in the presence of multiple hasura instances?
-- - If we have nothing useful to say about ordering, then consider processing
-- events asynchronously, so that a slow webhook doesn't cause everything
-- subsequent to be delayed
module Hasura.Eventing.ScheduledTrigger
( runCronEventsGenerator,
processScheduledTriggers,
generateScheduleTimes,
CronEventSeed (..),
LockedEventsCtx (..),
-- * Cron trigger stats logger
createFetchedCronTriggerStatsLogger,
closeFetchedCronTriggersStatsLogger,
-- * Scheduled events stats logger
createFetchedScheduledEventsStatsLogger,
closeFetchedScheduledEventsStatsLogger,
-- * Database interactions
-- Following function names are similar to those present in
-- 'MonadMetadataStorage' type class. To avoid duplication,
-- 'Tx' is suffixed to identify as database transactions
getDeprivedCronTriggerStatsTx,
getScheduledEventsForDeliveryTx,
insertInvocationTx,
setScheduledEventOpTx,
unlockScheduledEventsTx,
unlockAllLockedScheduledEventsTx,
insertCronEventsTx,
insertOneOffScheduledEventTx,
dropFutureCronEventsTx,
getOneOffScheduledEventsTx,
getCronEventsTx,
deleteScheduledEventTx,
getScheduledEventInvocationsTx,
getScheduledEventsInvocationsQuery,
getScheduledEventsInvocationsQueryNoPagination,
-- * Export utility functions which are useful to build
-- SQLs for fetching data from metadata storage
mkScheduledEventStatusFilter,
scheduledTimeOrderBy,
executeWithOptionalTotalCount,
mkPaginationSelectExp,
withCount,
invocationFieldExtractors,
mkEventIdBoolExp,
EventTables (..),
)
where
import Control.Concurrent.Async.Lifted (forConcurrently_)
import Control.Concurrent.Extended (Forever (..), sleep)
import Control.Concurrent.STM
import Control.Lens (preview)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson qualified as J
import Data.Environment qualified as Env
import Data.Has
import Data.HashMap.Strict qualified as Map
import Data.Int (Int64)
import Data.List.NonEmpty qualified as NE
import Data.SerializableBlob qualified as SB
import Data.Set qualified as Set
import Data.Text qualified as T
import Data.Text.Extended (ToTxt (..), (<<>))
import Data.These
import Data.Time.Clock
import Data.URL.Template (printURLTemplate)
import Database.PG.Query qualified as PG
import Hasura.Backends.Postgres.Execute.Types
import Hasura.Backends.Postgres.SQL.DML qualified as S
import Hasura.Backends.Postgres.SQL.Types
import Hasura.Base.Error
import Hasura.Eventing.Common
import Hasura.Eventing.HTTP
import Hasura.Eventing.ScheduledTrigger.Types
import Hasura.HTTP (getHTTPExceptionStatus)
import Hasura.Logging qualified as L
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.EventTrigger (ResolveHeaderError, getHeaderInfosFromConfEither)
import Hasura.RQL.DDL.Headers
import Hasura.RQL.DDL.Webhook.Transform
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.EventTrigger
import Hasura.RQL.Types.Eventing
import Hasura.RQL.Types.ScheduledTrigger
import Hasura.RQL.Types.SchemaCache
import Hasura.SQL.Types
import Hasura.Server.Prometheus (ScheduledTriggerMetrics (..))
import Hasura.Tracing qualified as Tracing
import Network.HTTP.Client.Transformable qualified as HTTP
import Refined (unrefine)
import System.Metrics.Prometheus.Counter as Prometheus.Counter
import System.Timeout.Lifted (timeout)
import Text.Builder qualified as TB
-- | runCronEventsGenerator makes sure that all the cron triggers
-- have an adequate buffer of cron events.
runCronEventsGenerator ::
( MonadIO m,
MonadMetadataStorage m
) =>
L.Logger L.Hasura ->
FetchedCronTriggerStatsLogger ->
IO SchemaCache ->
m void
runCronEventsGenerator logger cronTriggerStatsLogger getSC = do
forever $ do
sc <- liftIO getSC
-- get cron triggers from cache
let cronTriggersCache = scCronTriggers sc
unless (Map.null cronTriggersCache) $ do
-- Poll the DB only when there's at least one cron trigger present
-- in the schema cache
-- get cron trigger stats from db
-- When shutdown is initiated, we stop generating new cron events
eitherRes <- runExceptT $ do
deprivedCronTriggerStats <- liftEitherM $ getDeprivedCronTriggerStats $ Map.keys cronTriggersCache
-- Log fetched deprived cron trigger stats
logFetchedCronTriggersStats cronTriggerStatsLogger deprivedCronTriggerStats
-- join stats with cron triggers and produce @[(CronTriggerInfo, CronTriggerStats)]@
cronTriggersForHydrationWithStats <-
catMaybes
<$> mapM (withCronTrigger cronTriggersCache) deprivedCronTriggerStats
insertCronEventsFor cronTriggersForHydrationWithStats
onLeft eitherRes $ L.unLogger logger . ScheduledTriggerInternalErr
-- See discussion: https://github.com/hasura/graphql-engine-mono/issues/1001
liftIO $ sleep (minutes 1)
where
withCronTrigger cronTriggerCache cronTriggerStat = do
case Map.lookup (_ctsName cronTriggerStat) cronTriggerCache of
Nothing -> do
L.unLogger logger $
ScheduledTriggerInternalErr $
err500 Unexpected "could not find scheduled trigger in the schema cache"
pure Nothing
Just cronTrigger ->
pure $
Just (cronTrigger, cronTriggerStat)
insertCronEventsFor ::
(MonadMetadataStorage m, MonadError QErr m) =>
[(CronTriggerInfo, CronTriggerStats)] ->
m ()
insertCronEventsFor cronTriggersWithStats = do
let scheduledEvents = flip concatMap cronTriggersWithStats $ \(cti, stats) ->
generateCronEventsFrom (_ctsMaxScheduledTime stats) cti
case scheduledEvents of
[] -> pure ()
events -> liftEitherM $ insertCronEvents events
generateCronEventsFrom :: UTCTime -> CronTriggerInfo -> [CronEventSeed]
generateCronEventsFrom startTime CronTriggerInfo {..} =
map (CronEventSeed ctiName) $
-- generate next 100 events; see getDeprivedCronTriggerStatsTx:
generateScheduleTimes startTime 100 ctiSchedule
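-- For example (illustrative): if a trigger's schedule is "0 * * * *" (top of
-- every hour) and its newest existing event is scheduled for 10:30 UTC, the
-- seeds generated here cover 11:00, 12:00, 13:00, ... for the next 100 firings.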
-- | `upperBoundScheduledEventTimeout` is the maximum amount of time
-- a scheduled event can take to process. This value is intended to be
-- used with a timeout.
upperBoundScheduledEventTimeout :: DiffTime
upperBoundScheduledEventTimeout = minutes 30
processCronEvents ::
( MonadIO m,
MonadMetadataStorage m,
Tracing.MonadTrace m,
MonadBaseControl IO m
) =>
L.Logger L.Hasura ->
HTTP.Manager ->
ScheduledTriggerMetrics ->
[CronEvent] ->
HashMap TriggerName CronTriggerInfo ->
TVar (Set.Set CronEventId) ->
m ()
processCronEvents logger httpMgr scheduledTriggerMetrics cronEvents cronTriggersInfo lockedCronEvents = do
-- save the locked cron events that have been fetched from the
-- database; the events stored here will be unlocked in case a
-- graceful shutdown is initiated in the midst of processing these
-- events (a sketch of this bookkeeping follows this function)
saveLockedEvents (map _ceId cronEvents) lockedCronEvents
-- The `createdAt` of a cron event is the `created_at` of the cron trigger
forConcurrently_ cronEvents $ \(CronEvent id' name st _ tries _ _) -> do
case Map.lookup name cronTriggersInfo of
Nothing ->
logInternalError $
err500 Unexpected $
"could not find cron trigger " <> name <<> " in the schema cache"
Just CronTriggerInfo {..} -> do
let payload =
ScheduledEventWebhookPayload
id'
(Just name)
st
(fromMaybe J.Null ctiPayload)
ctiComment
Nothing
ctiRequestTransform
ctiResponseTransform
retryCtx = RetryContext tries ctiRetryConf
eventProcessingTimeout = min upperBoundScheduledEventTimeout (unrefine $ strcTimeoutSeconds $ ctiRetryConf)
processScheduledEventAction =
runExceptT $
flip runReaderT (logger, httpMgr) $
processScheduledEvent
scheduledTriggerMetrics
id'
ctiHeaders
retryCtx
payload
ctiWebhookInfo
Cron
eventProcessedMaybe <-
timeout (fromInteger (diffTimeToMicroSeconds eventProcessingTimeout)) $ processScheduledEventAction
case eventProcessedMaybe of
Nothing -> do
let eventTimeoutMessage = "Cron Scheduled event " <> id' <<> " of cron trigger " <> name <<> " timed out while processing."
eventTimeoutError = err500 TimeoutErrorCode eventTimeoutMessage
logInternalError eventTimeoutError
runExceptT (processError id' retryCtx [] Cron (mkErrorObject eventTimeoutMessage) (HOther $ T.unpack eventTimeoutMessage) scheduledTriggerMetrics)
>>= (`onLeft` logInternalError)
Just finally -> onLeft finally logInternalError
removeEventFromLockedEvents id' lockedCronEvents
where
logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
mkErrorObject :: Text -> J.Value
mkErrorObject errorMessage =
J.object $ ["error" J..= errorMessage]
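-- A minimal sketch of the locked-events bookkeeping used above and below
-- (illustrative only; the real 'saveLockedEvents' and
-- 'removeEventFromLockedEvents' are imported helpers):
--
-- > import Control.Concurrent.STM (TVar, atomically, modifyTVar')
-- > import qualified Data.Set as Set
-- >
-- > saveLocked :: Ord id => [id] -> TVar (Set.Set id) -> IO ()
-- > saveLocked ids tv = atomically $ modifyTVar' tv (Set.union (Set.fromList ids))
-- >
-- > removeLocked :: Ord id => id -> TVar (Set.Set id) -> IO ()
-- > removeLocked i tv = atomically $ modifyTVar' tv (Set.delete i)
-- >
-- > -- On graceful shutdown, whatever ids remain in the TVar correspond to
-- > -- events still marked 'locked' in the database and can be unlocked in bulk.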
processOneOffScheduledEvents ::
( MonadIO m,
Tracing.MonadTrace m,
MonadMetadataStorage m,
MonadBaseControl IO m
) =>
Env.Environment ->
L.Logger L.Hasura ->
HTTP.Manager ->
ScheduledTriggerMetrics ->
[OneOffScheduledEvent] ->
TVar (Set.Set OneOffScheduledEventId) ->
m ()
processOneOffScheduledEvents
env
logger
httpMgr
scheduledTriggerMetrics
oneOffEvents
lockedOneOffScheduledEvents = do
-- save the locked one-off events that have been fetched from the
-- database; the events stored here will be unlocked in case a
-- graceful shutdown is initiated in the midst of processing these events
saveLockedEvents (map _ooseId oneOffEvents) lockedOneOffScheduledEvents
forConcurrently_ oneOffEvents $ \OneOffScheduledEvent {..} -> do
(either logInternalError pure) =<< runExceptT do
let payload =
ScheduledEventWebhookPayload
_ooseId
Nothing
_ooseScheduledTime
(fromMaybe J.Null _oosePayload)
_ooseComment
(Just _ooseCreatedAt)
_ooseRequestTransform
_ooseResponseTransform
retryCtx = RetryContext _ooseTries _ooseRetryConf
resolvedWebhookInfoEither = resolveWebhookEither env _ooseWebhookConf
resolvedHeaderInfoEither = getHeaderInfosFromConfEither env _ooseHeaderConf
-- `webhookAndHeaderInfo` returns webhook and header info (and errors)
webhookAndHeaderInfo = case (resolvedWebhookInfoEither, resolvedHeaderInfoEither) of
(Right resolvedEventWebhookInfo, Right resolvedEventHeaderInfo) -> do
let resolvedWebhookEnvRecord = EnvRecord (getTemplateFromUrl _ooseWebhookConf) resolvedEventWebhookInfo
Right (resolvedWebhookEnvRecord, resolvedEventHeaderInfo)
(Left eventWebhookErrorVars, Right _) -> Left $ This eventWebhookErrorVars
(Right _, Left eventHeaderErrorVars) -> Left $ That eventHeaderErrorVars
(Left eventWebhookErrors, Left eventHeaderErrorVars) -> Left $ These eventWebhookErrors eventHeaderErrorVars
case webhookAndHeaderInfo of
Right (webhookEnvRecord, eventHeaderInfo) -> do
let processScheduledEventAction =
flip runReaderT (logger, httpMgr) $
processScheduledEvent scheduledTriggerMetrics _ooseId eventHeaderInfo retryCtx payload webhookEnvRecord OneOff
eventTimeout = unrefine $ strcTimeoutSeconds $ _ooseRetryConf
-- Try to process the event with a timeout of min(`upperBoundScheduledEventTimeout`, event's response timeout),
-- so that we're never blocked forever while processing a single event.
--
-- If the request times out, then process it as an erroneous invocation and move on.
timeout (fromInteger (diffTimeToMicroSeconds (min upperBoundScheduledEventTimeout eventTimeout))) processScheduledEventAction
`onNothingM` ( do
let eventTimeoutMessage = "One-off Scheduled event " <> _ooseId <<> " timed out while processing."
eventTimeoutError = err500 TimeoutErrorCode eventTimeoutMessage
lift $ logInternalError eventTimeoutError
processError _ooseId retryCtx [] OneOff (mkErrorObject eventTimeoutMessage) (HOther $ T.unpack eventTimeoutMessage) scheduledTriggerMetrics
)
removeEventFromLockedEvents _ooseId lockedOneOffScheduledEvents
Left envVarError ->
processError
_ooseId
retryCtx
[]
OneOff
(mkErrorObject $ "Error creating the request. " <> (mkInvalidEnvVarErrMsg $ envVarError))
(HOther $ T.unpack $ qeError (err400 NotFound (mkInvalidEnvVarErrMsg envVarError)))
scheduledTriggerMetrics
where
logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
getTemplateFromUrl url = printURLTemplate $ unInputWebhook url
mkInvalidEnvVarErrMsg envVarErrorValues = "The value for environment variables not found: " <> (getInvalidEnvVarText envVarErrorValues)
mkErrorObject :: Text -> J.Value
mkErrorObject errorMessage =
J.object $ ["error" J..= errorMessage]
getInvalidEnvVarText :: These ResolveWebhookError ResolveHeaderError -> Text
getInvalidEnvVarText (This a) = toTxt a
getInvalidEnvVarText (That b) = toTxt b
getInvalidEnvVarText (These a b) = toTxt a <> ", " <> toTxt b
processScheduledTriggers ::
( MonadIO m,
Tracing.MonadTrace m,
MonadMetadataStorage m,
MonadBaseControl IO m
) =>
IO Env.Environment ->
L.Logger L.Hasura ->
FetchedScheduledEventsStatsLogger ->
HTTP.Manager ->
ScheduledTriggerMetrics ->
IO SchemaCache ->
LockedEventsCtx ->
m (Forever m)
processScheduledTriggers getEnvHook logger statsLogger httpMgr scheduledTriggerMetrics getSC LockedEventsCtx {..} = do
return $
Forever () $
const do
cronTriggersInfo <- scCronTriggers <$> liftIO getSC
env <- liftIO getEnvHook
getScheduledEventsForDelivery (Map.keys cronTriggersInfo) >>= \case
Left e -> logInternalError e
Right (cronEvents, oneOffEvents) -> do
logFetchedScheduledEventsStats statsLogger (CronEventsCount $ length cronEvents) (OneOffScheduledEventsCount $ length oneOffEvents)
processCronEvents logger httpMgr scheduledTriggerMetrics cronEvents cronTriggersInfo leCronEvents
processOneOffScheduledEvents env logger httpMgr scheduledTriggerMetrics oneOffEvents leOneOffEvents
-- NOTE: cron events are scheduled at times with minute resolution (as on
-- unix), while one-off events can be set for arbitrary times. The sleep
-- time here determines how overdue a scheduled event (cron or one-off)
-- might be before we begin processing:
liftIO $ sleep (seconds 10)
where
logInternalError err = liftIO . L.unLogger logger $ ScheduledTriggerInternalErr err
processScheduledEvent ::
( MonadReader r m,
Has HTTP.Manager r,
Has (L.Logger L.Hasura) r,
MonadIO m,
Tracing.MonadTrace m,
MonadMetadataStorage m,
MonadError QErr m
) =>
ScheduledTriggerMetrics ->
ScheduledEventId ->
[EventHeaderInfo] ->
RetryContext ->
ScheduledEventWebhookPayload ->
EnvRecord ResolvedWebhook ->
ScheduledEventType ->
m ()
processScheduledEvent scheduledTriggerMetrics eventId eventHeaders retryCtx payload webhookUrl type' =
Tracing.newTrace Tracing.sampleAlways traceNote do
currentTime <- liftIO getCurrentTime
let retryConf = _rctxConf retryCtx
scheduledTime = sewpScheduledTime payload
if convertDuration (diffUTCTime currentTime scheduledTime)
> unrefine (strcToleranceSeconds retryConf)
then processDead eventId type'
else do
let timeoutSeconds = round $ unrefine (strcTimeoutSeconds retryConf)
httpTimeout = HTTP.responseTimeoutMicro (timeoutSeconds * 1000000)
(headers, decodedHeaders) = prepareHeaders eventHeaders
extraLogCtx = ExtraLogContext eventId (sewpName payload)
webhookReqBodyJson = J.toJSON payload
webhookReqBody = J.encode webhookReqBodyJson
requestTransform = sewpRequestTransform payload
responseTransform = mkResponseTransform <$> sewpResponseTransform payload
eitherReqRes <-
runExceptT $
mkRequest headers httpTimeout webhookReqBody requestTransform (_envVarValue webhookUrl) >>= \reqDetails -> do
let request = extractRequest reqDetails
logger e d = do
logHTTPForST e extraLogCtx d (_envVarName webhookUrl) decodedHeaders
liftIO $ do
case e of
Left _err -> pure ()
Right response ->
Prometheus.Counter.add
(stmScheduledTriggerBytesReceived scheduledTriggerMetrics)
(hrsSize response)
let RequestDetails {_rdOriginalSize, _rdTransformedSize} = d
in Prometheus.Counter.add
(stmScheduledTriggerBytesSent scheduledTriggerMetrics)
(fromMaybe _rdOriginalSize _rdTransformedSize)
case (type', e) of
(Cron, Left _err) -> Prometheus.Counter.inc (stmCronEventsInvocationTotalFailure scheduledTriggerMetrics)
(Cron, Right _) -> Prometheus.Counter.inc (stmCronEventsInvocationTotalSuccess scheduledTriggerMetrics)
(OneOff, Left _err) -> Prometheus.Counter.inc (stmOneOffEventsInvocationTotalFailure scheduledTriggerMetrics)
(OneOff, Right _) -> Prometheus.Counter.inc (stmOneOffEventsInvocationTotalSuccess scheduledTriggerMetrics)
sessionVars = _rdSessionVars reqDetails
resp <- invokeRequest reqDetails responseTransform sessionVars logger
pure (request, resp)
case eitherReqRes of
Right (req, resp) ->
let reqBody = fromMaybe J.Null $ preview (HTTP.body . HTTP._RequestBodyLBS) req >>= J.decode @J.Value
in processSuccess eventId decodedHeaders type' reqBody resp scheduledTriggerMetrics
Left (HTTPError reqBody e) -> processError eventId retryCtx decodedHeaders type' reqBody e scheduledTriggerMetrics
Left (TransformationError _ e) -> do
-- Log The Transformation Error
logger :: L.Logger L.Hasura <- asks getter
L.unLogger logger $ L.UnstructuredLog L.LevelError (SB.fromLBS $ J.encode e)
-- Set event state to Error
liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESError) type'
where
traceNote = "Scheduled trigger" <> foldMap ((": " <>) . triggerNameToTxt) (sewpName payload)
processError ::
( MonadIO m,
MonadMetadataStorage m,
MonadError QErr m
) =>
ScheduledEventId ->
RetryContext ->
[HeaderConf] ->
ScheduledEventType ->
J.Value ->
HTTPErr a ->
ScheduledTriggerMetrics ->
m ()
processError eventId retryCtx decodedHeaders type' reqJson err scheduledTriggerMetric = do
let invocation = case err of
HClient httpException ->
let statusMaybe = getHTTPExceptionStatus httpException
in mkInvocation eventId statusMaybe decodedHeaders (SB.fromLBS $ httpExceptionErrorEncoding httpException) [] reqJson
HStatus errResp -> do
let respPayload = hrsBody errResp
respHeaders = hrsHeaders errResp
respStatus = hrsStatus errResp
mkInvocation eventId (Just respStatus) decodedHeaders respPayload respHeaders reqJson
HOther detail -> do
let errMsg = (SB.fromLBS $ J.encode detail)
mkInvocation eventId (Just 500) decodedHeaders errMsg [] reqJson
liftEitherM $ insertScheduledEventInvocation invocation type'
retryOrMarkError eventId retryCtx err type' scheduledTriggerMetric
retryOrMarkError ::
(MonadIO m, MonadMetadataStorage m, MonadError QErr m) =>
ScheduledEventId ->
RetryContext ->
HTTPErr a ->
ScheduledEventType ->
ScheduledTriggerMetrics ->
m ()
retryOrMarkError eventId retryCtx err type' scheduledTriggerMetric = do
let RetryContext tries retryConf = retryCtx
mRetryHeader = getRetryAfterHeaderFromHTTPErr err
mRetryHeaderSeconds = parseRetryHeaderValue =<< mRetryHeader
triesExhausted = tries >= strcNumRetries retryConf
noRetryHeader = isNothing mRetryHeaderSeconds
if triesExhausted && noRetryHeader
then do
liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESError) type'
case type' of
Cron -> liftIO $ Prometheus.Counter.inc (stmCronEventsProcessedTotalFailure scheduledTriggerMetric)
OneOff -> liftIO $ Prometheus.Counter.inc (stmOneOffEventsProcessedTotalFailure scheduledTriggerMetric)
else do
currentTime <- liftIO getCurrentTime
let delay =
fromMaybe
( round $ unrefine (strcRetryIntervalSeconds retryConf)
)
mRetryHeaderSeconds
diff = fromIntegral delay
retryTime = addUTCTime diff currentTime
liftEitherM $ setScheduledEventOp eventId (SEOpRetry retryTime) type'
{- Note [Scheduled event lifecycle]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Scheduled events move between six different states over the course of their
lifetime, as represented by the following flowchart:
scheduled --(1)--> locked --(2)--> delivered
locked    --(3)--> scheduled   (retry)
locked    --(4)--> error
locked    --(5)--> dead
When a scheduled event is first created, it starts in the 'scheduled' state,
and it can transition to other states in the following ways:
1. When graphql-engine fetches a scheduled event from the database to process
it, it sets its state to 'locked'. This prevents multiple graphql-engine
instances running on the same database from processing the same
scheduled event concurrently.
2. When a scheduled event is processed successfully, it is marked 'delivered'.
3. If a scheduled event fails to be processed, but it hasn't yet reached
its maximum retry limit, its retry counter is incremented and
it is returned to the 'scheduled' state.
4. If a scheduled event fails to be processed and *has* reached its
retry limit, its state is set to 'error'.
5. If for whatever reason the difference between the current time and the
scheduled time is greater than the tolerance of the scheduled event, it
will not be processed and its state will be set to 'dead'.
-}
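-- A minimal model of the lifecycle above (illustrative only; the engine
-- tracks this as a text @status@ column, not with a Haskell type):
--
-- > data EventState = Scheduled | Locked | Delivered | Errored | Dead
-- >
-- > -- Transitions (2)-(5) from the note, keyed by the processing outcome:
-- > afterProcessing :: Bool -> Bool -> Bool -> EventState
-- > afterProcessing pastTolerance succeeded retriesLeft
-- >   | pastTolerance = Dead      -- (5)
-- >   | succeeded     = Delivered -- (2)
-- >   | retriesLeft   = Scheduled -- (3): the retry counter is bumped
-- >   | otherwise     = Errored   -- (4)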
processSuccess ::
(MonadMetadataStorage m, MonadError QErr m, MonadIO m) =>
ScheduledEventId ->
[HeaderConf] ->
ScheduledEventType ->
J.Value ->
HTTPResp a ->
ScheduledTriggerMetrics ->
m ()
processSuccess eventId decodedHeaders type' reqBodyJson resp scheduledTriggerMetric = do
let respBody = hrsBody resp
respHeaders = hrsHeaders resp
respStatus = hrsStatus resp
invocation = mkInvocation eventId (Just respStatus) decodedHeaders respBody respHeaders reqBodyJson
liftEitherM $ insertScheduledEventInvocation invocation type'
liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESDelivered) type'
case type' of
Cron -> liftIO $ Prometheus.Counter.inc (stmCronEventsProcessedTotalSuccess scheduledTriggerMetric)
OneOff -> liftIO $ Prometheus.Counter.inc (stmOneOffEventsProcessedTotalSuccess scheduledTriggerMetric)
processDead ::
(MonadMetadataStorage m, MonadError QErr m) =>
ScheduledEventId ->
ScheduledEventType ->
m ()
processDead eventId type' =
liftEitherM $ setScheduledEventOp eventId (SEOpStatus SESDead) type'
mkInvocation ::
ScheduledEventId ->
Maybe Int ->
[HeaderConf] ->
SB.SerializableBlob ->
[HeaderConf] ->
J.Value ->
(Invocation 'ScheduledType)
mkInvocation eventId status reqHeaders respBody respHeaders reqBodyJson =
Invocation
eventId
status
(mkWebhookReq reqBodyJson reqHeaders invocationVersionST)
(mkInvocationResp status respBody respHeaders)
-- metadata database transactions
-- | Get cron trigger stats for cron jobs with fewer than 100 future reified
-- events in the database
--
-- The point here is to maintain a certain number of future events so the user
-- can kind of see what's coming up, and obviously to give 'processCronEvents'
-- something to do.
getDeprivedCronTriggerStatsTx :: [TriggerName] -> PG.TxE QErr [CronTriggerStats]
getDeprivedCronTriggerStatsTx cronTriggerNames =
map (\(n, count, maxTx) -> CronTriggerStats n count maxTx)
<$> PG.withQE
defaultTxErrorHandler
[PG.sql|
SELECT t.trigger_name, coalesce(q.upcoming_events_count, 0), coalesce(q.max_scheduled_time, now())
FROM (SELECT UNNEST ($1::text[]) as trigger_name) as t
LEFT JOIN
( SELECT
trigger_name,
count(1) as upcoming_events_count,
max(scheduled_time) as max_scheduled_time
FROM hdb_catalog.hdb_cron_events
WHERE tries = 0 and status = 'scheduled'
GROUP BY trigger_name
) AS q
ON t.trigger_name = q.trigger_name
WHERE coalesce(q.upcoming_events_count, 0) < 100
|]
(Identity $ PGTextArray $ map triggerNameToTxt cronTriggerNames)
True
-- TODO
-- - cron events have minute resolution, while one-off events have arbitrary
-- resolution, so it doesn't make sense to fetch them at the same rate
-- - if we decide to fetch cron events less frequently we should wake up that
-- thread at second 0 of every minute, and then pass hasura's now time into
-- the query (since the DB may disagree about the time)
getScheduledEventsForDeliveryTx :: [TriggerName] -> PG.TxE QErr ([CronEvent], [OneOffScheduledEvent])
getScheduledEventsForDeliveryTx cronTriggerNames =
(,) <$> getCronEventsForDelivery <*> getOneOffEventsForDelivery
where
getCronEventsForDelivery :: PG.TxE QErr [CronEvent]
getCronEventsForDelivery =
map (PG.getViaJSON . runIdentity)
<$> PG.withQE
defaultTxErrorHandler
[PG.sql|
WITH cte AS
( UPDATE hdb_catalog.hdb_cron_events
SET status = 'locked'
WHERE id IN ( SELECT t.id
FROM hdb_catalog.hdb_cron_events t
WHERE ( t.status = 'scheduled'
and (
(t.next_retry_at is NULL and t.scheduled_time <= now()) or
(t.next_retry_at is not NULL and t.next_retry_at <= now())
)
) AND trigger_name = ANY($1)
FOR UPDATE SKIP LOCKED
)
RETURNING *
)
SELECT row_to_json(t.*) FROM cte AS t
|]
(Identity $ PGTextArray $ triggerNameToTxt <$> cronTriggerNames)
True
getOneOffEventsForDelivery :: PG.TxE QErr [OneOffScheduledEvent]
getOneOffEventsForDelivery = do
map (PG.getViaJSON . runIdentity)
<$> PG.withQE
defaultTxErrorHandler
[PG.sql|
WITH cte AS (
UPDATE hdb_catalog.hdb_scheduled_events
SET status = 'locked'
WHERE id IN ( SELECT t.id
FROM hdb_catalog.hdb_scheduled_events t
WHERE ( t.status = 'scheduled'
and (
(t.next_retry_at is NULL and t.scheduled_time <= now()) or
(t.next_retry_at is not NULL and t.next_retry_at <= now())
)
)
FOR UPDATE SKIP LOCKED
)
RETURNING *
)
SELECT row_to_json(t.*) FROM cte AS t
|]
()
False
insertInvocationTx :: Invocation 'ScheduledType -> ScheduledEventType -> PG.TxE QErr ()
insertInvocationTx invo type' = do
case type' of
Cron -> do
PG.unitQE
defaultTxErrorHandler
[PG.sql|
INSERT INTO hdb_catalog.hdb_cron_event_invocation_logs
(event_id, status, request, response)
VALUES ($1, $2, $3, $4)
|]
( iEventId invo,
fromIntegral <$> iStatus invo :: Maybe Int64,
PG.ViaJSON $ J.toJSON $ iRequest invo,
PG.ViaJSON $ J.toJSON $ iResponse invo
)
True
PG.unitQE
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.hdb_cron_events
SET tries = tries + 1
WHERE id = $1
|]
(Identity $ iEventId invo)
True
OneOff -> do
PG.unitQE
defaultTxErrorHandler
[PG.sql|
INSERT INTO hdb_catalog.hdb_scheduled_event_invocation_logs
(event_id, status, request, response)
VALUES ($1, $2, $3, $4)
|]
( iEventId invo,
fromIntegral <$> iStatus invo :: Maybe Int64,
PG.ViaJSON $ J.toJSON $ iRequest invo,
PG.ViaJSON $ J.toJSON $ iResponse invo
)
True
PG.unitQE
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.hdb_scheduled_events
SET tries = tries + 1
WHERE id = $1
|]
(Identity $ iEventId invo)
True
setScheduledEventOpTx ::
ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> PG.TxE QErr ()
setScheduledEventOpTx eventId op type' = case op of
SEOpRetry time -> setRetry time
SEOpStatus status -> setStatus status
where
setRetry time =
case type' of
Cron ->
PG.unitQE
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.hdb_cron_events
SET next_retry_at = $1,
STATUS = 'scheduled'
WHERE id = $2
|]
(time, eventId)
True
OneOff ->
PG.unitQE
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.hdb_scheduled_events
SET next_retry_at = $1,
STATUS = 'scheduled'
WHERE id = $2
|]
(time, eventId)
True
setStatus status =
case type' of
Cron -> do
PG.unitQE
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.hdb_cron_events
SET status = $2
WHERE id = $1
|]
(eventId, status)
True
OneOff -> do
PG.unitQE
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.hdb_scheduled_events
SET status = $2
WHERE id = $1
|]
(eventId, status)
True
unlockScheduledEventsTx :: ScheduledEventType -> [ScheduledEventId] -> PG.TxE QErr Int
unlockScheduledEventsTx type' eventIds =
let eventIdsTextArray = map unEventId eventIds
in case type' of
Cron ->
(runIdentity . PG.getRow)
<$> PG.withQE
defaultTxErrorHandler
[PG.sql|
WITH "cte" AS
(UPDATE hdb_catalog.hdb_cron_events
SET status = 'scheduled'
WHERE id = ANY($1::text[]) and status = 'locked'
RETURNING *)
SELECT count(*) FROM "cte"
|]
(Identity $ PGTextArray eventIdsTextArray)
True
OneOff ->
(runIdentity . PG.getRow)
<$> PG.withQE
defaultTxErrorHandler
[PG.sql|
WITH "cte" AS
(UPDATE hdb_catalog.hdb_scheduled_events
SET status = 'scheduled'
WHERE id = ANY($1::text[]) AND status = 'locked'
RETURNING *)
SELECT count(*) FROM "cte"
|]
(Identity $ PGTextArray eventIdsTextArray)
True
unlockAllLockedScheduledEventsTx :: PG.TxE QErr ()
unlockAllLockedScheduledEventsTx = do
PG.unitQE
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.hdb_cron_events
SET status = 'scheduled'
WHERE status = 'locked'
|]
()
True
PG.unitQE
defaultTxErrorHandler
[PG.sql|
UPDATE hdb_catalog.hdb_scheduled_events
SET status = 'scheduled'
WHERE status = 'locked'
|]
()
True
insertCronEventsTx :: [CronEventSeed] -> PG.TxE QErr ()
insertCronEventsTx cronSeeds = do
let insertCronEventsSql =
TB.run $
toSQL
S.SQLInsert
{ siTable = cronEventsTable,
siCols = map unsafePGCol ["trigger_name", "scheduled_time"],
siValues = S.ValuesExp $ map (toTupleExp . toArr) cronSeeds,
siConflict = Just $ S.DoNothing Nothing,
siRet = Nothing
}
PG.unitQE defaultTxErrorHandler (PG.fromText insertCronEventsSql) () False
where
toArr (CronEventSeed n t) = [(triggerNameToTxt n), (formatTime' t)]
toTupleExp = S.TupleExp . map S.SELit
insertOneOffScheduledEventTx :: OneOffEvent -> PG.TxE QErr EventId
insertOneOffScheduledEventTx CreateScheduledEvent {..} =
runIdentity . PG.getRow
<$> PG.withQE
defaultTxErrorHandler
[PG.sql|
INSERT INTO hdb_catalog.hdb_scheduled_events
(webhook_conf,scheduled_time,payload,retry_conf,header_conf,comment)
VALUES
($1, $2, $3, $4, $5, $6) RETURNING id
|]
( PG.ViaJSON cseWebhook,
cseScheduleAt,
PG.ViaJSON csePayload,
PG.ViaJSON cseRetryConf,
PG.ViaJSON cseHeaders,
cseComment
)
False
dropFutureCronEventsTx :: ClearCronEvents -> PG.TxE QErr ()
dropFutureCronEventsTx = \case
SingleCronTrigger triggerName ->
PG.unitQE
defaultTxErrorHandler
[PG.sql|
DELETE FROM hdb_catalog.hdb_cron_events
WHERE trigger_name = $1 AND scheduled_time > now() AND tries = 0
|]
(Identity triggerName)
True
MetadataCronTriggers triggerNames ->
PG.unitQE
defaultTxErrorHandler
[PG.sql|
DELETE FROM hdb_catalog.hdb_cron_events
WHERE scheduled_time > now() AND tries = 0 AND trigger_name = ANY($1::text[])
|]
(Identity $ PGTextArray $ map triggerNameToTxt triggerNames)
False
cronEventsTable :: QualifiedTable
cronEventsTable =
QualifiedObject "hdb_catalog" $ TableName "hdb_cron_events"
mkScheduledEventStatusFilter :: [ScheduledEventStatus] -> S.BoolExp
mkScheduledEventStatusFilter = \case
[] -> S.BELit True
v ->
S.BEIN (S.SEIdentifier $ Identifier "status") $
map (S.SELit . scheduledEventStatusToText) v
scheduledTimeOrderBy :: S.OrderByExp
scheduledTimeOrderBy =
let scheduledTimeCol = S.SEIdentifier $ Identifier "scheduled_time"
in S.OrderByExp $
flip (NE.:|) [] $
S.OrderByItem
scheduledTimeCol
(Just S.OTAsc)
Nothing
-- | Build a select expression which outputs total count and
-- list of json rows with pagination limit and offset applied
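--
-- For illustration, with a limit of 10, an offset of 0, and 'IncludeRowsCount',
-- the generated SQL has roughly this shape (the exact text is produced by
-- 'toSQL'):
--
-- > WITH count_cte AS ( <all rows select> ),
-- >      limit_cte AS ( SELECT * FROM count_cte LIMIT 10 OFFSET 0 )
-- > SELECT
-- >   (SELECT count(*) FROM count_cte),
-- >   coalesce((SELECT json_agg(row_to_json(limit_cte.*)) FROM limit_cte), '[]')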
mkPaginationSelectExp ::
S.Select ->
ScheduledEventPagination ->
RowsCountOption ->
S.Select
mkPaginationSelectExp allRowsSelect ScheduledEventPagination {..} shouldIncludeRowsCount =
S.mkSelect
{ S.selCTEs = [(countCteAlias, S.ICTESelect allRowsSelect), (limitCteAlias, limitCteSelect)],
S.selExtr =
case shouldIncludeRowsCount of
IncludeRowsCount -> [countExtractor, rowsExtractor]
DontIncludeRowsCount -> [rowsExtractor]
}
where
countCteAlias = S.mkTableAlias "count_cte"
limitCteAlias = S.mkTableAlias "limit_cte"
countExtractor =
let selectExp =
S.mkSelect
{ S.selExtr = [S.Extractor S.countStar Nothing],
S.selFrom = Just $ S.mkIdenFromExp (S.tableAliasToIdentifier countCteAlias)
}
in S.Extractor (S.SESelect selectExp) Nothing
limitCteSelect =
S.ICTESelect
S.mkSelect
{ S.selExtr = [S.selectStar],
S.selFrom = Just $ S.mkIdenFromExp (S.tableAliasToIdentifier countCteAlias),
S.selLimit = (S.LimitExp . S.intToSQLExp) <$> _sepLimit,
S.selOffset = (S.OffsetExp . S.intToSQLExp) <$> _sepOffset
}
rowsExtractor =
let jsonAgg = S.SEUnsafe "json_agg(row_to_json(limit_cte.*))"
selectExp =
S.mkSelect
{ S.selExtr = [S.Extractor jsonAgg Nothing],
S.selFrom = Just $ S.mkIdenFromExp (S.tableAliasToIdentifier limitCteAlias)
}
in S.Extractor (S.handleIfNull (S.SELit "[]") (S.SESelect selectExp)) Nothing
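-- A rough sketch (added for illustration, not part of the original source) of
-- the SQL that 'mkPaginationSelectExp' builds when both a limit and an offset
-- are supplied and the row count is requested:
--
-- > WITH count_cte AS (<all rows select>),
-- >      limit_cte AS (SELECT * FROM count_cte LIMIT <limit> OFFSET <offset>)
-- > SELECT (SELECT count(*) FROM count_cte),
-- >        coalesce((SELECT json_agg(row_to_json(limit_cte.*)) FROM limit_cte), '[]')
--
-- where the @'[]'@ fallback is assumed to come from 'S.handleIfNull'.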
withCount :: (Int, PG.ViaJSON a) -> WithOptionalTotalCount a
withCount (count, PG.ViaJSON a) = WithOptionalTotalCount (Just count) a
withoutCount :: PG.ViaJSON a -> WithOptionalTotalCount a
withoutCount (PG.ViaJSON a) = WithOptionalTotalCount Nothing a
executeWithOptionalTotalCount :: J.FromJSON a => PG.Query -> RowsCountOption -> PG.TxE QErr (WithOptionalTotalCount a)
executeWithOptionalTotalCount sql getRowsCount =
case getRowsCount of
IncludeRowsCount -> (withCount . PG.getRow) <$> PG.withQE defaultTxErrorHandler sql () False
DontIncludeRowsCount -> (withoutCount . runIdentity . PG.getRow) <$> PG.withQE defaultTxErrorHandler sql () False
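-- Note on 'executeWithOptionalTotalCount' (added for clarity, not in the
-- original source): when the row count is requested, the query is expected to
-- return a single two-column row @(total count, JSON-encoded rows)@, decoded
-- via 'withCount'; otherwise a single one-column row holding just the
-- JSON-encoded rows, decoded via 'withoutCount'.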
getOneOffScheduledEventsTx ::
ScheduledEventPagination ->
[ScheduledEventStatus] ->
RowsCountOption ->
PG.TxE QErr (WithOptionalTotalCount [OneOffScheduledEvent])
getOneOffScheduledEventsTx pagination statuses getRowsCount = do
let table = QualifiedObject "hdb_catalog" $ TableName "hdb_scheduled_events"
statusFilter = mkScheduledEventStatusFilter statuses
select =
S.mkSelect
{ S.selExtr = [S.selectStar],
S.selFrom = Just $ S.mkSimpleFromExp table,
S.selWhere = Just $ S.WhereFrag statusFilter,
S.selOrderBy = Just scheduledTimeOrderBy
}
sql = PG.fromBuilder $ toSQL $ mkPaginationSelectExp select pagination getRowsCount
executeWithOptionalTotalCount sql getRowsCount
getCronEventsTx ::
TriggerName ->
ScheduledEventPagination ->
[ScheduledEventStatus] ->
RowsCountOption ->
PG.TxE QErr (WithOptionalTotalCount [CronEvent])
getCronEventsTx triggerName pagination statuses getRowsCount = do
let triggerNameFilter =
S.BECompare S.SEQ (S.SEIdentifier $ Identifier "trigger_name") (S.SELit $ triggerNameToTxt triggerName)
statusFilter = mkScheduledEventStatusFilter statuses
select =
S.mkSelect
{ S.selExtr = [S.selectStar],
S.selFrom = Just $ S.mkSimpleFromExp cronEventsTable,
S.selWhere = Just $ S.WhereFrag $ S.BEBin S.AndOp triggerNameFilter statusFilter,
S.selOrderBy = Just scheduledTimeOrderBy
}
sql = PG.fromBuilder $ toSQL $ mkPaginationSelectExp select pagination getRowsCount
executeWithOptionalTotalCount sql getRowsCount
deleteScheduledEventTx ::
ScheduledEventId -> ScheduledEventType -> PG.TxE QErr ()
deleteScheduledEventTx eventId = \case
OneOff ->
PG.unitQE
defaultTxErrorHandler
[PG.sql|
DELETE FROM hdb_catalog.hdb_scheduled_events
WHERE id = $1
|]
(Identity eventId)
False
Cron ->
PG.unitQE
defaultTxErrorHandler
[PG.sql|
DELETE FROM hdb_catalog.hdb_cron_events
WHERE id = $1
|]
(Identity eventId)
False
invocationFieldExtractors :: QualifiedTable -> [S.Extractor]
invocationFieldExtractors table =
[ S.Extractor (seIden "id") Nothing,
S.Extractor (seIden "event_id") Nothing,
S.Extractor (seIden "status") Nothing,
S.Extractor (withJsonTypeAnn $ seIden "request") Nothing,
S.Extractor (withJsonTypeAnn $ seIden "response") Nothing,
S.Extractor (seIden "created_at") Nothing
]
where
withJsonTypeAnn e = S.SETyAnn e $ S.TypeAnn "json"
seIden = S.SEQIdentifier . S.mkQIdentifierTable table . Identifier
mkEventIdBoolExp :: QualifiedTable -> EventId -> S.BoolExp
mkEventIdBoolExp table eventId =
S.BECompare
S.SEQ
(S.SEQIdentifier $ S.mkQIdentifierTable table $ Identifier "event_id")
(S.SELit $ unEventId eventId)
getScheduledEventInvocationsTx ::
GetScheduledEventInvocations ->
PG.TxE QErr (WithOptionalTotalCount [ScheduledEventInvocation])
getScheduledEventInvocationsTx getEventInvocations = do
let eventsTables = EventTables oneOffInvocationsTable cronInvocationsTable cronEventsTable
sql = PG.fromBuilder $ toSQL $ getScheduledEventsInvocationsQuery eventsTables getEventInvocations
executeWithOptionalTotalCount sql (_geiGetRowsCount getEventInvocations)
where
oneOffInvocationsTable = QualifiedObject "hdb_catalog" $ TableName "hdb_scheduled_event_invocation_logs"
cronInvocationsTable = QualifiedObject "hdb_catalog" $ TableName "hdb_cron_event_invocation_logs"
data EventTables = EventTables
{ etOneOffInvocationsTable :: QualifiedTable,
etCronInvocationsTable :: QualifiedTable,
etCronEventsTable :: QualifiedTable
}
getScheduledEventsInvocationsQueryNoPagination :: EventTables -> GetScheduledEventInvocationsBy -> S.Select
getScheduledEventsInvocationsQueryNoPagination (EventTables oneOffInvocationsTable cronInvocationsTable cronEventsTable') invocationsBy =
allRowsSelect
where
createdAtOrderBy table =
let createdAtCol = S.SEQIdentifier $ S.mkQIdentifierTable table $ Identifier "created_at"
in S.OrderByExp $ flip (NE.:|) [] $ S.OrderByItem createdAtCol (Just S.OTDesc) Nothing
allRowsSelect = case invocationsBy of
GIBEventId eventId eventType ->
let table = case eventType of
OneOff -> oneOffInvocationsTable
Cron -> cronInvocationsTable
in S.mkSelect
{ S.selExtr = invocationFieldExtractors table,
S.selFrom = Just $ S.mkSimpleFromExp table,
S.selOrderBy = Just $ createdAtOrderBy table,
S.selWhere = Just $ S.WhereFrag $ mkEventIdBoolExp table eventId
}
GIBEvent event -> case event of
SEOneOff ->
let table = oneOffInvocationsTable
in S.mkSelect
{ S.selExtr = invocationFieldExtractors table,
S.selFrom = Just $ S.mkSimpleFromExp table,
S.selOrderBy = Just $ createdAtOrderBy table
}
SECron triggerName ->
let invocationTable = cronInvocationsTable
eventTable = cronEventsTable'
joinCondition =
S.JoinOn $
S.BECompare
S.SEQ
(S.SEQIdentifier $ S.mkQIdentifierTable eventTable $ Identifier "id")
(S.SEQIdentifier $ S.mkQIdentifierTable invocationTable $ Identifier "event_id")
joinTables =
S.JoinExpr
(S.FISimple invocationTable Nothing)
S.Inner
(S.FISimple eventTable Nothing)
joinCondition
triggerBoolExp =
S.BECompare
S.SEQ
(S.SEQIdentifier $ S.mkQIdentifierTable eventTable (Identifier "trigger_name"))
(S.SELit $ triggerNameToTxt triggerName)
in S.mkSelect
{ S.selExtr = invocationFieldExtractors invocationTable,
S.selFrom = Just $ S.FromExp [S.FIJoin joinTables],
S.selWhere = Just $ S.WhereFrag triggerBoolExp,
S.selOrderBy = Just $ createdAtOrderBy invocationTable
}
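-- For the 'SECron' case above, the resulting query is roughly of the shape
-- below (an illustrative sketch, not part of the original source; the
-- generated SQL uses fully qualified table names rather than the aliases
-- shown here):
--
-- > SELECT l.id, l.event_id, l.status, l.request :: json, l.response :: json, l.created_at
-- > FROM hdb_catalog.hdb_cron_event_invocation_logs AS l
-- > INNER JOIN hdb_catalog.hdb_cron_events AS e ON e.id = l.event_id
-- > WHERE e.trigger_name = '<trigger name>'
-- > ORDER BY l.created_at DESC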
getScheduledEventsInvocationsQuery :: EventTables -> GetScheduledEventInvocations -> S.Select
getScheduledEventsInvocationsQuery eventTables (GetScheduledEventInvocations invocationsBy pagination shouldIncludeRowsCount) =
let invocationsSelect = getScheduledEventsInvocationsQueryNoPagination eventTables invocationsBy
in mkPaginationSelectExp invocationsSelect pagination shouldIncludeRowsCount
-- | Logger to accumulate stats of fetched scheduled events over a period of time and log once using @'L.Logger L.Hasura'.
-- See @'createStatsLogger' for more details.
createFetchedScheduledEventsStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> m FetchedScheduledEventsStatsLogger
createFetchedScheduledEventsStatsLogger = L.createStatsLogger
-- | Close the fetched scheduled events stats logger.
closeFetchedScheduledEventsStatsLogger ::
(MonadIO m) => L.Logger L.Hasura -> FetchedScheduledEventsStatsLogger -> m ()
closeFetchedScheduledEventsStatsLogger = L.closeStatsLogger L.scheduledTriggerProcessLogType
-- | Log statistics of fetched scheduled events. See @'logStats' for more details.
logFetchedScheduledEventsStats ::
(MonadIO m) =>
FetchedScheduledEventsStatsLogger ->
CronEventsCount ->
OneOffScheduledEventsCount ->
m ()
logFetchedScheduledEventsStats logger cron oneOff =
L.logStats logger (FetchedScheduledEventsStats cron oneOff 1)
-- | Logger to accumulate stats of fetched cron triggers, for generating cron events, over a period of time and
-- log once using @'L.Logger L.Hasura'.
-- See @'createStatsLogger' for more details.
createFetchedCronTriggerStatsLogger :: (MonadIO m) => L.Logger L.Hasura -> m FetchedCronTriggerStatsLogger
createFetchedCronTriggerStatsLogger = L.createStatsLogger
-- | Close the fetched cron trigger stats logger.
closeFetchedCronTriggersStatsLogger ::
(MonadIO m) => L.Logger L.Hasura -> FetchedCronTriggerStatsLogger -> m ()
closeFetchedCronTriggersStatsLogger = L.closeStatsLogger L.cronEventGeneratorProcessType
-- | Log statistics of fetched cron triggers. See @'logStats' for more details.
logFetchedCronTriggersStats ::
(MonadIO m) =>
FetchedCronTriggerStatsLogger ->
[CronTriggerStats] ->
m ()
logFetchedCronTriggersStats logger cronTriggerStats =
L.logStats logger (FetchedCronTriggerStats cronTriggerStats 1)