2019-03-12 08:46:27 +03:00
module Hasura.Server.SchemaUpdate
import Hasura.Prelude
import Hasura.Logging
2019-08-14 02:34:37 +03:00
import Hasura.RQL.DDL.Schema (buildSchemaCacheWithoutSetup)
2019-03-12 08:46:27 +03:00
import Hasura.RQL.Types
2019-08-14 02:34:37 +03:00
import Hasura.Server.App (SchemaCacheRef (..), withSCUpdate)
import Hasura.Server.Init (InstanceId (..))
2019-03-12 08:46:27 +03:00
import Hasura.Server.Logging
import Hasura.Server.Query
import Data.Aeson
import Data.Aeson.Casing
import Data.Aeson.TH
2019-08-14 02:34:37 +03:00
import qualified Control.Concurrent as C
import qualified Control.Concurrent.STM as STM
import qualified Data.Text as T
import qualified Data.Time as UTC
import qualified Database.PG.Query as PG
import qualified Database.PostgreSQL.LibPQ as PQ
import qualified Network.HTTP.Client as HTTP
2019-03-12 08:46:27 +03:00
pgChannel :: PG.PGChannel
pgChannel = "hasura_schema_update"
data ThreadType
= TTListener
| TTProcessor
deriving (Eq)
instance Show ThreadType where
show TTListener = "listener"
show TTProcessor = "processor"
data SchemaSyncThreadLog
= SchemaSyncThreadLog
{ suelLogLevel :: !LogLevel
, suelThreadType :: !ThreadType
, suelInfo :: !Value
} deriving (Show, Eq)
instance ToJSON SchemaSyncThreadLog where
toJSON (SchemaSyncThreadLog _ t info) =
object [ "thread_type" .= show t
, "info" .= info
instance ToEngineLog SchemaSyncThreadLog where
toEngineLog threadLog =
2019-07-11 08:37:06 +03:00
(suelLogLevel threadLog, ELTSchemaSyncThread, toJSON threadLog)
2019-03-12 08:46:27 +03:00
data EventPayload
= EventPayload
{ _epInstanceId :: !InstanceId
, _epOccurredAt :: !UTC.UTCTime
} deriving (Show, Eq)
$(deriveJSON (aesonDrop 3 snakeCase) ''EventPayload)
data ThreadError
= TEJsonParse !T.Text
| TEQueryError !QErr
defaultOptions { constructorTagModifier = snakeCase . drop 2
, sumEncoding = TaggedObject "type" "info"
-- | An IO action that enables metadata syncing
2019-04-17 19:29:39 +03:00
:: SQLGenCtx
2019-03-12 08:46:27 +03:00
-> PG.PGPool
-> Logger
-> HTTP.Manager
-> SchemaCacheRef
-> InstanceId
-> Maybe UTC.UTCTime -> IO ()
2019-04-17 19:29:39 +03:00
startSchemaSync sqlGenCtx pool logger httpMgr cacheRef instanceId cacheInitTime = do
2019-05-03 13:42:26 +03:00
-- only the latest event is recorded here
-- we don't want to store and process all the events, only the latest event
updateEventRef <- STM.newTVarIO Nothing
2019-03-12 08:46:27 +03:00
-- Start listener thread
2019-04-17 19:29:39 +03:00
lTId <- C.forkIO $ listener sqlGenCtx pool
2019-05-03 13:42:26 +03:00
logger httpMgr updateEventRef cacheRef instanceId cacheInitTime
2019-03-12 08:46:27 +03:00
logThreadStarted TTListener lTId
-- Start processor thread
2019-04-17 19:29:39 +03:00
pTId <- C.forkIO $ processor sqlGenCtx pool
2019-05-03 13:42:26 +03:00
logger httpMgr updateEventRef cacheRef instanceId
2019-03-12 08:46:27 +03:00
logThreadStarted TTProcessor pTId
logThreadStarted threadType threadId =
let msg = T.pack (show threadType) <> " thread started"
in unLogger logger $
2019-07-11 08:37:06 +03:00
StartupLog LevelInfo "schema-sync" $
2019-03-12 08:46:27 +03:00
object [ "instance_id" .= getInstanceId instanceId
, "thread_id" .= show threadId
, "message" .= msg
-- | An IO action that listens to postgres for events and pushes them to a Queue
2019-04-17 19:29:39 +03:00
:: SQLGenCtx
2019-03-12 08:46:27 +03:00
-> PG.PGPool
-> Logger
-> HTTP.Manager
2019-05-03 13:42:26 +03:00
-> STM.TVar (Maybe EventPayload)
2019-03-12 08:46:27 +03:00
-> SchemaCacheRef
-> InstanceId
-> Maybe UTC.UTCTime -> IO ()
2019-05-03 13:42:26 +03:00
listener sqlGenCtx pool logger httpMgr updateEventRef
2019-03-12 08:46:27 +03:00
cacheRef instanceId cacheInitTime =
-- Never exits
forever $ do
listenResE <-
liftIO $ runExceptT $ PG.listen pool pgChannel notifyHandler
either onError return listenResE
C.threadDelay $ 1 * 1000 * 1000 -- 1 second
threadType = TTListener
shouldRefresh dbInstId accrdAt =
case cacheInitTime of
Nothing -> True
Just time -> (dbInstId /= instanceId) && accrdAt > time
refreshCache Nothing = return ()
refreshCache (Just (dbInstId, accrdAt)) =
when (shouldRefresh dbInstId accrdAt) $
2019-04-17 19:29:39 +03:00
refreshSchemaCache sqlGenCtx pool logger httpMgr cacheRef
2019-03-12 08:46:27 +03:00
threadType "schema cache reloaded after postgres listen init"
notifyHandler = \case
PG.PNEOnStart -> do
eRes <- runExceptT $ PG.runTx pool
(PG.Serializable, Nothing) fetchLastUpdate
case eRes of
Left e -> onError e
Right mLastUpd -> refreshCache mLastUpd
PG.PNEPQNotify notif ->
case eitherDecodeStrict $ PQ.notifyExtra notif of
Left e -> logError logger threadType $ TEJsonParse $ T.pack e
Right payload -> do
logInfo logger threadType $ object ["received_event" .= payload]
-- Push a notify event to Queue
2019-05-03 13:42:26 +03:00
STM.atomically $ STM.writeTVar updateEventRef $ Just payload
2019-03-12 08:46:27 +03:00
onError = logError logger threadType . TEQueryError
logWarn = unLogger logger $
SchemaSyncThreadLog LevelWarn TTListener $ String
"error occurred, retrying postgres listen after 1 second"
-- | An IO action that processes events from Queue
2019-04-17 19:29:39 +03:00
:: SQLGenCtx
2019-03-12 08:46:27 +03:00
-> PG.PGPool
-> Logger
-> HTTP.Manager
2019-05-03 13:42:26 +03:00
-> STM.TVar (Maybe EventPayload)
2019-03-12 08:46:27 +03:00
-> SchemaCacheRef
-> InstanceId -> IO ()
2019-05-03 13:42:26 +03:00
processor sqlGenCtx pool logger httpMgr updateEventRef
2019-03-12 08:46:27 +03:00
cacheRef instanceId =
-- Never exits
forever $ do
2019-05-03 13:42:26 +03:00
event <- STM.atomically getLatestEvent
2019-03-12 08:46:27 +03:00
logInfo logger threadType $ object ["processed_event" .= event]
when (shouldReload event) $
2019-04-17 19:29:39 +03:00
refreshSchemaCache sqlGenCtx pool logger httpMgr cacheRef
2019-03-12 08:46:27 +03:00
threadType "schema cache reloaded"
2019-05-03 13:42:26 +03:00
-- checks if there is an event
-- and replaces it with Nothing
getLatestEvent = do
eventM <- STM.readTVar updateEventRef
case eventM of
Just event -> do
STM.writeTVar updateEventRef Nothing
return event
Nothing -> STM.retry
2019-03-12 08:46:27 +03:00
threadType = TTProcessor
-- If event is from another server
shouldReload payload = _epInstanceId payload /= instanceId
2019-04-17 19:29:39 +03:00
:: SQLGenCtx
2019-03-12 08:46:27 +03:00
-> PG.PGPool
-> Logger
-> HTTP.Manager
-> SchemaCacheRef
-> ThreadType
-> T.Text -> IO ()
2019-04-17 19:29:39 +03:00
refreshSchemaCache sqlGenCtx pool logger httpManager cacheRef threadType msg = do
2019-03-12 08:46:27 +03:00
-- Reload schema cache from catalog
2019-04-29 09:22:48 +03:00
resE <- liftIO $ runExceptT $ withSCUpdate cacheRef logger $
2019-11-15 03:20:18 +03:00
peelRun emptySchemaCache runCtx pgCtx PG.ReadWrite buildSchemaCacheWithoutSetup
2019-03-12 08:46:27 +03:00
case resE of
2019-10-11 08:13:57 +03:00
Left e -> logError logger threadType $ TEQueryError e
Right _ -> logInfo logger threadType $ object ["message" .= msg]
2019-10-21 19:01:05 +03:00
runCtx = RunCtx adminUserInfo httpManager sqlGenCtx
2019-10-11 08:13:57 +03:00
pgCtx = PGExecCtx pool PG.Serializable
2019-03-12 08:46:27 +03:00
logInfo :: Logger -> ThreadType -> Value -> IO ()
logInfo logger threadType val = unLogger logger $
SchemaSyncThreadLog LevelInfo threadType val
logError :: ToJSON a => Logger -> ThreadType -> a -> IO ()
logError logger threadType err =
unLogger logger $ SchemaSyncThreadLog LevelError threadType $
object ["error" .= toJSON err]