use the created_at time of the scheduled event

- in case of cron events, use the created_at of the cron trigger
This commit is contained in:
Karthikeyan Chinnakonda 2020-09-04 22:44:54 +05:30
parent f9aca45706
commit 772e991079
5 changed files with 66 additions and 25 deletions

View File

@ -176,6 +176,7 @@ data CronEventPartial
, cepName :: !TriggerName
, cepScheduledTime :: !UTCTime
, cepTries :: !Int
, cepCreatedAt :: !UTCTime
} deriving (Show, Eq)
data ScheduledEventFull
@ -192,6 +193,7 @@ data ScheduledEventFull
, sefRetryConf :: !STRetryConf
, sefHeaders :: ![EventHeaderInfo]
, sefComment :: !(Maybe Text)
, sefCreatedAt :: !UTCTime
} deriving (Show, Eq)
$(J.deriveToJSON (J.aesonDrop 3 J.snakeCase) {J.omitNothingFields = True} ''ScheduledEventFull)
@ -206,8 +208,8 @@ data StandAloneScheduledEvent
, saseRetryConf :: !STRetryConf
, saseHeaderConf :: ![HeaderConf]
, saseComment :: !(Maybe Text)
, saseCreatedAt :: !UTCTime
} deriving (Show, Eq)
$(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) {J.omitNothingFields = True} ''StandAloneScheduledEvent)
-- | The 'ScheduledEventType' data type is needed to differentiate
@ -359,7 +361,7 @@ processCronEvents logger logEnv httpMgr pgpool getSC lockedCronEvents = do
-- database, the events stored here will be unlocked in case a
-- graceful shutdown is initiated in midst of processing these events
saveLockedEvents (map cepId partialEvents) lockedCronEvents
for_ partialEvents $ \(CronEventPartial id' name st tries)-> do
for_ partialEvents $ \(CronEventPartial id' name st tries createdAt)-> do
case Map.lookup name cronTriggersInfo of
Nothing -> logInternalError $
err500 Unexpected "could not find cron trigger in cache"
@ -376,6 +378,7 @@ processCronEvents logger logEnv httpMgr pgpool getSC lockedCronEvents = do
ctiRetryConf
ctiHeaders
ctiComment
createdAt
finally <- Tracing.runTraceT "scheduled event" . runExceptT $
runReaderT (processScheduledEvent logEnv pgpool scheduledEvent CronScheduledEvent) (logger, httpMgr)
removeEventFromLockedEvents id' lockedCronEvents
@ -411,7 +414,9 @@ processStandAloneEvents env logger logEnv httpMgr pgpool lockedStandAloneEvents
payload
retryConf
headerConf
comment )
comment
createdAt
)
-> do
webhookInfo <- liftIO . runExceptT $ resolveWebhook env webhookConf
headerInfo <- liftIO . runExceptT $ getHeaderInfosFromConf env headerConf
@ -431,6 +436,7 @@ processStandAloneEvents env logger logEnv httpMgr pgpool lockedStandAloneEvents
retryConf
headerInfo'
comment
createdAt
finally <- Tracing.runTraceT "scheduled event" . runExceptT $
runReaderT (processScheduledEvent logEnv pgpool scheduledEvent StandAloneEvent) $
(logger, httpMgr)
@ -488,7 +494,7 @@ processScheduledEvent
headers = addDefaultHeaders $ map encodeHeader sefHeaders
extraLogCtx = ExtraLogContext (Just currentTime) sefId
webhookReqPayload =
ScheduledEventWebhookPayload sefId sefName sefScheduledTime sefPayload sefComment currentTime
ScheduledEventWebhookPayload sefId sefName sefScheduledTime sefPayload sefComment sefCreatedAt
webhookReqBodyJson = J.toJSON webhookReqPayload
webhookReqBody = J.encode webhookReqBodyJson
requestDetails = RequestDetails $ BL.length webhookReqBody
@ -696,21 +702,24 @@ setScheduledEventStatus scheduledEventId status type' =
getPartialCronEvents :: Q.TxE QErr [CronEventPartial]
getPartialCronEvents = do
map uncurryEvent <$> Q.listQE defaultTxErrorHandler [Q.sql|
UPDATE hdb_catalog.hdb_cron_events
UPDATE hdb_catalog.hdb_cron_events ce
SET status = 'locked'
WHERE id IN ( SELECT t.id
FROM hdb_catalog.hdb_cron_events t
WHERE ( t.status = 'scheduled'
and (
(t.next_retry_at is NULL and t.scheduled_time <= now()) or
(t.next_retry_at is not NULL and t.next_retry_at <= now())
)
)
FOR UPDATE SKIP LOCKED
)
RETURNING id, trigger_name, scheduled_time, tries
FROM (
SELECT t.id, ct.created_at
FROM hdb_catalog.hdb_cron_events t
JOIN hdb_catalog.hdb_cron_triggers ct on ct.name = t.trigger_name
WHERE ( t.status = 'scheduled'
and (
(t.next_retry_at is NULL and t.scheduled_time <= now()) or
(t.next_retry_at is not NULL and t.next_retry_at <= now())
)
)
FOR UPDATE SKIP LOCKED
) cron_events
WHERE ce.id = cron_events.id
RETURNING ce.id, trigger_name, scheduled_time, tries, cron_events.created_at
|] () True
where uncurryEvent (i, n, st, tries) = CronEventPartial i n st tries
where uncurryEvent (i, n, st, tries, createdAt) = CronEventPartial i n st tries createdAt
getOneOffScheduledEvents :: Q.TxE QErr [StandAloneScheduledEvent]
getOneOffScheduledEvents = do
@ -727,7 +736,7 @@ getOneOffScheduledEvents = do
)
FOR UPDATE SKIP LOCKED
)
RETURNING id, webhook_conf, scheduled_time, retry_conf, payload, header_conf, tries, comment
RETURNING id, webhook_conf, scheduled_time, retry_conf, payload, header_conf, tries, comment, created_at
|] () False
where
uncurryOneOffEvent ( eventId
@ -737,7 +746,9 @@ getOneOffScheduledEvents = do
, payload
, headerConf
, tries
, comment ) =
, comment
, createdAt
) =
StandAloneScheduledEvent eventId
scheduledTime
tries
@ -746,6 +757,7 @@ getOneOffScheduledEvents = do
(Q.getAltJ retryConf)
(Q.getAltJ headerConf)
comment
createdAt
liftExceptTIO :: (MonadError e m, MonadIO m) => ExceptT e IO a -> m a

View File

@ -1 +1 @@
37
38

View File

@ -747,7 +747,8 @@ CREATE TABLE hdb_catalog.hdb_cron_triggers
retry_conf JSON,
header_conf JSON,
include_in_metadata BOOLEAN NOT NULL DEFAULT FALSE,
comment TEXT
comment TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE hdb_catalog.hdb_cron_events
@ -757,7 +758,7 @@ CREATE TABLE hdb_catalog.hdb_cron_events
scheduled_time TIMESTAMPTZ NOT NULL,
status TEXT NOT NULL DEFAULT 'scheduled',
tries INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW(),
next_retry_at TIMESTAMPTZ,
FOREIGN KEY (trigger_name) REFERENCES hdb_catalog.hdb_cron_triggers(name)
@ -774,7 +775,7 @@ CREATE TABLE hdb_catalog.hdb_cron_event_invocation_logs
status INTEGER,
request JSON,
response JSON,
created_at TIMESTAMP DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW(),
FOREIGN KEY (event_id) REFERENCES hdb_catalog.hdb_cron_events (id)
ON UPDATE CASCADE ON DELETE CASCADE
@ -803,7 +804,7 @@ CREATE TABLE hdb_catalog.hdb_scheduled_events
header_conf JSON,
status TEXT NOT NULL DEFAULT 'scheduled',
tries INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW(),
next_retry_at TIMESTAMPTZ,
comment TEXT,
CONSTRAINT valid_status CHECK (status IN ('scheduled','locked','delivered','error','dead'))
@ -818,7 +819,7 @@ event_id TEXT,
status INTEGER,
request JSON,
response JSON,
created_at TIMESTAMP DEFAULT NOW(),
created_at TIMESTAMPTZ DEFAULT NOW(),
FOREIGN KEY (event_id) REFERENCES hdb_catalog.hdb_scheduled_events (id)
ON DELETE CASCADE ON UPDATE CASCADE

View File

@ -0,0 +1,14 @@
ALTER TABLE hdb_catalog.hdb_cron_triggers
ADD COLUMN created_at TIMESTAMPTZ default now();
ALTER TABLE hdb_catalog.hdb_cron_events
ALTER COLUMN created_at TYPE TIMESTAMPTZ;
ALTER TABLE hdb_catalog.hdb_cron_event_invocation_logs
ALTER COLUMN created_at TYPE TIMESTAMPTZ;
ALTER TABLE hdb_catalog.hdb_scheduled_events
ALTER COLUMN created_at TYPE TIMESTAMPTZ;
ALTER TABLE hdb_catalog.hdb_scheduled_event_invocation_logs
ALTER COLUMN created_at TYPE TIMESTAMPTZ;

View File

@ -0,0 +1,14 @@
ALTER TABLE hdb_catalog.hdb_cron_triggers
DROP COLUMN created_at;
ALTER TABLE hdb_catalog.hdb_cron_events
ALTER COLUMN created_at TIMESTAMP default now();
ALTER TABLE hdb_catalog.hdb_cron_event_invocation_logs
ALTER COLUMN created_at TIMESTAMP default now();
ALTER TABLE hdb_catalog.hdb_scheduled_events
ALTER COLUMN created_at TIMESTAMP default now();
ALTER TABLE hdb_catalog.hdb_scheduled_event_invocation_logs
ALTER COLUMN created_at TIMESTAMP default now();