From fe70d9fbe856efc7734fd02d7604dbc5d5d6e8ed Mon Sep 17 00:00:00 2001 From: Phil Freeman Date: Tue, 28 Jul 2020 13:45:20 -0700 Subject: [PATCH 1/4] Add a 30 minute timeout for event trigger locks --- .../src-lib/Hasura/Eventing/EventTrigger.hs | 21 ++++++++++--------- .../Hasura/Eventing/ScheduledTrigger.hs | 4 ++-- .../src-lib/Hasura/GraphQL/Resolve/Action.hs | 2 +- .../src-lib/Hasura/GraphQL/Transport/HTTP.hs | 4 ++-- .../Hasura/GraphQL/Transport/WebSocket.hs | 6 +++--- server/src-lib/Hasura/RQL/DDL/EventTrigger.hs | 2 +- server/src-lib/Hasura/Server/Auth.hs | 2 +- server/src-lib/Hasura/Server/Auth/JWT.hs | 2 +- server/src-rsr/catalog_version.txt | 2 +- server/src-rsr/initialise.sql | 2 +- server/src-rsr/migrations/37_to_38.sql | 3 +++ server/src-rsr/migrations/38_to_37.sql | 3 +++ 12 files changed, 30 insertions(+), 23 deletions(-) create mode 100644 server/src-rsr/migrations/37_to_38.sql create mode 100644 server/src-rsr/migrations/38_to_37.sql diff --git a/server/src-lib/Hasura/Eventing/EventTrigger.hs b/server/src-lib/Hasura/Eventing/EventTrigger.hs index c32c2508b99..8c47c89b0fb 100644 --- a/server/src-lib/Hasura/Eventing/EventTrigger.hs +++ b/server/src-lib/Hasura/Eventing/EventTrigger.hs @@ -209,7 +209,7 @@ processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx Tracing.runTraceTInContext tracingCtx t <- processEvent event - & runTraceT "process event" + & runTraceT "Event trigger" & withEventEngineCtx eeCtx & flip runReaderT (logger, httpMgr) & LA.async @@ -408,10 +408,11 @@ fetchEvents :: Int -> Q.TxE QErr [Event] fetchEvents limitI = map uncurryEvent <$> Q.listQE defaultTxErrorHandler [Q.sql| UPDATE hdb_catalog.event_log - SET locked = 't' + SET locked = NOW() WHERE id IN ( SELECT l.id FROM hdb_catalog.event_log l - WHERE l.delivered = 'f' and l.error = 'f' and l.locked = 'f' + WHERE l.delivered = 'f' and l.error = 'f' + and (l.locked IS NULL or l.locked < NOW() - interval '30 minute') and (l.next_retry_at is NULL or l.next_retry_at <= now()) and l.archived = 'f' ORDER BY created_at @@ -448,14 +449,14 @@ insertInvocation invo = do setSuccess :: Event -> Q.TxE QErr () setSuccess e = Q.unitQE defaultTxErrorHandler [Q.sql| UPDATE hdb_catalog.event_log - SET delivered = 't', next_retry_at = NULL, locked = 'f' + SET delivered = 't', next_retry_at = NULL, locked = NULL WHERE id = $1 |] (Identity $ eId e) True setError :: Event -> Q.TxE QErr () setError e = Q.unitQE defaultTxErrorHandler [Q.sql| UPDATE hdb_catalog.event_log - SET error = 't', next_retry_at = NULL, locked = 'f' + SET error = 't', next_retry_at = NULL, locked = NULL WHERE id = $1 |] (Identity $ eId e) True @@ -463,7 +464,7 @@ setRetry :: Event -> UTCTime -> Q.TxE QErr () setRetry e time = Q.unitQE defaultTxErrorHandler [Q.sql| UPDATE hdb_catalog.event_log - SET next_retry_at = $1, locked = 'f' + SET next_retry_at = $1, locked = NULL WHERE id = $2 |] (time, eId e) True @@ -471,8 +472,8 @@ unlockAllEvents :: Q.TxE QErr () unlockAllEvents = Q.unitQE defaultTxErrorHandler [Q.sql| UPDATE hdb_catalog.event_log - SET locked = 'f' - WHERE locked = 't' + SET locked = NULL + WHERE locked IS NOT NULL |] () True toInt64 :: (Integral a) => a -> Int64 @@ -495,12 +496,12 @@ unlockEvents eventIds = [Q.sql| WITH "cte" AS (UPDATE hdb_catalog.event_log - SET locked = 'f' + SET locked = NULL WHERE id = ANY($1::text[]) -- only unlock those events that have been locked, it's possible -- that an event has been processed but not yet been removed from -- the saved locked events, which will lead to a double send - AND locked = 't' + AND locked IS NOT NULL RETURNING *) SELECT count(*) FROM "cte" |] (Identity $ EventIdArray eventIds) True diff --git a/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs b/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs index 65cb2855566..8bcf2028a12 100644 --- a/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs +++ b/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs @@ -375,7 +375,7 @@ processCronEvents logger logEnv httpMgr pgpool getSC lockedCronEvents = do ctiRetryConf ctiHeaders ctiComment - finally <- Tracing.runTraceT "scheduled event" . runExceptT $ + finally <- Tracing.runTraceT "Scheduled event" . runExceptT $ runReaderT (processScheduledEvent logEnv pgpool scheduledEvent CronScheduledEvent) (logger, httpMgr) removeEventFromLockedEvents id' lockedCronEvents either logInternalError pure finally @@ -430,7 +430,7 @@ processStandAloneEvents env logger logEnv httpMgr pgpool lockedStandAloneEvents retryConf headerInfo' comment - finally <- Tracing.runTraceT "scheduled event" . runExceptT $ + finally <- Tracing.runTraceT "Scheduled event" . runExceptT $ runReaderT (processScheduledEvent logEnv pgpool scheduledEvent StandAloneEvent) $ (logger, httpMgr) removeEventFromLockedEvents id' lockedStandAloneEvents diff --git a/server/src-lib/Hasura/GraphQL/Resolve/Action.hs b/server/src-lib/Hasura/GraphQL/Resolve/Action.hs index 095f38bb74f..9d0c7901786 100644 --- a/server/src-lib/Hasura/GraphQL/Resolve/Action.hs +++ b/server/src-lib/Hasura/GraphQL/Resolve/Action.hs @@ -411,7 +411,7 @@ asyncActionsProcessor env cacheRef pgPool httpManager = forever $ do either mempty return res callHandler :: ActionCache -> ActionLogItem -> m () - callHandler actionCache actionLogItem = Tracing.runTraceT "async actions processor" do + callHandler actionCache actionLogItem = Tracing.runTraceT "Async actions processor" do let ActionLogItem actionId actionName reqHeaders sessionVariables inputPayload = actionLogItem case Map.lookup actionName actionCache of diff --git a/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs b/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs index 5517ab5ba52..734ed800a2a 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs @@ -165,12 +165,12 @@ runHasuraGQ runHasuraGQ reqId (query, queryParsed) userInfo resolvedOp = do (E.ExecutionCtx logger _ pgExecCtx _ _ _ _ _) <- ask (telemTimeIO, respE) <- withElapsedTime $ runExceptT $ case resolvedOp of - E.ExOpQuery tx genSql asts -> trace "pg" $ do + E.ExOpQuery tx genSql asts -> trace "Query" $ do -- log the generated SQL and the graphql query logQueryLog logger query genSql reqId Tracing.interpTraceT id $ executeQuery queryParsed asts genSql pgExecCtx Q.ReadOnly tx - E.ExOpMutation respHeaders tx -> trace "pg" $ do + E.ExOpMutation respHeaders tx -> trace "Mutation" $ do logQueryLog logger query Nothing reqId ctx <- Tracing.currentContext (respHeaders,) <$> Tracing.interpTraceT (runLazyTx pgExecCtx Q.ReadWrite . withTraceContext ctx . withUserInfo userInfo) tx diff --git a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs index 5f05334b2bb..8181f08dec4 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs @@ -356,11 +356,11 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do -> E.ExecOp (Tracing.TraceT (LazyTx QErr)) -> ExceptT () m () runHasuraGQ timerTot telemCacheHit reqId query queryParsed userInfo = \case - E.ExOpQuery opTx genSql asts -> Tracing.trace "pg" $ + E.ExOpQuery opTx genSql asts -> Tracing.trace "Query" $ execQueryOrMut Telem.Query genSql . fmap snd $ Tracing.interpTraceT id $ executeQuery queryParsed asts genSql pgExecCtx Q.ReadOnly opTx -- Response headers discarded over websockets - E.ExOpMutation _ opTx -> Tracing.trace "pg" do + E.ExOpMutation _ opTx -> Tracing.trace "Mutation" do execQueryOrMut Telem.Mutation Nothing $ Tracing.interpTraceT (runLazyTx pgExecCtx Q.ReadWrite . withUserInfo userInfo) opTx E.ExOpSubs lqOp -> do @@ -517,7 +517,7 @@ onMessage -> AuthMode -> WSServerEnv -> WSConn -> BL.ByteString -> m () -onMessage env authMode serverEnv wsConn msgRaw = Tracing.runTraceT "websocket" do +onMessage env authMode serverEnv wsConn msgRaw = Tracing.runTraceT "Websocket" do case J.eitherDecode msgRaw of Left e -> do let err = ConnErrMsg $ "parsing ClientMessage failed: " <> T.pack e diff --git a/server/src-lib/Hasura/RQL/DDL/EventTrigger.hs b/server/src-lib/Hasura/RQL/DDL/EventTrigger.hs index f73074cb3ea..4b10419fd2f 100644 --- a/server/src-lib/Hasura/RQL/DDL/EventTrigger.hs +++ b/server/src-lib/Hasura/RQL/DDL/EventTrigger.hs @@ -158,7 +158,7 @@ fetchEvent :: EventId -> Q.TxE QErr (EventId, Bool) fetchEvent eid = do events <- Q.listQE defaultTxErrorHandler [Q.sql| - SELECT l.id, l.locked + SELECT l.id, l.locked IS NOT NULL AND l.locked >= NOW() - interval '30 minute' FROM hdb_catalog.event_log l JOIN hdb_catalog.event_triggers e ON l.trigger_name = e.name diff --git a/server/src-lib/Hasura/Server/Auth.hs b/server/src-lib/Hasura/Server/Auth.hs index 4e17d612e32..1a9e6cf5fac 100644 --- a/server/src-lib/Hasura/Server/Auth.hs +++ b/server/src-lib/Hasura/Server/Auth.hs @@ -164,7 +164,7 @@ setupAuthMode mAdminSecretHash mWebHook mJwtSecret mUnAuthRole httpManager logge -- header), do not start a background thread for refreshing the JWK getJwkFromUrl url = do ref <- liftIO $ newIORef $ JWKSet [] - maybeExpiry <- withJwkError $ Tracing.runTraceT "jwk init" $ updateJwkRef logger httpManager url ref + maybeExpiry <- withJwkError $ Tracing.runTraceT "JWK init" $ updateJwkRef logger httpManager url ref case maybeExpiry of Nothing -> return ref Just time -> do diff --git a/server/src-lib/Hasura/Server/Auth/JWT.hs b/server/src-lib/Hasura/Server/Auth/JWT.hs index cc07b0ddc3b..876ae727bc7 100644 --- a/server/src-lib/Hasura/Server/Auth/JWT.hs +++ b/server/src-lib/Hasura/Server/Auth/JWT.hs @@ -136,7 +136,7 @@ jwkRefreshCtrl -> m void jwkRefreshCtrl logger manager url ref time = do liftIO $ C.sleep time - forever $ Tracing.runTraceT "jwk refresh" do + forever $ Tracing.runTraceT "JWK refresh" do res <- runExceptT $ updateJwkRef logger manager url ref mTime <- either (const $ logNotice >> return Nothing) return res -- if can't parse time from header, defaults to 1 min diff --git a/server/src-rsr/catalog_version.txt b/server/src-rsr/catalog_version.txt index 81b5c5d06cc..e522732c77e 100644 --- a/server/src-rsr/catalog_version.txt +++ b/server/src-rsr/catalog_version.txt @@ -1 +1 @@ -37 +38 diff --git a/server/src-rsr/initialise.sql b/server/src-rsr/initialise.sql index 97f34defc85..45138e37684 100644 --- a/server/src-rsr/initialise.sql +++ b/server/src-rsr/initialise.sql @@ -302,7 +302,7 @@ CREATE TABLE hdb_catalog.event_log error BOOLEAN NOT NULL DEFAULT FALSE, tries INTEGER NOT NULL DEFAULT 0, created_at TIMESTAMP DEFAULT NOW(), - locked BOOLEAN NOT NULL DEFAULT FALSE, + locked TIMESTAMP, next_retry_at TIMESTAMP, archived BOOLEAN NOT NULL DEFAULT FALSE ); diff --git a/server/src-rsr/migrations/37_to_38.sql b/server/src-rsr/migrations/37_to_38.sql new file mode 100644 index 00000000000..0d07dc3f9a0 --- /dev/null +++ b/server/src-rsr/migrations/37_to_38.sql @@ -0,0 +1,3 @@ +ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked DROP DEFAULT; +ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked DROP NOT NULL; +ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked TYPE TIMESTAMP USING CASE WHEN locked THEN NOW() ELSE NULL END; \ No newline at end of file diff --git a/server/src-rsr/migrations/38_to_37.sql b/server/src-rsr/migrations/38_to_37.sql new file mode 100644 index 00000000000..f2772a266a7 --- /dev/null +++ b/server/src-rsr/migrations/38_to_37.sql @@ -0,0 +1,3 @@ +ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked TYPE BOOLEAN USING locked IS NOT NULL; +ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked SET NOT NULL; +ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked SET DEFAULT false; \ No newline at end of file From 02ddcb34a0f02910fd028c50f4861d69a62517bd Mon Sep 17 00:00:00 2001 From: Phil Freeman Date: Tue, 28 Jul 2020 16:11:51 -0700 Subject: [PATCH 2/4] Apply suggestions from code review Co-authored-by: Brandon Simmons --- server/src-lib/Hasura/Eventing/EventTrigger.hs | 2 +- server/src-lib/Hasura/RQL/DDL/EventTrigger.hs | 2 +- server/src-rsr/initialise.sql | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src-lib/Hasura/Eventing/EventTrigger.hs b/server/src-lib/Hasura/Eventing/EventTrigger.hs index 8c47c89b0fb..9d69b195436 100644 --- a/server/src-lib/Hasura/Eventing/EventTrigger.hs +++ b/server/src-lib/Hasura/Eventing/EventTrigger.hs @@ -412,7 +412,7 @@ fetchEvents limitI = WHERE id IN ( SELECT l.id FROM hdb_catalog.event_log l WHERE l.delivered = 'f' and l.error = 'f' - and (l.locked IS NULL or l.locked < NOW() - interval '30 minute') + and (l.locked IS NULL or l.locked < (NOW() - interval '30 minute')) and (l.next_retry_at is NULL or l.next_retry_at <= now()) and l.archived = 'f' ORDER BY created_at diff --git a/server/src-lib/Hasura/RQL/DDL/EventTrigger.hs b/server/src-lib/Hasura/RQL/DDL/EventTrigger.hs index 4b10419fd2f..5c0e7ded0fa 100644 --- a/server/src-lib/Hasura/RQL/DDL/EventTrigger.hs +++ b/server/src-lib/Hasura/RQL/DDL/EventTrigger.hs @@ -158,7 +158,7 @@ fetchEvent :: EventId -> Q.TxE QErr (EventId, Bool) fetchEvent eid = do events <- Q.listQE defaultTxErrorHandler [Q.sql| - SELECT l.id, l.locked IS NOT NULL AND l.locked >= NOW() - interval '30 minute' + SELECT l.id, l.locked IS NOT NULL AND l.locked >= (NOW() - interval '30 minute') FROM hdb_catalog.event_log l JOIN hdb_catalog.event_triggers e ON l.trigger_name = e.name diff --git a/server/src-rsr/initialise.sql b/server/src-rsr/initialise.sql index 45138e37684..0ea913bbb81 100644 --- a/server/src-rsr/initialise.sql +++ b/server/src-rsr/initialise.sql @@ -302,6 +302,7 @@ CREATE TABLE hdb_catalog.event_log error BOOLEAN NOT NULL DEFAULT FALSE, tries INTEGER NOT NULL DEFAULT 0, created_at TIMESTAMP DEFAULT NOW(), + /* when locked IS NULL the event is unlocked and can be processed */ locked TIMESTAMP, next_retry_at TIMESTAMP, archived BOOLEAN NOT NULL DEFAULT FALSE From 8cc548abf076d54177ffc1c06f6a88705ca2b9f3 Mon Sep 17 00:00:00 2001 From: Phil Freeman Date: Wed, 29 Jul 2020 14:08:27 -0700 Subject: [PATCH 3/4] Use TIMESTAMPTZ --- server/src-lib/Hasura/Eventing/EventTrigger.hs | 2 +- server/src-lib/Hasura/Eventing/ScheduledTrigger.hs | 4 ++-- server/src-lib/Hasura/GraphQL/Resolve/Action.hs | 2 +- server/src-lib/Hasura/GraphQL/Transport/HTTP.hs | 4 ++-- server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs | 6 +++--- server/src-lib/Hasura/Server/Auth.hs | 2 +- server/src-lib/Hasura/Server/Auth/JWT.hs | 2 +- server/src-rsr/initialise.sql | 2 +- server/src-rsr/migrations/37_to_38.sql | 2 +- 9 files changed, 13 insertions(+), 13 deletions(-) diff --git a/server/src-lib/Hasura/Eventing/EventTrigger.hs b/server/src-lib/Hasura/Eventing/EventTrigger.hs index 60120b1d203..fa6c7e862d3 100644 --- a/server/src-lib/Hasura/Eventing/EventTrigger.hs +++ b/server/src-lib/Hasura/Eventing/EventTrigger.hs @@ -210,7 +210,7 @@ processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx Tracing.runTraceTInContext tracingCtx t <- processEvent event - & runTraceT "Event trigger" + & runTraceT "process event" & withEventEngineCtx eeCtx & flip runReaderT (logger, httpMgr) & LA.async diff --git a/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs b/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs index 81853564b3f..bc7e00b1bae 100644 --- a/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs +++ b/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs @@ -376,7 +376,7 @@ processCronEvents logger logEnv httpMgr pgpool getSC lockedCronEvents = do ctiRetryConf ctiHeaders ctiComment - finally <- Tracing.runTraceT "Scheduled event" . runExceptT $ + finally <- Tracing.runTraceT "scheduled event" . runExceptT $ runReaderT (processScheduledEvent logEnv pgpool scheduledEvent CronScheduledEvent) (logger, httpMgr) removeEventFromLockedEvents id' lockedCronEvents either logInternalError pure finally @@ -431,7 +431,7 @@ processStandAloneEvents env logger logEnv httpMgr pgpool lockedStandAloneEvents retryConf headerInfo' comment - finally <- Tracing.runTraceT "Scheduled event" . runExceptT $ + finally <- Tracing.runTraceT "scheduled event" . runExceptT $ runReaderT (processScheduledEvent logEnv pgpool scheduledEvent StandAloneEvent) $ (logger, httpMgr) removeEventFromLockedEvents id' lockedStandAloneEvents diff --git a/server/src-lib/Hasura/GraphQL/Resolve/Action.hs b/server/src-lib/Hasura/GraphQL/Resolve/Action.hs index 169333c8fcb..5eea760108d 100644 --- a/server/src-lib/Hasura/GraphQL/Resolve/Action.hs +++ b/server/src-lib/Hasura/GraphQL/Resolve/Action.hs @@ -428,7 +428,7 @@ asyncActionsProcessor env logger cacheRef pgPool httpManager = forever $ do either mempty return res callHandler :: ActionCache -> ActionLogItem -> m () - callHandler actionCache actionLogItem = Tracing.runTraceT "Async actions processor" do + callHandler actionCache actionLogItem = Tracing.runTraceT "async actions processor" do let ActionLogItem actionId actionName reqHeaders sessionVariables inputPayload = actionLogItem case Map.lookup actionName actionCache of diff --git a/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs b/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs index 451340ea053..70cd3e37ae9 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs @@ -168,12 +168,12 @@ runHasuraGQ runHasuraGQ reqId (query, queryParsed) userInfo resolvedOp = do (E.ExecutionCtx logger _ pgExecCtx _ _ _ _ _) <- ask (telemTimeIO, respE) <- withElapsedTime $ runExceptT $ case resolvedOp of - E.ExOpQuery tx genSql asts -> trace "Query" $ do + E.ExOpQuery tx genSql asts -> trace "pg" $ do -- log the generated SQL and the graphql query logQueryLog logger query genSql reqId Tracing.interpTraceT id $ executeQuery queryParsed asts genSql pgExecCtx Q.ReadOnly tx - E.ExOpMutation respHeaders tx -> trace "Mutation" $ do + E.ExOpMutation respHeaders tx -> trace "pg" $ do logQueryLog logger query Nothing reqId ctx <- Tracing.currentContext (respHeaders,) <$> Tracing.interpTraceT (runLazyTx pgExecCtx Q.ReadWrite . withTraceContext ctx . withUserInfo userInfo) tx diff --git a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs index 57ce18cbdd2..a50e1ab8801 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs @@ -363,11 +363,11 @@ onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do -> E.ExecOp (Tracing.TraceT (LazyTx QErr)) -> ExceptT () m () runHasuraGQ timerTot telemCacheHit reqId query queryParsed userInfo = \case - E.ExOpQuery opTx genSql asts -> Tracing.trace "Query" $ + E.ExOpQuery opTx genSql asts -> Tracing.trace "pg" $ execQueryOrMut Telem.Query genSql . fmap snd $ Tracing.interpTraceT id $ executeQuery queryParsed asts genSql pgExecCtx Q.ReadOnly opTx -- Response headers discarded over websockets - E.ExOpMutation _ opTx -> Tracing.trace "Mutation" do + E.ExOpMutation _ opTx -> Tracing.trace "pg" do execQueryOrMut Telem.Mutation Nothing $ Tracing.interpTraceT (runLazyTx pgExecCtx Q.ReadWrite . withUserInfo userInfo) opTx E.ExOpSubs lqOp -> do @@ -524,7 +524,7 @@ onMessage -> AuthMode -> WSServerEnv -> WSConn -> BL.ByteString -> m () -onMessage env authMode serverEnv wsConn msgRaw = Tracing.runTraceT "Websocket" do +onMessage env authMode serverEnv wsConn msgRaw = Tracing.runTraceT "websocket" do case J.eitherDecode msgRaw of Left e -> do let err = ConnErrMsg $ "parsing ClientMessage failed: " <> T.pack e diff --git a/server/src-lib/Hasura/Server/Auth.hs b/server/src-lib/Hasura/Server/Auth.hs index 1a9e6cf5fac..4e17d612e32 100644 --- a/server/src-lib/Hasura/Server/Auth.hs +++ b/server/src-lib/Hasura/Server/Auth.hs @@ -164,7 +164,7 @@ setupAuthMode mAdminSecretHash mWebHook mJwtSecret mUnAuthRole httpManager logge -- header), do not start a background thread for refreshing the JWK getJwkFromUrl url = do ref <- liftIO $ newIORef $ JWKSet [] - maybeExpiry <- withJwkError $ Tracing.runTraceT "JWK init" $ updateJwkRef logger httpManager url ref + maybeExpiry <- withJwkError $ Tracing.runTraceT "jwk init" $ updateJwkRef logger httpManager url ref case maybeExpiry of Nothing -> return ref Just time -> do diff --git a/server/src-lib/Hasura/Server/Auth/JWT.hs b/server/src-lib/Hasura/Server/Auth/JWT.hs index 12c85e7b6c6..a489606ec54 100644 --- a/server/src-lib/Hasura/Server/Auth/JWT.hs +++ b/server/src-lib/Hasura/Server/Auth/JWT.hs @@ -136,7 +136,7 @@ jwkRefreshCtrl -> m void jwkRefreshCtrl logger manager url ref time = do liftIO $ C.sleep time - forever $ Tracing.runTraceT "JWK refresh" do + forever $ Tracing.runTraceT "jwk refresh" do res <- runExceptT $ updateJwkRef logger manager url ref mTime <- either (const $ logNotice >> return Nothing) return res -- if can't parse time from header, defaults to 1 min diff --git a/server/src-rsr/initialise.sql b/server/src-rsr/initialise.sql index 0ea913bbb81..076ff1ea3c7 100644 --- a/server/src-rsr/initialise.sql +++ b/server/src-rsr/initialise.sql @@ -303,7 +303,7 @@ CREATE TABLE hdb_catalog.event_log tries INTEGER NOT NULL DEFAULT 0, created_at TIMESTAMP DEFAULT NOW(), /* when locked IS NULL the event is unlocked and can be processed */ - locked TIMESTAMP, + locked TIMESTAMPTZ, next_retry_at TIMESTAMP, archived BOOLEAN NOT NULL DEFAULT FALSE ); diff --git a/server/src-rsr/migrations/37_to_38.sql b/server/src-rsr/migrations/37_to_38.sql index 0d07dc3f9a0..ba366b6ca82 100644 --- a/server/src-rsr/migrations/37_to_38.sql +++ b/server/src-rsr/migrations/37_to_38.sql @@ -1,3 +1,3 @@ ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked DROP DEFAULT; ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked DROP NOT NULL; -ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked TYPE TIMESTAMP USING CASE WHEN locked THEN NOW() ELSE NULL END; \ No newline at end of file +ALTER TABLE hdb_catalog.event_log ALTER COLUMN locked TYPE TIMESTAMPTZ USING CASE WHEN locked THEN NOW() ELSE NULL END; \ No newline at end of file From b24af626280506a9c6c0b3340f277bc89c702658 Mon Sep 17 00:00:00 2001 From: Phil Freeman Date: Thu, 30 Jul 2020 12:33:43 -0700 Subject: [PATCH 4/4] Remove prepareEvents --- server/src-lib/Hasura/App.hs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/server/src-lib/Hasura/App.hs b/server/src-lib/Hasura/App.hs index b50767a070b..619077a15cd 100644 --- a/server/src-lib/Hasura/App.hs +++ b/server/src-lib/Hasura/App.hs @@ -368,7 +368,6 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos lockedEventsCtx <- liftIO $ atomically $ initLockedEventsCtx -- prepare event triggers data - prepareEvents _icPgPool logger eventEngineCtx <- liftIO $ atomically $ initEventEngineCtx maxEvThrds fetchI unLogger logger $ mkGenericStrLog LevelInfo "event_triggers" "starting workers" @@ -423,8 +422,10 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos liftIO $ Warp.runSettings warpSettings app where - -- | prepareEvents is a function to unlock all the events that are - -- locked and unprocessed, which is called while hasura is started. + -- | prepareScheduledEvents is a function to unlock all the scheduled trigger + -- events that are locked and unprocessed, which is called while hasura is + -- started. + -- -- Locked and unprocessed events can occur in 2 ways -- 1. -- Hasura's shutdown was not graceful in which all the fetched @@ -434,12 +435,6 @@ runHGEServer env ServeOptions{..} InitCtx{..} pgExecCtx initTime shutdownApp pos -- There is another hasura instance which is processing events and -- it will lock events to process them. -- So, unlocking all the locked events might re-deliver an event(due to #2). - prepareEvents pool (Logger logger) = do - liftIO $ logger $ mkGenericStrLog LevelInfo "event_triggers" "preparing data" - res <- liftIO $ runTx pool (Q.ReadCommitted, Nothing) unlockAllEvents - either (printErrJExit EventSubSystemError) return res - - -- | prepareScheduledEvents is like prepareEvents, but for scheduled triggers prepareScheduledEvents pool (Logger logger) = do liftIO $ logger $ mkGenericStrLog LevelInfo "scheduled_triggers" "preparing data" res <- liftIO $ runTx pool (Q.ReadCommitted, Nothing) unlockAllLockedScheduledEvents