mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-15 17:31:56 +03:00
server, multi-tenant: update error logging strategy on pro and multitenant
Co-authored-by: Lyndon Maydwell <92299+sordina@users.noreply.github.com> GitOrigin-RevId: 7462d36488003bfdacb5566c7a0e9f273a937a0e
This commit is contained in:
parent
9cbb2484ad
commit
e24abede99
@ -12,6 +12,7 @@ import Hasura.Prelude
|
|||||||
import qualified Control.Concurrent.Extended as C
|
import qualified Control.Concurrent.Extended as C
|
||||||
import qualified Control.Concurrent.STM as STM
|
import qualified Control.Concurrent.STM as STM
|
||||||
import qualified Control.Immortal as Immortal
|
import qualified Control.Immortal as Immortal
|
||||||
|
import qualified Control.Monad.Loops as L
|
||||||
import qualified Data.HashMap.Strict as HM
|
import qualified Data.HashMap.Strict as HM
|
||||||
import qualified Data.HashSet as HS
|
import qualified Data.HashSet as HS
|
||||||
import qualified Database.PG.Query as Q
|
import qualified Database.PG.Query as Q
|
||||||
@ -173,12 +174,35 @@ forcePut v a = STM.atomically $ STM.tryTakeTMVar v >> STM.putTMVar v a
|
|||||||
schemaVersionCheckHandler
|
schemaVersionCheckHandler
|
||||||
:: Q.PGPool -> STM.TMVar MetadataResourceVersion -> IO (Either QErr ())
|
:: Q.PGPool -> STM.TMVar MetadataResourceVersion -> IO (Either QErr ())
|
||||||
schemaVersionCheckHandler pool metaVersionRef =
|
schemaVersionCheckHandler pool metaVersionRef =
|
||||||
(runExceptT $
|
runExceptT (Q.runTx pool (Q.RepeatableRead, Nothing)
|
||||||
Q.runTx pool (Q.RepeatableRead, Nothing) $
|
$ fetchMetadataResourceVersionFromCatalog)
|
||||||
fetchMetadataResourceVersionFromCatalog) >>= \case
|
>>=
|
||||||
Right version ->
|
\case
|
||||||
Right <$> forcePut metaVersionRef version
|
Right version -> Right <$> forcePut metaVersionRef version
|
||||||
Left err -> pure $ Left err
|
Left err -> pure $ Left err
|
||||||
|
|
||||||
|
data ErrorState = ErrorState { _unState :: (Maybe MetadataResourceVersion) }
|
||||||
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
-- NOTE: The ErrorState type is to be used mainly for the `listener` method below.
|
||||||
|
-- This will help prevent logging the same error with the same MetadataResourceVersion
|
||||||
|
-- multiple times consecutively. When the `listener` is in ErrorState we don't log the
|
||||||
|
-- next error until the resource version has changed/updated.
|
||||||
|
|
||||||
|
emptyErrorState :: ErrorState
|
||||||
|
emptyErrorState = ErrorState Nothing
|
||||||
|
|
||||||
|
-- | NOTE: this can be updated to use lenses
|
||||||
|
updateErrorInState :: ErrorState -> MetadataResourceVersion -> ErrorState
|
||||||
|
updateErrorInState es mrv = es { _unState = (Just mrv) }
|
||||||
|
|
||||||
|
isInErrorState :: ErrorState -> Bool
|
||||||
|
isInErrorState = isJust . _unState
|
||||||
|
|
||||||
|
toLogError :: ErrorState -> MetadataResourceVersion -> Bool
|
||||||
|
toLogError es mrv = case _unState es of
|
||||||
|
Just mrv' -> mrv' /= mrv
|
||||||
|
Nothing -> True
|
||||||
|
|
||||||
-- | An IO action that listens to postgres for events and pushes them to a Queue, in a loop forever.
|
-- | An IO action that listens to postgres for events and pushes them to a Queue, in a loop forever.
|
||||||
listener
|
listener
|
||||||
@ -188,12 +212,27 @@ listener
|
|||||||
-> STM.TMVar MetadataResourceVersion
|
-> STM.TMVar MetadataResourceVersion
|
||||||
-> Milliseconds
|
-> Milliseconds
|
||||||
-> m void
|
-> m void
|
||||||
listener logger pool metaVersionRef interval =
|
listener logger pool metaVersionRef interval = L.iterateM_ listenerLoop emptyErrorState
|
||||||
forever $ do
|
where
|
||||||
respErr <- liftIO $ schemaVersionCheckHandler pool metaVersionRef
|
listenerLoop errorState = do
|
||||||
liftIO $ do
|
mrv <- liftIO $ STM.atomically $ STM.tryTakeTMVar metaVersionRef
|
||||||
onLeft respErr (logError logger TTListener . TEQueryError)
|
resp <- liftIO $ schemaVersionCheckHandler pool metaVersionRef
|
||||||
C.sleep (milliseconds interval)
|
let metadataVersion = fromMaybe initialResourceVersion mrv
|
||||||
|
nextErr <- case resp of
|
||||||
|
Left respErr -> do
|
||||||
|
if (toLogError errorState metadataVersion)
|
||||||
|
then do
|
||||||
|
logError logger TTListener $ TEQueryError respErr
|
||||||
|
logInfo logger TTListener $ object [ "metadataResourceVersion" .= toJSON metadataVersion ]
|
||||||
|
pure $ updateErrorInState errorState metadataVersion
|
||||||
|
else do
|
||||||
|
pure errorState
|
||||||
|
Right _ -> do
|
||||||
|
when (isInErrorState errorState) $
|
||||||
|
logInfo logger TTListener $ object [ "message" .= ("SchemaSync Restored..." :: Text) ]
|
||||||
|
pure emptyErrorState
|
||||||
|
liftIO $ C.sleep $ milliseconds interval
|
||||||
|
pure nextErr
|
||||||
|
|
||||||
-- | An IO action that processes events from Queue, in a loop forever.
|
-- | An IO action that processes events from Queue, in a loop forever.
|
||||||
processor
|
processor
|
||||||
|
Loading…
Reference in New Issue
Block a user