server: expand metadata storage class with async actions and core metadata operations (#184)

An incremental PR towards https://github.com/hasura/graphql-engine/pull/5797
- Expands `MonadMetadataStorage` with operations related to async actions and setting/updating metadata

GitOrigin-RevId: 53386b7b2d007e162050b826d0708897f0b4c8f6
This commit is contained in:
Rakesh Emmadi 2020-12-14 10:00:19 +05:30 committed by hasura-bot
parent 479fc6efb0
commit a153e96309
29 changed files with 741 additions and 397 deletions

View File

@ -10,6 +10,7 @@ import Data.Time.Clock.POSIX (getPOSIXTime)
import Hasura.App
import Hasura.Logging (Hasura, LogLevel (..), defaultEnabledEngineLogTypes)
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.Schema
import Hasura.RQL.Types
@ -85,14 +86,16 @@ runApp env (HGEOptionsG rci hgeCmd) = do
queryBs <- liftIO BL.getContents
let sqlGenCtx = SQLGenCtx False
pool <- mkMinimalPool _gcConnInfo
res <- runAsAdmin pool sqlGenCtx _gcHttpManager $ do
schemaCache <- buildRebuildableSchemaCache env
metadata <- liftTx fetchMetadataFromCatalog
execQuery env queryBs
& Tracing.runTraceTWithReporter Tracing.noReporter "execute"
& runMetadataT metadata
& runCacheRWT schemaCache
& fmap (\((res, _), _, _) -> res)
res <- flip runPGMetadataStorageApp pool $
runMetadataStorageT $ liftEitherM $
runAsAdmin pool sqlGenCtx _gcHttpManager $ do
metadata <- liftTx fetchMetadataFromCatalog
schemaCache <- buildRebuildableSchemaCache env metadata
execQuery env queryBs
& Tracing.runTraceTWithReporter Tracing.noReporter "execute"
& runMetadataT metadata
& runCacheRWT schemaCache
& fmap (\((res, _), _, _) -> res)
either (printErrJExit ExecuteProcessError) (liftIO . BLC.putStrLn) res
HCDowngrade opts -> do

View File

@ -13,7 +13,6 @@ import Control.Monad.Stateless
import Control.Monad.STM (atomically)
import Control.Monad.Trans.Control (MonadBaseControl (..))
import Control.Monad.Unique
import Data.Aeson ((.=))
import Data.Time.Clock (UTCTime)
#ifndef PROFILING
import GHC.AssertNF
@ -26,6 +25,7 @@ import System.Mem (performMajorGC)
import qualified Control.Concurrent.Async.Lifted.Safe as LA
import qualified Control.Concurrent.Extended as C
import qualified Control.Immortal as Immortal
import qualified Data.Aeson as J
import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy.Char8 as BLC
@ -48,7 +48,7 @@ import Hasura.Eventing.EventTrigger
import Hasura.Eventing.ScheduledTrigger
import Hasura.GraphQL.Execute (MonadGQLExecutionCheck (..),
checkQueryInAllowlist)
import Hasura.GraphQL.Execute.Action (asyncActionsProcessor)
import Hasura.GraphQL.Execute.Action
import Hasura.GraphQL.Execute.Query (MonadQueryInstrumentation (..),
noProfile)
import Hasura.GraphQL.Logging (MonadQueryLog (..), QueryLog (..))
@ -58,6 +58,7 @@ import Hasura.Logging
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.Schema.Cache
import Hasura.RQL.DDL.Schema.Catalog
import Hasura.RQL.Types
import Hasura.RQL.Types.Run
import Hasura.Server.API.Query (requiresAdmin, runQueryM)
@ -178,7 +179,7 @@ data ServeCtx
, _scConnInfo :: !Q.ConnInfo
, _scPgPool :: !Q.PGPool
, _scShutdownLatch :: !ShutdownLatch
, _scSchemaCache :: !(RebuildableSchemaCache Run)
, _scSchemaCache :: !RebuildableSchemaCache
, _scSchemaSyncCtx :: !SchemaSyncCtx
}
@ -202,7 +203,7 @@ newtype PGMetadataStorageApp a
-- | Initializes or migrates the catalog and returns the context required to start the server.
initialiseServeCtx
:: (HasVersion, MonadIO m, MonadCatch m)
:: (HasVersion, MonadIO m, MonadBaseControl IO m, MonadCatch m)
=> Env.Environment
-> GlobalCtx
-> ServeOptions Hasura
@ -243,9 +244,9 @@ mkLoggers enabledLogs logLevel = do
-- | helper function to initialize or migrate the @hdb_catalog@ schema (used by pro as well)
migrateCatalogSchema
:: (HasVersion, MonadIO m)
:: (HasVersion, MonadIO m, MonadBaseControl IO m)
=> Env.Environment -> Logger Hasura -> Q.PGPool -> HTTP.Manager -> SQLGenCtx
-> m (RebuildableSchemaCache Run, UTCTime)
-> m (RebuildableSchemaCache, UTCTime)
migrateCatalogSchema env logger pool httpManager sqlGenCtx = do
let pgExecCtx = mkPGExecCtx Q.Serializable pool
adminRunCtx = RunCtx adminUserInfo httpManager sqlGenCtx
@ -413,7 +414,7 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po
-- start a backgroud thread to handle async actions
asyncActionsThread <- C.forkImmortal "asyncActionsProcessor" logger $
asyncActionsProcessor env logger (_scrCache cacheRef) _scPgPool _scHttpManager
asyncActionsProcessor env logger (_scrCache cacheRef) _scHttpManager
-- start a background thread to create new cron events
cronEventsThread <- C.forkImmortal "runCronEventsGenerator" logger $
@ -461,16 +462,16 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po
shutdownHandler' <- liftWithStateless $ \lowerIO ->
pure $ shutdownHandler _scLoggers immortalThreads stopWsServer lockedEventsCtx _scPgPool $
\a b -> hoist lowerIO $ unlockScheduledEvents a b
-- Install a variant of forkIOWithUnmask which tracks Warp threads using an EKG metric
let setForkIOWithMetrics :: Warp.Settings -> Warp.Settings
setForkIOWithMetrics = Warp.setFork \f -> do
void $ C.forkIOWithUnmask (\unmask ->
bracket_
bracket_
(EKG.Gauge.inc $ smWarpThreads serverMetrics)
(EKG.Gauge.dec $ smWarpThreads serverMetrics)
(f unmask))
let warpSettings = Warp.setPort soPort
. Warp.setHost soHost
. Warp.setGracefulShutdownTimeout (Just 30) -- 30s graceful shutdown
@ -625,11 +626,11 @@ ourIdleGC (Logger logger) idleInterval minGCInterval maxNoGCInterval =
go gcs major_gcs timerSinceLastMajorGC
runAsAdmin
:: (MonadIO m)
:: (MonadIO m, MonadBaseControl IO m)
=> Q.PGPool
-> SQLGenCtx
-> HTTP.Manager
-> Run a
-> RunT m a
-> m (Either QErr a)
runAsAdmin pool sqlGenCtx httpManager m = do
let runCtx = RunCtx adminUserInfo httpManager sqlGenCtx
@ -647,6 +648,7 @@ execQuery
, UserInfoM m
, Tracing.MonadTrace m
, MetadataM m
, MonadScheduledEvents m
)
=> Env.Environment
-> BLC.ByteString
@ -684,8 +686,8 @@ instance UserAuthentication (Tracing.TraceT PGMetadataStorageApp) where
runExceptT $ getUserInfoWithExpTime logger manager headers authMode
instance MetadataApiAuthorization PGMetadataStorageApp where
authorizeMetadataApi query userInfo = do
let currRole = _uiRole userInfo
authorizeMetadataApi query handlerCtx = runExceptT do
let currRole = _uiRole $ hcUser handlerCtx
when (requiresAdmin query && currRole /= adminRoleName) $
withPathK "args" $ throw400 AccessDenied errMsg
where
@ -716,11 +718,33 @@ runInSeparateTx tx = do
pool <- lift ask
liftEitherM $ liftIO $ runExceptT $ Q.runTx pool (Q.RepeatableRead, Nothing) tx
-- | Using @pg_notify@ function to publish schema sync events to other server
-- instances via 'hasura_schema_update' channel.
-- See Note [Schema Cache Sync]
notifySchemaCacheSyncTx :: InstanceId -> CacheInvalidations -> Q.TxE QErr ()
notifySchemaCacheSyncTx instanceId invalidations = do
Q.Discard () <- Q.withQE defaultTxErrorHandler [Q.sql|
SELECT pg_notify('hasura_schema_update', json_build_object(
'instance_id', $1,
'occurred_at', NOW(),
'invalidations', $2
)::text
)
|] (instanceId, Q.AltJ invalidations) True
pure ()
-- | Each of the function in the type class is executed in a totally separate transaction.
--
-- To learn more about why the instance is derived as following, see Note [Generic MetadataStorageT transformer]
instance MonadMetadataStorage (MetadataStorageT PGMetadataStorageApp) where
fetchMetadata = runInSeparateTx fetchMetadataFromCatalog
setMetadata = runInSeparateTx . setMetadataInCatalog
notifySchemaCacheSync a b = runInSeparateTx $ notifySchemaCacheSyncTx a b
processSchemaSyncEventPayload instanceId payload = do
EventPayload{..} <- decodeValue payload
pure $ SchemaSyncEventProcessResult (instanceId /= _epInstanceId) _epInvalidations
getDeprivedCronTriggerStats = runInSeparateTx getDeprivedCronTriggerStatsTx
getScheduledEventsForDelivery = runInSeparateTx getScheduledEventsForDeliveryTx
insertScheduledEvent = runInSeparateTx . insertScheduledEventTx
@ -728,6 +752,12 @@ instance MonadMetadataStorage (MetadataStorageT PGMetadataStorageApp) where
setScheduledEventOp a b c = runInSeparateTx $ setScheduledEventOpTx a b c
unlockScheduledEvents a b = runInSeparateTx $ unlockScheduledEventsTx a b
unlockAllLockedScheduledEvents = runInSeparateTx unlockAllLockedScheduledEventsTx
clearFutureCronEvents = runInSeparateTx . dropFutureCronEventsTx
insertAction a b c d = runInSeparateTx $ insertActionTx a b c d
fetchUndeliveredActionEvents = runInSeparateTx fetchUndeliveredActionEventsTx
setActionStatus a b = runInSeparateTx $ setActionStatusTx a b
fetchActionResponse = runInSeparateTx . fetchActionResponseTx
--- helper functions ---
@ -735,12 +765,12 @@ mkConsoleHTML :: HasVersion => Text -> AuthMode -> Bool -> Maybe Text -> Either
mkConsoleHTML path authMode enableTelemetry consoleAssetsDir =
renderHtmlTemplate consoleTmplt $
-- variables required to render the template
A.object [ "isAdminSecretSet" .= isAdminSecretSet authMode
, "consolePath" .= consolePath
, "enableTelemetry" .= boolToText enableTelemetry
, "cdnAssets" .= boolToText (isNothing consoleAssetsDir)
, "assetsVersion" .= consoleAssetsVersion
, "serverVersion" .= currentVersion
A.object [ "isAdminSecretSet" J..= isAdminSecretSet authMode
, "consolePath" J..= consolePath
, "enableTelemetry" J..= boolToText enableTelemetry
, "cdnAssets" J..= boolToText (isNothing consoleAssetsDir)
, "assetsVersion" J..= consoleAssetsVersion
, "serverVersion" J..= currentVersion
]
where
consolePath = case path of

View File

@ -47,8 +47,8 @@ import Hasura.Backends.Postgres.SQL.Error
import Hasura.Backends.Postgres.SQL.Types
import Hasura.EncJSON
import Hasura.RQL.Types.Error
import Hasura.SQL.Types
import Hasura.Session
import Hasura.SQL.Types
type RunTx =
forall m a. (MonadIO m, MonadBaseControl IO m) => Q.TxET QErr m a -> ExceptT QErr m a

View File

@ -82,6 +82,7 @@ module Hasura.Eventing.ScheduledTrigger
, unlockScheduledEventsTx
, unlockAllLockedScheduledEventsTx
, insertScheduledEventTx
, dropFutureCronEventsTx
) where
import Hasura.Prelude
@ -147,7 +148,7 @@ runCronEventsGenerator logger getSC = do
insertCronEventsFor cronTriggersForHydrationWithStats
onLeft eitherRes $ L.unLogger logger .
ScheduledTriggerInternalErr . err500 Unexpected . T.pack . show
ScheduledTriggerInternalErr . err500 Unexpected . tshow
liftIO $ sleep (minutes 1)
where
@ -660,6 +661,15 @@ insertScheduledEventTx = \case
toArr (CronEventSeed n t) = [(triggerNameToTxt n), (formatTime' t)]
toTupleExp = TupleExp . map SELit
dropFutureCronEventsTx :: TriggerName -> Q.TxE QErr ()
dropFutureCronEventsTx name =
Q.unitQE defaultTxErrorHandler
[Q.sql|
DELETE FROM hdb_catalog.hdb_cron_events
WHERE trigger_name = $1 AND scheduled_time > now() AND tries = 0
|] (Identity name) False
cronEventsTable :: QualifiedTable
cronEventsTable =
QualifiedObject "hdb_catalog" $ TableName "hdb_cron_events"

View File

@ -41,6 +41,7 @@ import Hasura.GraphQL.RemoteServer (execRemoteGQ')
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.GraphQL.Utils (showName)
import Hasura.HTTP
import Hasura.Metadata.Class
import Hasura.RQL.Types
import Hasura.Server.Types (RequestId)
import Hasura.Server.Version (HasVersion)
@ -107,6 +108,10 @@ instance MonadGQLExecutionCheck m => MonadGQLExecutionCheck (Tracing.TraceT m) w
checkGQLExecution ui det enableAL sc req =
lift $ checkGQLExecution ui det enableAL sc req
instance MonadGQLExecutionCheck m => MonadGQLExecutionCheck (MetadataStorageT m) where
checkGQLExecution ui det enableAL sc req =
lift $ checkGQLExecution ui det enableAL sc req
getExecPlanPartial
:: (MonadError QErr m)
=> UserInfo
@ -208,6 +213,7 @@ getResolvedExecPlan
:: forall m tx
. ( HasVersion
, MonadError QErr m
, MonadMetadataStorage (MetadataStorageT m)
, MonadIO m
, Tracing.MonadTrace m
, EQ.MonadQueryInstrumentation m

View File

@ -1,10 +1,14 @@
module Hasura.GraphQL.Execute.Action
( ActionExecuteTx
( ActionExecuteTx(..)
, ActionExecuteResult(..)
, resolveAsyncActionQuery
, asyncActionsProcessor
, resolveActionExecution
, resolveActionMutationAsync
, resolveAsyncActionQuery
, insertActionTx
, fetchUndeliveredActionEventsTx
, setActionStatusTx
, fetchActionResponseTx
) where
import Hasura.Prelude
@ -19,7 +23,6 @@ import qualified Data.Environment as Env
import qualified Data.HashMap.Strict as Map
import qualified Data.HashSet as Set
import qualified Data.Text as T
import qualified Data.UUID as UUID
import qualified Database.PG.Query as Q
import qualified Language.GraphQL.Draft.Syntax as G
import qualified Network.HTTP.Client as HTTP
@ -31,11 +34,12 @@ import Control.Exception (try)
import Control.Lens
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Has
import Data.IORef
import Data.Int (Int64)
import Data.IORef
import Data.Text.Extended
import qualified Hasura.Backends.Postgres.Execute.RemoteJoin as RJ
import qualified Hasura.Backends.Postgres.SQL.DML as S
import qualified Hasura.Backends.Postgres.Translate.Select as RS
import qualified Hasura.Logging as L
import qualified Hasura.RQL.IR.Select as RS
@ -43,26 +47,30 @@ import qualified Hasura.Tracing as Tracing
import Hasura.Backends.Postgres.SQL.Types
import Hasura.Backends.Postgres.SQL.Value (PGScalarValue (..))
import Hasura.Backends.Postgres.Translate.Select (asSingleRowJsonResp)
import Hasura.Backends.Postgres.Translate.Column (toTxtValue)
import Hasura.Backends.Postgres.Translate.Select (asSingleRowJsonResp)
import Hasura.EncJSON
import Hasura.GraphQL.Execute.Prepare
import Hasura.GraphQL.Parser
import Hasura.GraphQL.Utils (showNames)
import Hasura.HTTP
import Hasura.Metadata.Class
import Hasura.RQL.DDL.Headers
import Hasura.RQL.DDL.Schema.Cache
import Hasura.RQL.Types
import Hasura.RQL.Types.Run
import Hasura.SQL.Types
import Hasura.Server.Utils (mkClientHeadersForward,
mkSetCookieHeaders)
import Hasura.Server.Version (HasVersion)
import Hasura.Session
import Hasura.SQL.Types
type ActionExecuteTx =
forall tx. (MonadIO tx, MonadTx tx, Tracing.MonadTrace tx) => tx EncJSON
newtype ActionExecuteTx =
ActionExecuteTx {
unActionExecuteTx
:: forall tx
. (MonadIO tx, MonadTx tx, Tracing.MonadTrace tx) => tx EncJSON
}
newtype ActionContext
= ActionContext {_acName :: ActionName}
@ -93,7 +101,7 @@ instance J.FromJSON ActionWebhookResponse where
parseJSON v = case v of
J.Array{} -> AWRArray <$> J.parseJSON v
J.Object{} -> AWRObject <$> J.parseJSON v
_ -> fail $ "expecting object or array of objects for action webhook response"
_ -> fail "expecting object or array of objects for action webhook response"
instance J.ToJSON ActionWebhookResponse where
toJSON (AWRArray objects) = J.toJSON objects
@ -137,8 +145,8 @@ instance L.ToEngineLog ActionHandlerLog L.Hasura where
data ActionExecuteResult
= ActionExecuteResult
{ _aerTransaction :: !ActionExecuteTx
, _aerHeaders :: !HTTP.ResponseHeaders
{ _aerExecution :: !ActionExecuteTx
, _aerHeaders :: !HTTP.ResponseHeaders
}
-- | Synchronously execute webhook handler and resolve response to action "output"
@ -173,7 +181,7 @@ resolveActionExecution env logger userInfo annAction execContext = do
executeAction :: RS.AnnSimpleSel 'Postgres -> ActionExecuteTx
executeAction astResolved = do
executeAction astResolved = ActionExecuteTx do
let (astResolvedWithoutRemoteJoins,maybeRemoteJoins) = RJ.getRemoteJoins astResolved
jsonAggType = mkJsonAggSelect outputType
case maybeRemoteJoins of
@ -199,29 +207,17 @@ table provides the action response. See Note [Resolving async action query/subsc
-- | Resolve asynchronous action mutation which returns only the action uuid
resolveActionMutationAsync
:: ( MonadError QErr m
, MonadTx tx
)
:: (MonadMetadataStorage m)
=> AnnActionMutationAsync
-> [HTTP.Header]
-> SessionVariables
-> m (tx EncJSON)
resolveActionMutationAsync annAction reqHeaders sessionVariables =
pure $ liftTx do
actionId <- runIdentity . Q.getRow <$> Q.withQE defaultTxErrorHandler [Q.sql|
INSERT INTO
"hdb_catalog"."hdb_action_log"
("action_name", "session_variables", "request_headers", "input_payload", "status")
VALUES
($1, $2, $3, $4, $5)
RETURNING "id"
|]
(actionName, Q.AltJ sessionVariables, Q.AltJ $ toHeadersMap reqHeaders, Q.AltJ inputArgs, "created"::Text) False
pure $ encJFromJValue $ UUID.toText actionId
-> m ActionExecuteTx
resolveActionMutationAsync annAction reqHeaders sessionVariables = do
actionId <- insertAction actionName sessionVariables reqHeaders inputArgs
pure $ ActionExecuteTx $
pure $ encJFromJValue $ actionIdToText actionId
where
AnnActionMutationAsync actionName inputArgs = annAction
toHeadersMap = Map.fromList . map ((bsToTxt . CI.original) *** bsToTxt)
{- Note: [Resolving async action query/subscription]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -234,13 +230,14 @@ action's type. Here, we treat the "output" field as a computed field to hdb_acti
`jsonb_to_record` as custom SQL function.
-}
-- TODO: Add tracing here? Avoided now because currently the function is pure
resolveAsyncActionQuery
:: UserInfo
:: (MonadMetadataStorage m)
=> UserInfo
-> AnnActionAsyncQuery 'Postgres (UnpreparedValue 'Postgres)
-> RS.AnnSimpleSelG 'Postgres (UnpreparedValue 'Postgres)
resolveAsyncActionQuery userInfo annAction =
-> m (RS.AnnSimpleSelG 'Postgres (UnpreparedValue 'Postgres))
resolveAsyncActionQuery userInfo annAction = do
actionLogResponse <- fetchActionResponse actionId
let annotatedFields = asyncFields <&> second \case
AsyncTypename t -> RS.AFExpression t
AsyncOutput annFields ->
@ -251,32 +248,41 @@ resolveAsyncActionQuery userInfo annAction =
processOutputSelectionSet inputTableArgument outputType
definitionList annFields stringifyNumerics
AsyncId -> mkAnnFldFromPGCol "id" PGUUID
AsyncCreatedAt -> mkAnnFldFromPGCol "created_at" PGTimeStampTZ
AsyncErrors -> mkAnnFldFromPGCol "errors" PGJSONB
AsyncId -> mkAnnFldFromPGCol idColumn
AsyncCreatedAt -> mkAnnFldFromPGCol createdAtColumn
AsyncErrors -> mkAnnFldFromPGCol errorsColumn
tableFromExp = RS.FromTable actionLogTable
jsonbToRecordSet = QualifiedObject "pg_catalog" $ FunctionName "jsonb_to_recordset"
actionLogInput = UVLiteral $ S.SELit $ lbsToTxt $ J.encode [actionLogResponse]
functionArgs = RS.FunctionArgsExp [RS.AEInput actionLogInput] mempty
tableFromExp = RS.FromFunction jsonbToRecordSet functionArgs $ Just
[idColumn, createdAtColumn, responsePayloadColumn, errorsColumn, sessionVarsColumn]
tableArguments = RS.noSelectArgs
{ RS._saWhere = Just tableBoolExpression}
tablePermissions = RS.TablePerm annBoolExpTrue Nothing
in RS.AnnSelectG annotatedFields tableFromExp tablePermissions
tableArguments stringifyNumerics
pure $ RS.AnnSelectG annotatedFields tableFromExp tablePermissions
tableArguments stringifyNumerics
where
AnnActionAsyncQuery actionName actionId outputType asyncFields definitionList stringifyNumerics = annAction
actionLogTable = QualifiedObject (SchemaName "hdb_catalog") (TableName "hdb_action_log")
AnnActionAsyncQuery _ actionId outputType asyncFields definitionList stringifyNumerics = annAction
idColumn = (unsafePGCol "id", PGUUID)
responsePayloadColumn = (unsafePGCol "response_payload", PGJSONB)
createdAtColumn = (unsafePGCol "created_at", PGTimeStampTZ)
errorsColumn = (unsafePGCol "errors", PGJSONB)
sessionVarsColumn = (unsafePGCol "session_variables", PGJSONB)
-- TODO (from master):- Avoid using ColumnInfo
mkAnnFldFromPGCol column' columnType =
flip RS.mkAnnColumnField Nothing $
ColumnInfo (unsafePGCol column') (G.unsafeMkName column') 0 (ColumnScalar columnType) True Nothing
mkAnnFldFromPGCol = flip RS.mkAnnColumnField Nothing . mkPGColumnInfo
mkPGColumnInfo (column', columnType) =
ColumnInfo column' (G.unsafeMkName $ getPGColTxt column') 0 (ColumnScalar columnType) True Nothing
tableBoolExpression =
let actionIdColumnInfo = ColumnInfo (unsafePGCol "id") $$(G.litName "id")
0 (ColumnScalar PGUUID) False Nothing
actionIdColumnEq = BoolFld $ AVCol actionIdColumnInfo [AEQ True actionId]
sessionVarsColumnInfo = ColumnInfo (unsafePGCol "session_variables") $$(G.litName "session_variables")
0 (ColumnScalar PGJSONB) False Nothing
actionIdColumnEq = BoolFld $ AVCol actionIdColumnInfo [AEQ True $ UVLiteral $ S.SELit $ actionIdToText actionId]
sessionVarsColumnInfo = mkPGColumnInfo sessionVarsColumn
sessionVarValue = UVParameter Nothing $ ColumnValue (ColumnScalar PGJSONB) $
PGValJSONB $ Q.JSONB $ J.toJSON $ _uiSession userInfo
sessionVarsColumnEq = BoolFld $ AVCol sessionVarsColumnInfo [AEQ True sessionVarValue]
@ -287,14 +293,6 @@ resolveAsyncActionQuery userInfo annAction =
in if isAdmin (_uiRole userInfo) then actionIdColumnEq
else BoolAnd [actionIdColumnEq, sessionVarsColumnEq]
data ActionLogItem
= ActionLogItem
{ _aliId :: !UUID.UUID
, _aliActionName :: !ActionName
, _aliRequestHeaders :: ![HTTP.Header]
, _aliSessionVariables :: !SessionVariables
, _aliInputPayload :: !J.Value
} deriving (Show, Eq)
-- | Process async actions from hdb_catalog.hdb_action_log table. This functions is executed in a background thread.
-- See Note [Async action architecture] above
@ -305,24 +303,20 @@ asyncActionsProcessor
, MonadBaseControl IO m
, LA.Forall (LA.Pure m)
, Tracing.HasReporter m
, MonadMetadataStorage (MetadataStorageT m)
)
=> Env.Environment
-> L.Logger L.Hasura
-> IORef (RebuildableSchemaCache Run, SchemaCacheVer)
-> Q.PGPool
-> IORef (RebuildableSchemaCache, SchemaCacheVer)
-> HTTP.Manager
-> m void
asyncActionsProcessor env logger cacheRef pgPool httpManager = forever $ do
asyncInvocations <- liftIO getUndeliveredEvents
asyncActionsProcessor env logger cacheRef httpManager = forever $ do
asyncInvocationsE <- runMetadataStorageT fetchUndeliveredActionEvents
asyncInvocations <- liftIO $ onLeft asyncInvocationsE mempty
actionCache <- scActions . lastBuiltSchemaCache . fst <$> liftIO (readIORef cacheRef)
LA.mapConcurrently_ (callHandler actionCache) asyncInvocations
liftIO $ threadDelay (1 * 1000 * 1000)
where
runTx :: (Monoid a) => Q.TxE QErr a -> IO a
runTx q = do
res <- runExceptT $ Q.runTx' pgPool q
onLeft res mempty
callHandler :: ActionCache -> ActionLogItem -> m ()
callHandler actionCache actionLogItem = Tracing.runTraceT "async actions processor" do
let ActionLogItem actionId actionName reqHeaders
@ -340,61 +334,14 @@ asyncActionsProcessor env logger cacheRef pgPool httpManager = forever $ do
actionContext = ActionContext actionName
eitherRes <- runExceptT $ flip runReaderT logger $
callWebhook env httpManager outputType outputFields reqHeaders confHeaders
forwardClientHeaders webhookUrl (ActionWebhookPayload actionContext sessionVariables inputPayload)
forwardClientHeaders webhookUrl
(ActionWebhookPayload actionContext sessionVariables inputPayload)
timeout
resE <- runMetadataStorageT $ setActionStatus actionId $ case eitherRes of
Left e -> AASError e
Right (responsePayload, _) -> AASCompleted $ J.toJSON responsePayload
liftIO $ case eitherRes of
Left e -> setError actionId e
Right (responsePayload, _) -> setCompleted actionId $ J.toJSON responsePayload
setError :: UUID.UUID -> QErr -> IO ()
setError actionId e =
runTx $ setErrorQuery actionId e
setErrorQuery
:: UUID.UUID -> QErr -> Q.TxE QErr ()
setErrorQuery actionId e =
Q.unitQE defaultTxErrorHandler [Q.sql|
update hdb_catalog.hdb_action_log
set errors = $1, status = 'error'
where id = $2
|] (Q.AltJ e, actionId) False
setCompleted :: UUID.UUID -> J.Value -> IO ()
setCompleted actionId responsePayload =
runTx $ setCompletedQuery actionId responsePayload
setCompletedQuery
:: UUID.UUID -> J.Value -> Q.TxE QErr ()
setCompletedQuery actionId responsePayload =
Q.unitQE defaultTxErrorHandler [Q.sql|
update hdb_catalog.hdb_action_log
set response_payload = $1, status = 'completed'
where id = $2
|] (Q.AltJ responsePayload, actionId) False
undeliveredEventsQuery
:: Q.TxE QErr [ActionLogItem]
undeliveredEventsQuery =
map mapEvent <$> Q.listQE defaultTxErrorHandler [Q.sql|
update hdb_catalog.hdb_action_log set status = 'processing'
where
id in (
select id from hdb_catalog.hdb_action_log
where status = 'created'
for update skip locked limit 10
)
returning
id, action_name, request_headers::json, session_variables::json, input_payload::json
|] () False
where
mapEvent (actionId, actionName, Q.AltJ headersMap,
Q.AltJ sessionVariables, Q.AltJ inputPayload) =
ActionLogItem actionId actionName (fromHeadersMap headersMap) sessionVariables inputPayload
fromHeadersMap = map ((CI.mk . txtToBs) *** txtToBs) . Map.toList
getUndeliveredEvents = runTx undeliveredEventsQuery
liftIO $ onLeft resE mempty
callWebhook
:: forall m r.
@ -428,7 +375,7 @@ callWebhook env manager outputType outputFields reqHeaders confHeaders
requestBody = J.encode postPayload
requestBodySize = BL.length requestBody
url = unResolvedWebhook resolvedWebhook
responseTimeout = HTTP.responseTimeoutMicro $ unTimeout timeoutSeconds * 1000000
responseTimeout = HTTP.responseTimeoutMicro $ (unTimeout timeoutSeconds) * 1000000
httpResponse <- do
initReq <- liftIO $ HTTP.parseRequest (T.unpack url)
let req = initReq { HTTP.method = "POST"
@ -515,14 +462,10 @@ callWebhook env manager outputType outputFields reqHeaders confHeaders
Just v -> when (v == J.Null) $ throwUnexpected $
"expecting not null value for field " <>> fieldName
mkJsonAggSelect :: GraphQLType -> RS.JsonAggSelect
mkJsonAggSelect =
bool RS.JASSingleObject RS.JASMultipleRows . isListType
processOutputSelectionSet
:: RS.ArgumentExp 'Postgres v
-> GraphQLType
-> [(Column 'Postgres, ScalarType 'Postgres)]
-> [(PGCol, PGScalarType)]
-> RS.AnnFieldsG 'Postgres v
-> Bool
-> RS.AnnSimpleSelG 'Postgres v
@ -537,3 +480,74 @@ processOutputSelectionSet tableRowInput actionOutputType definitionList annotate
functionArgs = RS.FunctionArgsExp [tableRowInput] mempty
selectFrom = RS.FromFunction jsonbToPostgresRecordFunction functionArgs $ Just definitionList
mkJsonAggSelect :: GraphQLType -> RS.JsonAggSelect
mkJsonAggSelect =
bool RS.JASSingleObject RS.JASMultipleRows . isListType
insertActionTx
:: ActionName -> SessionVariables -> [HTTP.Header] -> J.Value
-> Q.TxE QErr ActionId
insertActionTx actionName sessionVariables httpHeaders inputArgsPayload =
runIdentity . Q.getRow <$> Q.withQE defaultTxErrorHandler [Q.sql|
INSERT INTO
"hdb_catalog"."hdb_action_log"
("action_name", "session_variables", "request_headers", "input_payload", "status")
VALUES
($1, $2, $3, $4, $5)
RETURNING "id"
|]
( actionName
, Q.AltJ sessionVariables
, Q.AltJ $ toHeadersMap httpHeaders
, Q.AltJ inputArgsPayload
, "created"::Text
) False
where
toHeadersMap = Map.fromList . map ((bsToTxt . CI.original) *** bsToTxt)
fetchUndeliveredActionEventsTx :: Q.TxE QErr [ActionLogItem]
fetchUndeliveredActionEventsTx =
map mapEvent <$> Q.listQE defaultTxErrorHandler [Q.sql|
update hdb_catalog.hdb_action_log set status = 'processing'
where
id in (
select id from hdb_catalog.hdb_action_log
where status = 'created'
for update skip locked limit 10
)
returning
id, action_name, request_headers::json, session_variables::json, input_payload::json
|] () False
where
mapEvent (actionId, actionName, Q.AltJ headersMap,
Q.AltJ sessionVariables, Q.AltJ inputPayload) =
ActionLogItem actionId actionName (fromHeadersMap headersMap) sessionVariables inputPayload
fromHeadersMap = map ((CI.mk . txtToBs) *** txtToBs) . Map.toList
setActionStatusTx :: ActionId -> AsyncActionStatus -> Q.TxE QErr ()
setActionStatusTx actionId = \case
AASCompleted responsePayload ->
Q.unitQE defaultTxErrorHandler [Q.sql|
update hdb_catalog.hdb_action_log
set response_payload = $1, status = 'completed'
where id = $2
|] (Q.AltJ responsePayload, actionId) False
AASError qerr ->
Q.unitQE defaultTxErrorHandler [Q.sql|
update hdb_catalog.hdb_action_log
set errors = $1, status = 'error'
where id = $2
|] (Q.AltJ qerr, actionId) False
fetchActionResponseTx :: ActionId -> Q.TxE QErr ActionLogResponse
fetchActionResponseTx actionId = do
(ca, rp, errs, Q.AltJ sessVars) <-
Q.getRow <$> Q.withQE defaultTxErrorHandler [Q.sql|
SELECT created_at, response_payload::json, errors::json, session_variables::json
FROM hdb_catalog.hdb_action_log
WHERE id = $1
|] (Identity actionId) True
pure $ ActionLogResponse actionId ca (Q.getAltJ <$> rp) (Q.getAltJ <$> errs) sessVars

View File

@ -85,7 +85,7 @@ mkCurPlanTx env manager reqHdrs userInfo instrument ep = \case
asSingleRowJsonResp (instrument q) prepArgs
Just remoteJoins ->
executeQueryWithRemoteJoins env manager reqHdrs userInfo q prepArgs remoteJoins
RFPActionQuery atx -> (atx, Nothing)
RFPActionQuery atx -> (unActionExecuteTx atx, Nothing)
-- convert a query from an intermediate representation to... another
irToRootFieldPlan

View File

@ -33,7 +33,6 @@ import qualified Data.ByteString as B
import qualified Data.HashMap.Strict as Map
import qualified Data.HashMap.Strict.InsOrd as OMap
import qualified Data.HashSet as Set
import qualified Data.Text as T
import qualified Data.UUID.V4 as UUID
import qualified Database.PG.Query as Q
import qualified Database.PG.Query.PTI as PTI
@ -59,9 +58,10 @@ import Hasura.GraphQL.Context
import Hasura.GraphQL.Execute.Action
import Hasura.GraphQL.Execute.Query
import Hasura.GraphQL.Parser.Column
import Hasura.Metadata.Class
import Hasura.RQL.Types
import Hasura.SQL.Types
import Hasura.Session
import Hasura.SQL.Types
-- -------------------------------------------------------------------------------------------------
@ -298,7 +298,7 @@ resolveMultiplexedValue = \case
Nothing -> do
syntheticVarIndex <- use (qpiSyntheticVariableValues . to length)
modifying qpiSyntheticVariableValues (|> colVal)
pure ["synthetic", T.pack $ show syntheticVarIndex]
pure ["synthetic", tshow syntheticVarIndex]
pure $ fromResVars (CollectableTypeScalar $ unsafePGColumnToBackend $ cvType colVal) varJsonPath
UVSessionVar ty sessVar -> do
modifying qpiReferencedSessionVariables (Set.insert sessVar)
@ -346,6 +346,7 @@ $(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) ''ReusableLiveQueryPlan)
-- of the plan if possible.
buildLiveQueryPlan
:: ( MonadError QErr m
, MonadMetadataStorage (MetadataStorageT m)
, MonadIO m
)
=> PGExecCtx
@ -366,7 +367,9 @@ buildLiveQueryPlan pgExecCtx userInfo unpreparedAST = do
when (remoteJoins /= mempty)
$ throw400 NotSupported "Remote relationships are not allowed in subscriptions"
_ -> pure ()
traverseAction (DS.traverseAnnSimpleSelect resolveMultiplexedValue . resolveAsyncActionQuery userInfo) resolvedRootField
flip traverseAction resolvedRootField $
(lift . liftEitherM . runMetadataStorageT . resolveAsyncActionQuery userInfo)
>=> DS.traverseAnnSimpleSelect resolveMultiplexedValue
let multiplexedQuery = mkMultiplexedQuery preparedAST
roleName = _uiRole userInfo

View File

@ -34,6 +34,7 @@ import Hasura.GraphQL.Execute.Prepare
import Hasura.GraphQL.Execute.Remote
import Hasura.GraphQL.Execute.Resolve
import Hasura.GraphQL.Parser
import Hasura.Metadata.Class
import Hasura.RQL.Types
import Hasura.Server.Version (HasVersion)
import Hasura.Session
@ -102,6 +103,7 @@ convertMutationAction
::( HasVersion
, MonadIO m
, MonadError QErr m
, MonadMetadataStorage (MetadataStorageT m)
, Tracing.MonadTrace m
, Tracing.MonadTrace tx
, MonadIO tx
@ -115,9 +117,10 @@ convertMutationAction
-> ActionMutation 'Postgres (UnpreparedValue 'Postgres)
-> m (tx EncJSON, HTTP.ResponseHeaders)
convertMutationAction env logger userInfo manager reqHeaders = \case
AMSync s -> (_aerTransaction &&& _aerHeaders) <$>
AMSync s -> ((unActionExecuteTx . _aerExecution) &&& _aerHeaders) <$>
resolveActionExecution env logger userInfo s actionExecContext
AMAsync s -> noResponseHeaders <$> resolveActionMutationAsync s reqHeaders userSession
AMAsync s -> (noResponseHeaders . unActionExecuteTx) <$>
liftEitherM (runMetadataStorageT $ resolveActionMutationAsync s reqHeaders userSession)
where
userSession = _uiSession userInfo
actionExecContext = ActionExecContext manager reqHeaders $ _uiSession userInfo
@ -128,6 +131,7 @@ convertMutationSelectionSet
, Tracing.MonadTrace m
, MonadIO m
, MonadError QErr m
, MonadMetadataStorage (MetadataStorageT m)
, MonadTx tx
, Tracing.MonadTrace tx
, MonadIO tx

View File

@ -38,6 +38,7 @@ import Hasura.GraphQL.Execute.Prepare
import Hasura.GraphQL.Execute.Remote
import Hasura.GraphQL.Execute.Resolve
import Hasura.GraphQL.Parser
import Hasura.Metadata.Class
import Hasura.RQL.Types
import Hasura.Server.Version (HasVersion)
import Hasura.Session
@ -146,10 +147,12 @@ class Monad m => MonadQueryInstrumentation m where
instance MonadQueryInstrumentation m => MonadQueryInstrumentation (ReaderT r m)
instance MonadQueryInstrumentation m => MonadQueryInstrumentation (ExceptT e m)
instance MonadQueryInstrumentation m => MonadQueryInstrumentation (Tracing.TraceT m)
instance MonadQueryInstrumentation m => MonadQueryInstrumentation (MetadataStorageT m)
convertQuerySelSet
:: forall m tx .
( MonadError QErr m
, MonadMetadataStorage (MetadataStorageT m)
, HasVersion
, MonadIO m
, Tracing.MonadTrace m
@ -208,13 +211,16 @@ convertQuerySelSet env logger gqlContext userInfo manager reqHeaders directives
usrVars = _uiSession userInfo
convertActionQuery
:: ActionQuery 'Postgres (UnpreparedValue 'Postgres) -> StateT PlanningSt m (ActionQueryPlan 'Postgres)
:: ActionQuery 'Postgres (UnpreparedValue 'Postgres)
-> StateT PlanningSt m (ActionQueryPlan 'Postgres)
convertActionQuery = \case
AQQuery s -> lift $ do
result <- resolveActionExecution env logger userInfo s $ ActionExecContext manager reqHeaders usrVars
pure $ AQPQuery $ _aerTransaction result
AQAsync s -> AQPAsyncQuery <$>
DS.traverseAnnSimpleSelect prepareWithPlan (resolveAsyncActionQuery userInfo s)
pure $ AQPQuery $ _aerExecution result
AQAsync s -> do
unpreparedAst <- lift $ liftEitherM $ runMetadataStorageT $
resolveAsyncActionQuery userInfo s
AQPAsyncQuery <$> DS.traverseAnnSimpleSelect prepareWithPlan unpreparedAst
-- See Note [Temporarily disabling query plan caching]
-- use the existing plan and new variables to create a pg query

View File

@ -30,6 +30,7 @@ import Hasura.Backends.Postgres.Translate.Column (toTxtValue)
import Hasura.EncJSON
import Hasura.GraphQL.Context
import Hasura.GraphQL.Parser
import Hasura.Metadata.Class
import Hasura.RQL.DML.Internal
import Hasura.RQL.Types
import Hasura.SQL.Types
@ -111,6 +112,7 @@ explainGQLQuery
:: forall m
. ( MonadError QErr m
, MonadIO m
, MonadMetadataStorage (MetadataStorageT m)
)
=> PGExecCtx
-> SchemaCache

View File

@ -12,6 +12,7 @@ import qualified Data.Aeson as J
import qualified Language.GraphQL.Draft.Syntax as G
import Hasura.GraphQL.Transport.HTTP.Protocol (GQLReqUnparsed)
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.Server.Types (RequestId)
import Hasura.Tracing (TraceT)
@ -59,3 +60,6 @@ instance MonadQueryLog m => MonadQueryLog (ReaderT r m) where
instance MonadQueryLog m => MonadQueryLog (TraceT m) where
logQueryLog l req sqlMap reqId = lift $ logQueryLog l req sqlMap reqId
instance MonadQueryLog m => MonadQueryLog (MetadataStorageT m) where
logQueryLog l req sqlMap reqId = lift $ logQueryLog l req sqlMap reqId

View File

@ -18,7 +18,7 @@ import qualified Data.HashMap.Strict.Extended as M
import qualified Data.HashMap.Strict.InsOrd as OMap
import qualified Data.HashSet as S
import qualified Data.List.Extended as LE
import qualified Data.Text as T
import qualified Data.UUID as UUID
import Control.Lens.Extended hiding (enum, index)
import Data.Int (Int32, Int64)
@ -162,6 +162,19 @@ float = scalar floatScalar Nothing SRFloat
string :: MonadParse m => Parser 'Both m Text
string = scalar stringScalar Nothing SRString
uuid :: MonadParse m => Parser 'Both m UUID.UUID
uuid = Parser
{ pType = schemaType
, pParser = peelVariable (Just $ toGraphQLType schemaType) >=> \case
GraphQLValue (VString s) -> parseUUID $ A.String s
JSONValue v -> parseUUID v
v -> typeMismatch name "a UUID" v
}
where
name = $$(litName "uuid")
schemaType = NonNullable $ TNamed $ mkDefinition name Nothing TIScalar
parseUUID = either (parseErrorWith ParseFailed . qeError) pure . runAesonParser A.parseJSON
-- | As an input type, any string or integer input value should be coerced to ID as Text
-- https://spec.graphql.org/June2018/#sec-ID
identifier :: MonadParse m => Parser 'Both m Text
@ -169,7 +182,7 @@ identifier = Parser
{ pType = schemaType
, pParser = peelVariable (Just $ toGraphQLType schemaType) >=> \case
GraphQLValue (VString s) -> pure s
GraphQLValue (VInt i) -> pure $ T.pack $ show i
GraphQLValue (VInt i) -> pure $ tshow i
JSONValue (A.String s) -> pure s
JSONValue (A.Number n) -> parseScientific n
v -> typeMismatch idName "a String or a 32-bit integer" v
@ -178,7 +191,7 @@ identifier = Parser
idName = idScalar
schemaType = NonNullable $ TNamed $ mkDefinition idName Nothing TIScalar
parseScientific = either (parseErrorWith ParseFailed . qeError)
(pure . T.pack . show @Int) . runAesonParser scientificToInteger
(pure . tshow @Int) . runAesonParser scientificToInteger
namedJSON :: MonadParse m => Name -> Maybe Description -> Parser 'Both m A.Value
namedJSON name description = Parser
@ -221,9 +234,8 @@ enum name description values = Parser
where
schemaType = NonNullable $ TNamed $ mkDefinition name description $ TIEnum (fst <$> values)
valuesMap = M.fromList $ over (traverse._1) dName $ toList values
validate value = case M.lookup value valuesMap of
Just result -> pure result
Nothing -> parseError $ "expected one of the values "
validate value = onNothing (M.lookup value valuesMap) $
parseError $ "expected one of the values "
<> englishList "or" (toTxt . dName . fst <$> values) <> " for type "
<> name <<> ", but found " <>> value

View File

@ -77,7 +77,7 @@ actionExecute nonObjectTypeMap actionInfo = runMaybeT do
--
-- > action_name(action_input_arguments)
actionAsyncMutation
:: forall m n r. (BackendSchema 'Postgres, MonadSchema n m, MonadTableInfo 'Postgres r m, MonadRole r m)
:: forall m n r. (MonadSchema n m, MonadTableInfo 'Postgres r m, MonadRole r m)
=> NonObjectTypeMap
-> ActionInfo 'Postgres
-> m (Maybe (FieldParser n AnnActionMutationAsync))
@ -85,10 +85,9 @@ actionAsyncMutation nonObjectTypeMap actionInfo = runMaybeT do
roleName <- lift askRoleName
guard $ roleName == adminRoleName || roleName `Map.member` permissions
inputArguments <- lift $ actionInputArguments nonObjectTypeMap $ _adArguments definition
actionId <- lift actionIdParser
let fieldName = unActionName actionName
description = G.Description <$> comment
pure $ P.selection fieldName description inputArguments actionId
pure $ P.selection fieldName description inputArguments actionIdParser
<&> AnnActionMutationAsync actionName
where
ActionInfo actionName _ definition permissions comment = actionInfo
@ -113,7 +112,6 @@ actionAsyncQuery
actionAsyncQuery actionInfo = runMaybeT do
roleName <- lift askRoleName
guard $ roleName == adminRoleName || roleName `Map.member` permissions
actionId <- lift actionIdParser
actionOutputParser <- lift $ actionOutputFields outputObject
createdAtFieldParser <-
lift $ columnParser @'Postgres (ColumnScalar PGTimeStampTZ) (G.Nullability False)
@ -123,9 +121,9 @@ actionAsyncQuery actionInfo = runMaybeT do
let fieldName = unActionName actionName
description = G.Description <$> comment
actionIdInputField =
P.field idFieldName (Just idFieldDescription) actionId
P.field idFieldName (Just idFieldDescription) actionIdParser
allFieldParsers =
let idField = P.selection_ idFieldName (Just idFieldDescription) actionId $> AsyncId
let idField = P.selection_ idFieldName (Just idFieldDescription) actionIdParser $> AsyncId
createdAtField = P.selection_ $$(G.litName "created_at")
(Just "the time at which this action was created")
createdAtFieldParser $> AsyncCreatedAt
@ -159,10 +157,8 @@ actionAsyncQuery actionInfo = runMaybeT do
-- | Async action's unique id
actionIdParser
:: (BackendSchema 'Postgres, MonadSchema n m, MonadError QErr m)
=> m (Parser 'Both n (UnpreparedValue 'Postgres))
actionIdParser =
fmap P.mkParameter <$> columnParser (ColumnScalar PGUUID) (G.Nullability False)
:: MonadParse n => Parser 'Both n ActionId
actionIdParser = ActionId <$> P.uuid
actionOutputFields
:: forall m n r. (BackendSchema 'Postgres, MonadSchema n m, MonadTableInfo 'Postgres r m, MonadRole r m, Has QueryContext r)

View File

@ -17,39 +17,40 @@ module Hasura.GraphQL.Transport.HTTP
, ResultsFragment(..)
) where
import Control.Monad.Morph (hoist)
import Control.Monad.Morph (hoist)
import Hasura.EncJSON
import Hasura.GraphQL.Context
import Hasura.GraphQL.Execute.Prepare (ExecutionPlan)
import Hasura.GraphQL.Logging (MonadQueryLog (..))
import Hasura.GraphQL.Parser.Column (UnpreparedValue)
import Hasura.GraphQL.Execute.Prepare (ExecutionPlan)
import Hasura.GraphQL.Logging (MonadQueryLog (..))
import Hasura.GraphQL.Parser.Column (UnpreparedValue)
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.HTTP
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.Types
import Hasura.Server.Init.Config
import Hasura.Server.Types (RequestId)
import Hasura.Server.Version (HasVersion)
import Hasura.Server.Types (RequestId)
import Hasura.Server.Version (HasVersion)
import Hasura.Session
import Hasura.Tracing (MonadTrace, TraceT, trace)
import Hasura.Tracing (MonadTrace, TraceT, trace)
import qualified Data.Aeson as J
import qualified Data.Aeson.Ordered as JO
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Environment as Env
import qualified Data.HashMap.Strict.InsOrd as OMap
import qualified Data.Text as T
import qualified Database.PG.Query as Q
import qualified Data.Aeson as J
import qualified Data.Aeson.Ordered as JO
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Environment as Env
import qualified Data.HashMap.Strict.InsOrd as OMap
import qualified Data.Text as T
import qualified Database.PG.Query as Q
import qualified Hasura.Backends.Postgres.Execute.RemoteJoin as RJ
import qualified Hasura.GraphQL.Execute as E
import qualified Hasura.GraphQL.Execute.Query as EQ
import qualified Hasura.Logging as L
import qualified Hasura.Server.Telemetry.Counters as Telem
import qualified Hasura.Tracing as Tracing
import qualified Language.GraphQL.Draft.Syntax as G
import qualified Network.HTTP.Types as HTTP
import qualified Network.Wai.Extended as Wai
import qualified Hasura.GraphQL.Execute as E
import qualified Hasura.GraphQL.Execute.Query as EQ
import qualified Hasura.Logging as L
import qualified Hasura.Server.Telemetry.Counters as Telem
import qualified Hasura.Tracing as Tracing
import qualified Language.GraphQL.Draft.Syntax as G
import qualified Network.HTTP.Types as HTTP
import qualified Network.Wai.Extended as Wai
data QueryCacheKey = QueryCacheKey
{ qckQueryString :: !GQLReqParsed
@ -108,6 +109,10 @@ instance MonadExecuteQuery m => MonadExecuteQuery (TraceT m) where
cacheLookup a b c = hoist (hoist lift) $ cacheLookup a b c
cacheStore a b = hoist (hoist lift) $ cacheStore a b
instance MonadExecuteQuery m => MonadExecuteQuery (MetadataStorageT m) where
cacheLookup a b c = hoist (hoist lift) $ cacheLookup a b c
cacheStore a b = hoist (hoist lift) $ cacheStore a b
data ResultsFragment = ResultsFragment
{ rfTimeIO :: DiffTime
, rfLocality :: Telem.Locality
@ -127,6 +132,7 @@ runGQ
, MonadTrace m
, MonadExecuteQuery m
, EQ.MonadQueryInstrumentation m
, MonadMetadataStorage (MetadataStorageT m)
)
=> Env.Environment
-> L.Logger L.Hasura
@ -254,6 +260,7 @@ runGQBatched
, MonadTrace m
, MonadExecuteQuery m
, EQ.MonadQueryInstrumentation m
, MonadMetadataStorage (MetadataStorageT m)
)
=> Env.Environment
-> L.Logger L.Hasura

View File

@ -56,6 +56,7 @@ import Hasura.GraphQL.Transport.HTTP (MonadExecuteQuery
import Hasura.GraphQL.Transport.HTTP.Protocol
import Hasura.GraphQL.Transport.WebSocket.Protocol
import Hasura.HTTP
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.Types
import Hasura.Server.Auth (AuthMode, UserAuthentication,
@ -72,9 +73,9 @@ import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as LQ
import qualified Hasura.GraphQL.Execute.Query as EQ
import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS
import qualified Hasura.Logging as L
import Hasura.Server.Init.Config (KeepAliveDelay (..))
import qualified Hasura.Server.Telemetry.Counters as Telem
import qualified Hasura.Tracing as Tracing
import Hasura.Server.Init.Config (KeepAliveDelay (..))
-- | 'LQ.LiveQueryId' comes from 'Hasura.GraphQL.Execute.LiveQuery.State.addLiveQuery'. We use
-- this to track a connection's operations so we can remove them from 'LiveQueryState', and
@ -332,6 +333,7 @@ onStart
, Tracing.MonadTrace m
, MonadExecuteQuery m
, EQ.MonadQueryInstrumentation m
, MonadMetadataStorage (MetadataStorageT m)
)
=> Env.Environment -> WSServerEnv -> WSConn -> StartMsg -> m ()
onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do
@ -531,6 +533,7 @@ onMessage
, Tracing.HasReporter m
, MonadExecuteQuery m
, EQ.MonadQueryInstrumentation m
, MonadMetadataStorage (MetadataStorageT m)
)
=> Env.Environment
-> AuthMode
@ -722,6 +725,7 @@ createWSServerApp
, Tracing.HasReporter m
, MonadExecuteQuery m
, EQ.MonadQueryInstrumentation m
, MonadMetadataStorage (MetadataStorageT m)
)
=> Env.Environment
-> AuthMode

View File

@ -1,21 +1,35 @@
-- | This module has type class and types which implements the Metadata Storage Abstraction
{-# LANGUAGE UndecidableInstances #-}
module Hasura.Metadata.Class
( MetadataStorageT(..)
( SchemaSyncEventProcessResult(..)
, MetadataStorageT(..)
, runMetadataStorageT
, MonadMetadataStorage(..)
, MonadScheduledEvents(..)
)
where
import Control.Monad.Morph (MFunctor)
import Control.Monad.Morph (MFunctor, hoist)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson
import qualified Network.HTTP.Types as HTTP
import Hasura.Eventing.HTTP
import Hasura.Eventing.ScheduledTrigger.Types
import Hasura.Prelude
import Hasura.RQL.Types
import Hasura.Server.Types
import Hasura.Session
import qualified Hasura.Tracing as Tracing
data SchemaSyncEventProcessResult
= SchemaSyncEventProcessResult
{ _sseprShouldReload :: !Bool
, _sseprCacheInvalidations :: !CacheInvalidations
}
{- Note [Todo: Common interface for eventing sub-system]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Postgres tables' event triggers and scheduled event triggers are similar in the
@ -36,10 +50,11 @@ TODO: Reference to open issue or rfc?
--
-- This class has functions broadly related to:
--
-- 1. Metadata Management (TODO: Need to be added to the type class)
-- 1. Metadata Management
-- ----------------------
-- Basic metadata management functions such as retrieving metadata from storage
-- database and replacing the given metadata.
-- TODO: Console specific operations
--
-- 2. Scheduled Triggers
-- ---------------------
@ -51,7 +66,7 @@ TODO: Reference to open issue or rfc?
-- - Deleting an scheduled event
-- - Creating an one-off scheduled event
--
-- 3. Async Actions (TODO: Need to be added to the type class)
-- 3. Async Actions
-- ----------------
-- Operations to implement async actions sub-system. This includes recording an
-- async action event and retreiving the details of action delivery to the webhook.
@ -63,6 +78,12 @@ TODO: Reference to open issue or rfc?
class (MonadError QErr m) => MonadMetadataStorage m where
-- Metadata
fetchMetadata :: m Metadata
setMetadata :: Metadata -> m ()
notifySchemaCacheSync :: InstanceId -> CacheInvalidations -> m ()
processSchemaSyncEventPayload :: InstanceId -> Value -> m SchemaSyncEventProcessResult
-- Scheduled triggers
-- TODO:-
-- Ideally we would've liked to avoid having functions that are specific to
@ -77,8 +98,22 @@ class (MonadError QErr m) => MonadMetadataStorage m where
setScheduledEventOp :: ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> m ()
unlockScheduledEvents :: ScheduledEventType -> [ScheduledEventId] -> m Int
unlockAllLockedScheduledEvents :: m ()
clearFutureCronEvents :: TriggerName -> m ()
-- Async actions
insertAction
:: ActionName -> SessionVariables -> [HTTP.Header] -> Value
-> m ActionId
fetchUndeliveredActionEvents :: m [ActionLogItem]
setActionStatus :: ActionId -> AsyncActionStatus -> m ()
fetchActionResponse :: ActionId -> m ActionLogResponse
instance (MonadMetadataStorage m) => MonadMetadataStorage (ReaderT r m) where
fetchMetadata = lift fetchMetadata
setMetadata = lift . setMetadata
notifySchemaCacheSync a b = lift $ notifySchemaCacheSync a b
processSchemaSyncEventPayload a b = lift $ processSchemaSyncEventPayload a b
getDeprivedCronTriggerStats = lift getDeprivedCronTriggerStats
getScheduledEventsForDelivery = lift getScheduledEventsForDelivery
insertScheduledEvent = lift . insertScheduledEvent
@ -86,8 +121,39 @@ instance (MonadMetadataStorage m) => MonadMetadataStorage (ReaderT r m) where
setScheduledEventOp a b c = lift $ setScheduledEventOp a b c
unlockScheduledEvents a b = lift $ unlockScheduledEvents a b
unlockAllLockedScheduledEvents = lift unlockAllLockedScheduledEvents
clearFutureCronEvents = lift . clearFutureCronEvents
insertAction a b c d = lift $ insertAction a b c d
fetchUndeliveredActionEvents = lift fetchUndeliveredActionEvents
setActionStatus a b = lift $ setActionStatus a b
fetchActionResponse = lift . fetchActionResponse
instance (MonadMetadataStorage m) => MonadMetadataStorage (StateT s m) where
fetchMetadata = lift fetchMetadata
setMetadata = lift . setMetadata
notifySchemaCacheSync a b = lift $ notifySchemaCacheSync a b
processSchemaSyncEventPayload a b = lift $ processSchemaSyncEventPayload a b
getDeprivedCronTriggerStats = lift getDeprivedCronTriggerStats
getScheduledEventsForDelivery = lift getScheduledEventsForDelivery
insertScheduledEvent = lift . insertScheduledEvent
insertScheduledEventInvocation a b = lift $ insertScheduledEventInvocation a b
setScheduledEventOp a b c = lift $ setScheduledEventOp a b c
unlockScheduledEvents a b = lift $ unlockScheduledEvents a b
unlockAllLockedScheduledEvents = lift unlockAllLockedScheduledEvents
clearFutureCronEvents = lift . clearFutureCronEvents
insertAction a b c d = lift $ insertAction a b c d
fetchUndeliveredActionEvents = lift fetchUndeliveredActionEvents
setActionStatus a b = lift $ setActionStatus a b
fetchActionResponse = lift . fetchActionResponse
instance (MonadMetadataStorage m) => MonadMetadataStorage (Tracing.TraceT m) where
fetchMetadata = lift fetchMetadata
setMetadata = lift . setMetadata
notifySchemaCacheSync a b = lift $ notifySchemaCacheSync a b
processSchemaSyncEventPayload a b = lift $ processSchemaSyncEventPayload a b
getDeprivedCronTriggerStats = lift getDeprivedCronTriggerStats
getScheduledEventsForDelivery = lift getScheduledEventsForDelivery
insertScheduledEvent = lift . insertScheduledEvent
@ -95,6 +161,52 @@ instance (MonadMetadataStorage m) => MonadMetadataStorage (Tracing.TraceT m) whe
setScheduledEventOp a b c = lift $ setScheduledEventOp a b c
unlockScheduledEvents a b = lift $ unlockScheduledEvents a b
unlockAllLockedScheduledEvents = lift unlockAllLockedScheduledEvents
clearFutureCronEvents = lift . clearFutureCronEvents
insertAction a b c d = lift $ insertAction a b c d
fetchUndeliveredActionEvents = lift fetchUndeliveredActionEvents
setActionStatus a b = lift $ setActionStatus a b
fetchActionResponse = lift . fetchActionResponse
instance (MonadMetadataStorage m) => MonadMetadataStorage (LazyTxT QErr m) where
fetchMetadata = lift fetchMetadata
setMetadata = lift . setMetadata
notifySchemaCacheSync a b = lift $ notifySchemaCacheSync a b
processSchemaSyncEventPayload a b = lift $ processSchemaSyncEventPayload a b
getDeprivedCronTriggerStats = lift getDeprivedCronTriggerStats
getScheduledEventsForDelivery = lift getScheduledEventsForDelivery
insertScheduledEvent = lift . insertScheduledEvent
insertScheduledEventInvocation a b = lift $ insertScheduledEventInvocation a b
setScheduledEventOp a b c = lift $ setScheduledEventOp a b c
unlockScheduledEvents a b = lift $ unlockScheduledEvents a b
unlockAllLockedScheduledEvents = lift unlockAllLockedScheduledEvents
clearFutureCronEvents = lift . clearFutureCronEvents
insertAction a b c d = lift $ insertAction a b c d
fetchUndeliveredActionEvents = lift fetchUndeliveredActionEvents
setActionStatus a b = lift $ setActionStatus a b
fetchActionResponse = lift . fetchActionResponse
instance (MonadMetadataStorage m) => MonadMetadataStorage (MetadataT m) where
fetchMetadata = lift fetchMetadata
setMetadata = lift . setMetadata
notifySchemaCacheSync a b = lift $ notifySchemaCacheSync a b
processSchemaSyncEventPayload a b = lift $ processSchemaSyncEventPayload a b
getDeprivedCronTriggerStats = lift getDeprivedCronTriggerStats
getScheduledEventsForDelivery = lift getScheduledEventsForDelivery
insertScheduledEvent = lift . insertScheduledEvent
insertScheduledEventInvocation a b = lift $ insertScheduledEventInvocation a b
setScheduledEventOp a b c = lift $ setScheduledEventOp a b c
unlockScheduledEvents a b = lift $ unlockScheduledEvents a b
unlockAllLockedScheduledEvents = lift unlockAllLockedScheduledEvents
clearFutureCronEvents = lift . clearFutureCronEvents
insertAction a b c d = lift $ insertAction a b c d
fetchUndeliveredActionEvents = lift fetchUndeliveredActionEvents
setActionStatus a b = lift $ setActionStatus a b
fetchActionResponse = lift . fetchActionResponse
{- Note [Generic MetadataStorageT transformer]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -147,13 +259,55 @@ newtype MetadataStorageT m a
= MetadataStorageT {unMetadataStorageT :: ExceptT QErr m a}
deriving ( Functor, Applicative, Monad
, MonadError QErr
, MonadReader r
, MonadState s
, MonadTrans
, MonadIO
, MFunctor
, Tracing.HasReporter
, Tracing.MonadTrace
)
deriving instance (MonadBase IO m) => MonadBase IO (MetadataStorageT m)
deriving instance (MonadBaseControl IO m) => MonadBaseControl IO (MetadataStorageT m)
runMetadataStorageT
:: MetadataStorageT m a -> m (Either QErr a)
runMetadataStorageT =
runExceptT . unMetadataStorageT
instance (Monad m, Monad (t m), MonadTrans t, MonadMetadataStorage (MetadataStorageT m))
=> MonadMetadataStorage (MetadataStorageT (t m)) where
fetchMetadata = hoist lift fetchMetadata
setMetadata = hoist lift . setMetadata
notifySchemaCacheSync a b = hoist lift $ notifySchemaCacheSync a b
processSchemaSyncEventPayload a b = hoist lift $ processSchemaSyncEventPayload a b
getDeprivedCronTriggerStats = hoist lift getDeprivedCronTriggerStats
getScheduledEventsForDelivery = hoist lift getScheduledEventsForDelivery
insertScheduledEvent = hoist lift . insertScheduledEvent
insertScheduledEventInvocation a b = hoist lift $ insertScheduledEventInvocation a b
setScheduledEventOp a b c = hoist lift $ setScheduledEventOp a b c
unlockScheduledEvents a b = hoist lift $ unlockScheduledEvents a b
unlockAllLockedScheduledEvents = hoist lift unlockAllLockedScheduledEvents
clearFutureCronEvents = hoist lift . clearFutureCronEvents
insertAction a b c d = hoist lift $ insertAction a b c d
fetchUndeliveredActionEvents = hoist lift fetchUndeliveredActionEvents
setActionStatus a b = hoist lift $ setActionStatus a b
fetchActionResponse = hoist lift . fetchActionResponse
class (MonadMetadataStorage m) => MonadScheduledEvents m where
-- | Record a cron/one-off event
createScheduledEvent :: ScheduledEventSeed -> m ()
createScheduledEvent = insertScheduledEvent
-- | Clear cron events
dropFutureCronEvents :: TriggerName -> m ()
dropFutureCronEvents = clearFutureCronEvents
instance (MonadScheduledEvents m) => MonadScheduledEvents (ReaderT r m)
instance (MonadScheduledEvents m) => MonadScheduledEvents (StateT s m)
instance (MonadScheduledEvents m) => MonadScheduledEvents (Tracing.TraceT m)
instance (MonadScheduledEvents m) => MonadScheduledEvents (MetadataT m)

View File

@ -11,6 +11,7 @@ module Hasura.Prelude
, choice
, afold
, bsToTxt
, lbsToTxt
, txtToBs
, base64Decode
, spanMaybeM
@ -119,6 +120,9 @@ afold = getAlt . foldMap pure
bsToTxt :: B.ByteString -> Text
bsToTxt = TE.decodeUtf8With TE.lenientDecode
lbsToTxt :: BL.ByteString -> Text
lbsToTxt = bsToTxt . BL.toStrict
txtToBs :: Text -> B.ByteString
txtToBs = TE.encodeUtf8

View File

@ -6,24 +6,26 @@ module Hasura.RQL.DDL.ScheduledTrigger
, runCreateScheduledEvent
) where
import Hasura.Backends.Postgres.Connection
import Hasura.EncJSON
import Hasura.Eventing.ScheduledTrigger
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.EventTrigger (getHeaderInfosFromConf)
import Hasura.RQL.DDL.EventTrigger (getHeaderInfosFromConf)
import Hasura.RQL.Types
import qualified Data.Environment as Env
import qualified Data.HashMap.Strict as Map
import qualified Data.HashMap.Strict.InsOrd as OMap
import qualified Data.Time.Clock as C
import qualified Database.PG.Query as Q
import qualified Data.Environment as Env
import qualified Data.HashMap.Strict as Map
import qualified Data.HashMap.Strict.InsOrd as OMap
import qualified Data.Time.Clock as C
-- | runCreateCronTrigger will update a existing cron trigger when the 'replace'
-- value is set to @true@ and when replace is @false@ a new cron trigger will
-- be created
runCreateCronTrigger
:: (CacheRWM m, MonadTx m, MonadIO m, MetadataM m) => CreateCronTrigger -> m EncJSON
:: ( CacheRWM m, MonadIO m
, MetadataM m, MonadScheduledEvents m
)
=> CreateCronTrigger -> m EncJSON
runCreateCronTrigger CreateCronTrigger {..} = do
let q = CronTriggerMetadata cctName
cctWebhook
@ -53,7 +55,7 @@ runCreateCronTrigger CreateCronTrigger {..} = do
$ metaCronTriggers %~ OMap.insert cctName metadata
currentTime <- liftIO C.getCurrentTime
let scheduleTimes = generateScheduleTimes currentTime 100 cctCronSchedule -- generate next 100 events
liftTx $ insertScheduledEventTx $ SESCron $ map (CronEventSeed cctName) scheduleTimes
createScheduledEvent $ SESCron $ map (CronEventSeed cctName) scheduleTimes
return successMsg
resolveCronTrigger
@ -75,9 +77,9 @@ resolveCronTrigger env CronTriggerMetadata{..} = do
updateCronTrigger
:: ( CacheRWM m
, MonadTx m
, MonadIO m
, MetadataM m
, MonadScheduledEvents m
)
=> CronTriggerMetadata -> m EncJSON
updateCronTrigger cronTriggerMetadata = do
@ -86,16 +88,16 @@ updateCronTrigger cronTriggerMetadata = do
buildSchemaCacheFor (MOCronTrigger triggerName)
$ MetadataModifier
$ metaCronTriggers %~ OMap.insert triggerName cronTriggerMetadata
liftTx $ dropFutureCronEvents triggerName
dropFutureCronEvents triggerName
currentTime <- liftIO C.getCurrentTime
let scheduleTimes = generateScheduleTimes currentTime 100 $ ctSchedule cronTriggerMetadata
liftTx $ insertScheduledEventTx $ SESCron $ map (CronEventSeed triggerName) scheduleTimes
createScheduledEvent $ SESCron $ map (CronEventSeed triggerName) scheduleTimes
pure successMsg
runDeleteCronTrigger
:: ( CacheRWM m
, MonadTx m
, MetadataM m
, MonadScheduledEvents m
)
=> ScheduledTriggerName -> m EncJSON
runDeleteCronTrigger (ScheduledTriggerName stName) = do
@ -103,37 +105,17 @@ runDeleteCronTrigger (ScheduledTriggerName stName) = do
withNewInconsistentObjsCheck
$ buildSchemaCache
$ dropCronTriggerInMetadata stName
liftTx $ dropFutureCronEvents stName
dropFutureCronEvents stName
return successMsg
dropFutureCronEvents :: TriggerName -> Q.TxE QErr ()
dropFutureCronEvents name =
Q.unitQE defaultTxErrorHandler
[Q.sql|
DELETE FROM hdb_catalog.hdb_cron_events
WHERE trigger_name = $1 AND scheduled_time > now() AND tries = 0
|] (Identity name) False
dropCronTriggerInMetadata :: TriggerName -> MetadataModifier
dropCronTriggerInMetadata name =
MetadataModifier $ metaCronTriggers %~ OMap.delete name
runCreateScheduledEvent :: (MonadTx m) => CreateScheduledEvent -> m EncJSON
runCreateScheduledEvent CreateScheduledEvent {..} = do
liftTx $ Q.unitQE defaultTxErrorHandler
[Q.sql|
INSERT INTO hdb_catalog.hdb_scheduled_events
(webhook_conf,scheduled_time,payload,retry_conf,header_conf,comment)
VALUES
($1, $2, $3, $4, $5, $6)
|] ( Q.AltJ cseWebhook
, cseScheduleAt
, Q.AltJ csePayload
, Q.AltJ cseRetryConf
, Q.AltJ cseHeaders
, cseComment)
False
pure successMsg
runCreateScheduledEvent
:: (MonadScheduledEvents m) => CreateScheduledEvent -> m EncJSON
runCreateScheduledEvent =
(createScheduledEvent . SESOneOff) >=> \() -> pure successMsg
checkExists :: (CacheRM m, MonadError QErr m) => TriggerName -> m ()
checkExists name = do

View File

@ -38,6 +38,7 @@ import Hasura.Backends.Postgres.Connection
import Hasura.Backends.Postgres.SQL.Types
import Hasura.GraphQL.Execute.Types
import Hasura.GraphQL.Schema (buildGQLContext)
import Hasura.Metadata.Class
import Hasura.RQL.DDL.Action
import Hasura.RQL.DDL.CustomTypes
import Hasura.RQL.DDL.Deps
@ -48,7 +49,6 @@ import Hasura.RQL.DDL.Schema.Cache.Common
import Hasura.RQL.DDL.Schema.Cache.Dependencies
import Hasura.RQL.DDL.Schema.Cache.Fields
import Hasura.RQL.DDL.Schema.Cache.Permission
import Hasura.RQL.DDL.Schema.Catalog
import Hasura.RQL.DDL.Schema.Common
import Hasura.RQL.DDL.Schema.Diff
import Hasura.RQL.DDL.Schema.Function
@ -57,12 +57,12 @@ import Hasura.RQL.Types hiding (fmFunction, tm
import Hasura.Server.Version (HasVersion)
buildRebuildableSchemaCache
:: (HasVersion, MonadIO m, MonadUnique m, MonadTx m, HasHttpManager m, HasSQLGenCtx m)
:: (HasVersion, MonadIO m, MonadTx m, HasHttpManager m, HasSQLGenCtx m)
=> Env.Environment
-> m (RebuildableSchemaCache m)
buildRebuildableSchemaCache env = do
metadata <- liftTx fetchMetadataFromCatalog
result <- flip runReaderT CatalogSync $
-> Metadata
-> m RebuildableSchemaCache
buildRebuildableSchemaCache env metadata = do
result <- runCacheBuild $ flip runReaderT CatalogSync $
Inc.build (buildSchemaCacheRule env) (metadata, initialInvalidationKeys)
pure $ RebuildableSchemaCache (Inc.result result) initialInvalidationKeys (Inc.rebuildRule result)
@ -70,14 +70,15 @@ newtype CacheRWT m a
-- The CacheInvalidations component of the state could actually be collected using WriterT, but
-- WriterT implementations prior to transformers-0.5.6.0 (which added
-- Control.Monad.Trans.Writer.CPS) are leaky, and we dont have that yet.
= CacheRWT (StateT (RebuildableSchemaCache m, CacheInvalidations) m a)
= CacheRWT (StateT (RebuildableSchemaCache, CacheInvalidations) m a)
deriving
( Functor, Applicative, Monad, MonadIO, MonadUnique, MonadReader r, MonadError e, MonadTx
, UserInfoM, HasHttpManager, HasSQLGenCtx, HasSystemDefined)
, UserInfoM, HasHttpManager, HasSQLGenCtx, HasSystemDefined, MonadMetadataStorage
, MonadScheduledEvents)
runCacheRWT
:: Functor m
=> RebuildableSchemaCache m -> CacheRWT m a -> m (a, RebuildableSchemaCache m, CacheInvalidations)
=> RebuildableSchemaCache -> CacheRWT m a -> m (a, RebuildableSchemaCache, CacheInvalidations)
runCacheRWT cache (CacheRWT m) =
runStateT m (cache, mempty) <&> \(v, (newCache, invalidations)) -> (v, newCache, invalidations)
@ -88,11 +89,12 @@ instance (Monad m) => TableCoreInfoRM (CacheRWT m)
instance (Monad m) => CacheRM (CacheRWT m) where
askSchemaCache = CacheRWT $ gets (lastBuiltSchemaCache . fst)
instance (MonadIO m, MonadTx m) => CacheRWM (CacheRWT m) where
instance (MonadIO m, MonadTx m, HasHttpManager m, HasSQLGenCtx m) => CacheRWM (CacheRWT m) where
buildSchemaCacheWithOptions buildReason invalidations metadata = CacheRWT do
(RebuildableSchemaCache _ invalidationKeys rule, oldInvalidations) <- get
let newInvalidationKeys = invalidateKeys invalidations invalidationKeys
result <- lift $ flip runReaderT buildReason $ Inc.build rule (metadata, newInvalidationKeys)
result <- lift $ runCacheBuild $ flip runReaderT buildReason $
Inc.build rule (metadata, newInvalidationKeys)
let schemaCache = Inc.result result
prunedInvalidationKeys = pruneInvalidationKeys schemaCache newInvalidationKeys
!newCache = RebuildableSchemaCache schemaCache prunedInvalidationKeys (Inc.rebuildRule result)

View File

@ -1,5 +1,6 @@
{-# LANGUAGE Arrows #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE Arrows #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE UndecidableInstances #-}
-- | Types/functions shared between modules that implement "Hasura.RQL.DDL.Schema.Cache". Other
-- modules should not import this module directly.
@ -11,16 +12,18 @@ import qualified Data.HashMap.Strict.Extended as M
import qualified Data.HashMap.Strict.InsOrd as OMap
import qualified Data.HashSet as HS
import qualified Data.Sequence as Seq
import qualified Network.HTTP.Client as HTTP
import Control.Arrow.Extended
import Control.Lens
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Unique
import Data.Text.Extended
import qualified Hasura.Incremental as Inc
import Hasura.Backends.Postgres.SQL.Types
import Hasura.RQL.Types
import Hasura.RQL.Types.Run
-- | 'InvalidationKeys' used to apply requested 'CacheInvalidations'.
data InvalidationKeys = InvalidationKeys
@ -105,17 +108,53 @@ data BuildOutputs
}
$(makeLenses ''BuildOutputs)
data RebuildableSchemaCache m
-- | Parameters required for schema cache build
data CacheBuildParams
= CacheBuildParams
{ _cbpManager :: !HTTP.Manager
, _cbpSqlGenCtx :: !SQLGenCtx
}
-- | The monad in which @'RebuildableSchemaCache' is being run
newtype CacheBuild a
= CacheBuild {unCacheBuild :: ReaderT CacheBuildParams (LazyTxT QErr IO) a}
deriving ( Functor, Applicative, Monad
, MonadError QErr
, MonadReader CacheBuildParams
, MonadIO
, MonadTx
, MonadBase IO
, MonadBaseControl IO
, MonadUnique
)
instance HasHttpManager CacheBuild where
askHttpManager = asks _cbpManager
instance HasSQLGenCtx CacheBuild where
askSQLGenCtx = asks _cbpSqlGenCtx
runCacheBuild
:: ( MonadIO m
, HasHttpManager m
, HasSQLGenCtx m
, MonadTx m
)
=> CacheBuild a -> m a
runCacheBuild (CacheBuild m) = do
httpManager <- askHttpManager
sqlGenCtx <- askSQLGenCtx
let params = CacheBuildParams httpManager sqlGenCtx
liftTx $ lazyTxToQTx (runReaderT m params)
data RebuildableSchemaCache
= RebuildableSchemaCache
{ lastBuiltSchemaCache :: !SchemaCache
, _rscInvalidationMap :: !InvalidationKeys
, _rscRebuild :: !(Inc.Rule (ReaderT BuildReason m) (Metadata, InvalidationKeys) SchemaCache)
, _rscRebuild :: !(Inc.Rule (ReaderT BuildReason CacheBuild) (Metadata, InvalidationKeys) SchemaCache)
}
$(makeLenses ''RebuildableSchemaCache)
type CacheBuildM = ReaderT BuildReason Run
type CacheBuildA = WriterA (Seq CollectedInfo) (Inc.Rule CacheBuildM)
bindErrorA
:: (ArrowChoice arr, ArrowKleisli m arr, ArrowError e arr, MonadError e m)
=> arr (m a) a

View File

@ -50,6 +50,12 @@ module Hasura.RQL.Types.Action
, ActionExecContext(..)
, AsyncActionQueryFieldG(..)
, AnnActionAsyncQuery(..)
, ActionId(..)
, actionIdToText
, ActionLogItem(..)
, ActionLogResponse(..)
, AsyncActionStatus(..)
) where
@ -59,6 +65,8 @@ import qualified Data.Aeson as J
import qualified Data.Aeson.Casing as J
import qualified Data.Aeson.TH as J
import qualified Data.HashMap.Strict as Map
import qualified Data.Time.Clock as UTC
import qualified Data.UUID as UUID
import qualified Database.PG.Query as Q
import qualified Language.GraphQL.Draft.Syntax as G
import qualified Network.HTTP.Client as HTTP
@ -72,6 +80,7 @@ import Hasura.RQL.DDL.Headers
import Hasura.RQL.IR.Select
import Hasura.RQL.Types.Common
import Hasura.RQL.Types.CustomTypes
import Hasura.RQL.Types.Error
import Hasura.Session
import Hasura.SQL.Backend
@ -300,7 +309,7 @@ type AsyncActionQueryFieldsG b v = Fields (AsyncActionQueryFieldG b v)
data AnnActionAsyncQuery (b :: BackendType) v
= AnnActionAsyncQuery
{ _aaaqName :: !ActionName
, _aaaqActionId :: !v
, _aaaqActionId :: !ActionId
, _aaaqOutputType :: !GraphQLType
, _aaaqFields :: !(AsyncActionQueryFieldsG b v)
, _aaaqDefinitionList :: ![(Column b, ScalarType b)]
@ -313,3 +322,32 @@ data ActionExecContext
, _aecHeaders :: !HTTP.RequestHeaders
, _aecSessionVariables :: !SessionVariables
}
newtype ActionId = ActionId {unActionId :: UUID.UUID}
deriving (Show, Eq, Q.ToPrepArg, Q.FromCol, J.ToJSON, J.FromJSON)
actionIdToText :: ActionId -> Text
actionIdToText = UUID.toText . unActionId
data ActionLogItem
= ActionLogItem
{ _aliId :: !ActionId
, _aliActionName :: !ActionName
, _aliRequestHeaders :: ![HTTP.Header]
, _aliSessionVariables :: !SessionVariables
, _aliInputPayload :: !J.Value
} deriving (Show, Eq)
data ActionLogResponse
= ActionLogResponse
{ _alrId :: !ActionId
, _alrCreatedAt :: !UTC.UTCTime
, _alrResponsePayload :: !(Maybe J.Value)
, _alrErrors :: !(Maybe J.Value)
, _alrSessionVariables :: !SessionVariables
} deriving (Show, Eq)
$(J.deriveJSON (J.aesonDrop 4 J.snakeCase) ''ActionLogResponse)
data AsyncActionStatus
= AASCompleted !J.Value
| AASError !QErr

View File

@ -1,7 +1,7 @@
{-# LANGUAGE UndecidableInstances #-}
module Hasura.RQL.Types.Run
( Run(..)
( RunT(..)
, RunCtx(..)
, peelRun
) where
@ -14,6 +14,7 @@ import qualified Network.HTTP.Client as HTTP
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Unique
import Hasura.Metadata.Class
import Hasura.RQL.Types
import qualified Hasura.Tracing as Tracing
@ -25,37 +26,43 @@ data RunCtx
, _rcSqlGenCtx :: !SQLGenCtx
}
newtype Run a
= Run { unRun :: ReaderT RunCtx (LazyTxT QErr IO) a }
newtype RunT m a
= RunT { unRunT :: ReaderT RunCtx (LazyTxT QErr m) a }
deriving ( Functor, Applicative, Monad
, MonadError QErr
, MonadReader RunCtx
, MonadTx
, MonadIO
, MonadBase IO
, MonadBaseControl IO
, MonadUnique
, MonadMetadataStorage
)
instance UserInfoM Run where
instance (MonadMetadataStorage m) => MonadScheduledEvents (RunT m)
deriving instance (MonadIO m, MonadBase IO m) => MonadBase IO (RunT m)
deriving instance (MonadIO m, MonadBaseControl IO m) => MonadBaseControl IO (RunT m)
instance (Monad m) => UserInfoM (RunT m) where
askUserInfo = asks _rcUserInfo
instance HasHttpManager Run where
instance (Monad m) => HasHttpManager (RunT m) where
askHttpManager = asks _rcHttpMgr
instance HasSQLGenCtx Run where
instance (Monad m) => HasSQLGenCtx (RunT m) where
askSQLGenCtx = asks _rcSqlGenCtx
peelRun
:: (MonadIO m)
:: ( MonadIO m
, MonadBaseControl IO m
)
=> RunCtx
-> PGExecCtx
-> Q.TxAccess
-> Maybe Tracing.TraceContext
-> Run a
-> RunT m a
-> ExceptT QErr m a
peelRun runCtx pgExecCtx txAccess ctx (Run m) =
mapExceptT liftIO $ runLazyTx pgExecCtx txAccess $
peelRun runCtx pgExecCtx txAccess ctx (RunT m) =
runLazyTx pgExecCtx txAccess $
maybe id withTraceContext ctx $ withUserInfo userInfo $ runReaderT m runCtx
where
userInfo = _rcUserInfo runCtx

View File

@ -3,6 +3,7 @@
module Hasura.Server.API.Query where
import Control.Lens
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Unique
import Data.Aeson
import Data.Aeson.Casing
@ -14,6 +15,7 @@ import qualified Database.PG.Query as Q
import qualified Network.HTTP.Client as HTTP
import Hasura.EncJSON
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.Action
import Hasura.RQL.DDL.ComputedField
@ -175,55 +177,36 @@ $(deriveJSON
''RQLQueryV2
)
-- | Using @pg_notify@ function to publish schema sync events to other server
-- instances via 'hasura_schema_update' channel.
-- See Note [Schema Cache Sync]
notifySchemaCacheSync :: InstanceId -> CacheInvalidations -> Q.TxE QErr ()
notifySchemaCacheSync instanceId invalidations = do
Q.Discard () <- Q.withQE defaultTxErrorHandler [Q.sql|
SELECT pg_notify('hasura_schema_update', json_build_object(
'instance_id', $1,
'occurred_at', NOW(),
'invalidations', $2
)::text
)
|] (instanceId, Q.AltJ invalidations) True
pure ()
runQuery
:: (HasVersion, MonadIO m, MonadError QErr m, Tracing.MonadTrace m)
:: ( HasVersion, MonadIO m, Tracing.MonadTrace m
, MonadBaseControl IO m, MonadMetadataStorage m
)
=> Env.Environment -> PGExecCtx -> InstanceId
-> UserInfo -> RebuildableSchemaCache Run -> HTTP.Manager
-> SQLGenCtx -> RQLQuery -> m (EncJSON, RebuildableSchemaCache Run)
-> UserInfo -> RebuildableSchemaCache -> HTTP.Manager
-> SQLGenCtx -> RQLQuery -> m (EncJSON, RebuildableSchemaCache)
runQuery env pgExecCtx instanceId userInfo sc hMgr sqlGenCtx query = do
accessMode <- getQueryAccessMode query
traceCtx <- Tracing.currentContext
metadata <- fetchMetadata
result <- runQueryM env query & Tracing.interpTraceT \x -> do
((js, meta), rsc, ci) <-
x & withMetadata
(((js, tracemeta), meta), rsc, ci) <-
x & runMetadataT metadata
& runCacheRWT sc
& peelRun runCtx pgExecCtx accessMode (Just traceCtx)
& runExceptT
& liftEitherM
pure ((js, rsc, ci), meta)
pure ((js, rsc, ci, meta), tracemeta)
withReload result
where
runCtx = RunCtx userInfo hMgr sqlGenCtx
withMetadata :: (MonadTx m) => MetadataT m a -> m a
withMetadata m = do
metadata <- liftTx fetchMetadataFromCatalog
(r, modifiedMetadata) <- runMetadataT metadata m
when (queryModifiesSchemaCache query) $
liftTx $ setMetadataInCatalog modifiedMetadata
pure r
withReload (result, updatedCache, invalidations) = do
withReload (result, updatedCache, invalidations, updatedMetadata) = do
when (queryModifiesSchemaCache query) $ do
e <- liftIO $ runExceptT $ runLazyTx pgExecCtx Q.ReadWrite $ liftTx $
notifySchemaCacheSync instanceId invalidations
liftEither e
return (result, updatedCache)
-- set modified metadata in storage
setMetadata updatedMetadata
-- notify schema cache sync
notifySchemaCacheSync instanceId invalidations
pure (result, updatedCache)
-- | A predicate that determines whether the given query might modify/rebuild the schema cache. If
-- so, it needs to acquire the global lock on the schema cache so that other queries do not modify
@ -360,6 +343,7 @@ runQueryM
, MonadIO m, MonadUnique m, HasHttpManager m, HasSQLGenCtx m
, Tracing.MonadTrace m
, MetadataM m
, MonadScheduledEvents m
)
=> Env.Environment
-> RQLQuery

View File

@ -26,7 +26,6 @@ import qualified Web.Spock.Core as Spock
import Control.Concurrent.MVar.Lifted
import Control.Exception (IOException, try)
import Control.Monad.Morph (hoist)
import Control.Monad.Stateless
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson hiding (json)
@ -54,9 +53,9 @@ import qualified Hasura.Tracing as Tracing
import Hasura.EncJSON
import Hasura.GraphQL.Logging (MonadQueryLog (..))
import Hasura.HTTP
import Hasura.Metadata.Class
import Hasura.RQL.DDL.Schema
import Hasura.RQL.Types
import Hasura.RQL.Types.Run
import Hasura.Server.API.Config (runGetConfig)
import Hasura.Server.API.Query
import Hasura.Server.Auth (AuthMode (..), UserAuthentication (..))
@ -87,7 +86,7 @@ data SchemaCacheRef
-- the IORef) where we serve a request with a stale schemacache but I guess
-- it is an okay trade-off to pay for a higher throughput (I remember doing a
-- bunch of benchmarks to test this hypothesis).
, _scrCache :: IORef (RebuildableSchemaCache Run, SchemaCacheVer)
, _scrCache :: IORef (RebuildableSchemaCache, SchemaCacheVer)
, _scrOnChange :: IO ()
-- ^ an action to run when schemacache changes
}
@ -120,7 +119,7 @@ data HandlerCtx
, hcSourceIpAddress :: !Wai.IpAddress
}
type Handler m = ExceptT QErr (ReaderT HandlerCtx m)
type Handler m = ReaderT HandlerCtx (MetadataStorageT m)
data APIResp
= JSONResp !(HttpResponse EncJSON)
@ -130,7 +129,6 @@ data APIHandler m a
= AHGet !(Handler m APIResp)
| AHPost !(a -> Handler m APIResp)
boolToText :: Bool -> Text
boolToText = bool "false" "true"
@ -147,7 +145,7 @@ logInconsObjs logger objs =
withSCUpdate
:: (MonadIO m, MonadBaseControl IO m)
=> SchemaCacheRef -> L.Logger L.Hasura -> m (a, RebuildableSchemaCache Run) -> m a
=> SchemaCacheRef -> L.Logger L.Hasura -> m (a, RebuildableSchemaCache) -> m a
withSCUpdate scr logger action =
withMVarMasked lk $ \() -> do
(!res, !newSC) <- action
@ -194,13 +192,13 @@ parseBody reqBody =
Left e -> throw400 InvalidJSON (T.pack e)
Right jVal -> decodeValue jVal
onlyAdmin :: (Monad m) => Handler m ()
onlyAdmin :: (MonadError QErr m, MonadReader HandlerCtx m) => m ()
onlyAdmin = do
uRole <- asks (_uiRole . hcUser)
when (uRole /= adminRoleName) $
throw400 AccessDenied "You have to be an admin to access this endpoint"
buildQCtx :: (MonadIO m) => Handler m QCtx
buildQCtx :: (MonadIO m, MonadReader HandlerCtx m) => m QCtx
buildQCtx = do
scRef <- asks (scCacheRef . hcServerCtx)
userInfo <- asks hcUser
@ -213,11 +211,18 @@ setHeader (headerName, headerValue) =
Spock.setHeader (bsToTxt $ CI.original headerName) (bsToTxt headerValue)
-- | Typeclass representing the metadata API authorization effect
class Monad m => MetadataApiAuthorization m where
authorizeMetadataApi :: HasVersion => RQLQuery -> UserInfo -> Handler m ()
class (Monad m) => MetadataApiAuthorization m where
authorizeMetadataApi
:: HasVersion => RQLQuery -> HandlerCtx -> m (Either QErr ())
instance MetadataApiAuthorization m => MetadataApiAuthorization (ReaderT r m) where
authorizeMetadataApi q hc = lift $ authorizeMetadataApi q hc
instance MetadataApiAuthorization m => MetadataApiAuthorization (MetadataStorageT m) where
authorizeMetadataApi q hc = lift $ authorizeMetadataApi q hc
instance MetadataApiAuthorization m => MetadataApiAuthorization (Tracing.TraceT m) where
authorizeMetadataApi q ui = hoist (hoist lift) $ authorizeMetadataApi q ui
authorizeMetadataApi q hc = lift $ authorizeMetadataApi q hc
-- | The config API (/v1alpha1/config) handler
class Monad m => MonadConfigApiHandler m where
@ -307,15 +312,16 @@ mkSpockAction serverCtx qErrEncoder qErrModifier apiHandler = do
scResponseInternalErrorsConfig serverCtx
limits <- lift askResourceLimits
let runHandler = runMetadataStorageT . flip runReaderT handlerState . runResourceLimits limits
(serviceTime, (result, q)) <- withElapsedTime $ case apiHandler of
AHGet handler -> do
res <- lift $ runReaderT (runExceptT (runResourceLimits limits handler)) handlerState
res <- lift $ runHandler handler
return (res, Nothing)
AHPost handler -> do
parsedReqE <- runExceptT $ parseBody reqBody
parsedReq <- onLeft parsedReqE (logErrorAndResp (Just userInfo) requestId req (reqBody, Nothing) includeInternal headers . qErrModifier)
res <- lift $ runReaderT (runExceptT . runResourceLimits limits $ handler parsedReq) handlerState
res <- lift $ runHandler $ handler parsedReq
return (res, Just parsedReq)
-- apply the error modifier
@ -364,12 +370,14 @@ mkSpockAction serverCtx qErrEncoder qErrModifier apiHandler = do
v1QueryHandler
:: (HasVersion, MonadIO m, MonadBaseControl IO m, MetadataApiAuthorization m, Tracing.MonadTrace m)
:: ( HasVersion, MonadIO m, MonadBaseControl IO m, MetadataApiAuthorization m, Tracing.MonadTrace m
, MonadReader HandlerCtx m
, MonadMetadataStorage m
)
=> RQLQuery
-> Handler m (HttpResponse EncJSON)
-> m (HttpResponse EncJSON)
v1QueryHandler query = do
userInfo <- asks hcUser
authorizeMetadataApi query userInfo
(liftEitherM . authorizeMetadataApi query) =<< ask
scRef <- asks (scCacheRef . hcServerCtx)
logger <- asks (scLogger . hcServerCtx)
res <- bool (fst <$> dbAction) (withSCUpdate scRef logger dbAction) $ queryModifiesSchemaCache query
@ -395,9 +403,12 @@ v1Alpha1GQHandler
, Tracing.MonadTrace m
, GH.MonadExecuteQuery m
, EQ.MonadQueryInstrumentation m
, MonadError QErr m
, MonadReader HandlerCtx m
, MonadMetadataStorage (MetadataStorageT m)
)
=> E.GraphQLQueryType -> GH.GQLBatchedReqs GH.GQLQueryText
-> Handler m (HttpResponse EncJSON)
-> m (HttpResponse EncJSON)
v1Alpha1GQHandler queryType query = do
userInfo <- asks hcUser
reqHeaders <- asks hcReqHeaders
@ -428,9 +439,12 @@ v1GQHandler
, Tracing.MonadTrace m
, GH.MonadExecuteQuery m
, EQ.MonadQueryInstrumentation m
, MonadError QErr m
, MonadReader HandlerCtx m
, MonadMetadataStorage (MetadataStorageT m)
)
=> GH.GQLBatchedReqs GH.GQLQueryText
-> Handler m (HttpResponse EncJSON)
-> m (HttpResponse EncJSON)
v1GQHandler = v1Alpha1GQHandler E.QueryHasura
v1GQRelayHandler
@ -441,15 +455,22 @@ v1GQRelayHandler
, Tracing.MonadTrace m
, GH.MonadExecuteQuery m
, EQ.MonadQueryInstrumentation m
, MonadError QErr m
, MonadReader HandlerCtx m
, MonadMetadataStorage (MetadataStorageT m)
)
=> GH.GQLBatchedReqs GH.GQLQueryText
-> Handler m (HttpResponse EncJSON)
-> m (HttpResponse EncJSON)
v1GQRelayHandler = v1Alpha1GQHandler E.QueryRelay
gqlExplainHandler
:: forall m. (MonadIO m)
:: forall m. ( MonadIO m
, MonadError QErr m
, MonadReader HandlerCtx m
, MonadMetadataStorage (MetadataStorageT m)
)
=> GE.GQLExplain
-> Handler (Tracing.TraceT m) (HttpResponse EncJSON)
-> m (HttpResponse EncJSON)
gqlExplainHandler query = do
onlyAdmin
scRef <- asks (scCacheRef . hcServerCtx)
@ -468,7 +489,7 @@ gqlExplainHandler query = do
res <- GE.explainGQLQuery pgExecCtx sc query
return $ HttpResponse res []
v1Alpha1PGDumpHandler :: (MonadIO m) => PGD.PGDumpReqBody -> Handler m APIResp
v1Alpha1PGDumpHandler :: (MonadIO m, MonadError QErr m, MonadReader HandlerCtx m) => PGD.PGDumpReqBody -> m APIResp
v1Alpha1PGDumpHandler b = do
onlyAdmin
ci <- asks (scConnInfo . hcServerCtx)
@ -518,9 +539,9 @@ renderHtmlTemplate template jVal =
newtype LegacyQueryParser m
= LegacyQueryParser
{ getLegacyQueryParser :: PG.QualifiedTable -> Object -> Handler m RQLQueryV1 }
{ getLegacyQueryParser :: PG.QualifiedTable -> Object -> m RQLQueryV1 }
queryParsers :: (Monad m) => M.HashMap Text (LegacyQueryParser m)
queryParsers :: (MonadError QErr m) => M.HashMap Text (LegacyQueryParser m)
queryParsers =
M.fromList
[ ("select", mkLegacyQueryParser RQSelect)
@ -537,9 +558,12 @@ queryParsers =
return $ f q
legacyQueryHandler
:: (HasVersion, MonadIO m, MonadBaseControl IO m, MetadataApiAuthorization m, Tracing.MonadTrace m)
:: ( HasVersion, MonadIO m, MonadBaseControl IO m, MetadataApiAuthorization m, Tracing.MonadTrace m
, MonadReader HandlerCtx m
, MonadMetadataStorage m
)
=> PG.TableName -> Text -> Object
-> Handler m (HttpResponse EncJSON)
-> m (HttpResponse EncJSON)
legacyQueryHandler tn queryType req =
case M.lookup queryType queryParsers of
Just queryParser -> getLegacyQueryParser queryParser qt req >>= v1QueryHandler . RQV1
@ -587,6 +611,7 @@ mkWaiApp
, GH.MonadExecuteQuery m
, EQ.MonadQueryInstrumentation m
, HasResourceLimits m
, MonadMetadataStorage (MetadataStorageT m)
)
=> Env.Environment
-- ^ Set of environment variables for reference in UIs
@ -621,7 +646,7 @@ mkWaiApp
-> E.PlanCacheOptions
-> ResponseInternalErrorsConfig
-> Maybe EL.LiveQueryPostPollHook
-> RebuildableSchemaCache Run
-> RebuildableSchemaCache
-> EKG.Store
-> WS.ConnectionOptions
-> KeepAliveDelay
@ -699,6 +724,7 @@ httpApp
, Tracing.HasReporter m
, GH.MonadExecuteQuery m
, EQ.MonadQueryInstrumentation m
, MonadMetadataStorage (MetadataStorageT m)
, HasResourceLimits m
)
=> CorsConfig

View File

@ -32,7 +32,6 @@ import qualified Database.PG.Query.Connection as Q
import qualified Language.Haskell.TH.Lib as TH
import qualified Language.Haskell.TH.Syntax as TH
import Control.Monad.Unique
import Data.Time.Clock (UTCTime)
import System.Directory (doesFileExist)
@ -85,22 +84,24 @@ migrateCatalog
. ( HasVersion
, MonadIO m
, MonadTx m
, MonadUnique m
, HasHttpManager m
, HasSQLGenCtx m
)
=> Env.Environment
-> UTCTime
-> m (MigrationResult, RebuildableSchemaCache m)
-> m (MigrationResult, RebuildableSchemaCache)
migrateCatalog env migrationTime = do
doesSchemaExist (SchemaName "hdb_catalog") >>= \case
migrationResult <- doesSchemaExist (SchemaName "hdb_catalog") >>= \case
False -> initialize True
True -> doesTableExist (SchemaName "hdb_catalog") (TableName "hdb_version") >>= \case
False -> initialize False
True -> migrateFrom =<< getCatalogVersion
metadata <- liftTx fetchMetadataFromCatalog
schemaCache <- buildRebuildableSchemaCache env metadata
pure (migrationResult, schemaCache)
where
-- initializes the catalog, creating the schema if necessary
initialize :: Bool -> m (MigrationResult, RebuildableSchemaCache m)
initialize :: Bool -> m MigrationResult
initialize createSchema = do
liftTx $ Q.catchE defaultTxErrorHandler $
when createSchema $ Q.unitQ "CREATE SCHEMA hdb_catalog" () False
@ -114,9 +115,8 @@ migrateCatalog env migrationTime = do
<> "PostgreSQL server. Please make sure this extension is available."
runTx $(Q.sqlFromFile "src-rsr/initialise.sql")
schemaCache <- buildRebuildableSchemaCache env
updateCatalogVersion
pure (MRInitialized, schemaCache)
pure MRInitialized
where
needsPGCryptoError e@(Q.PGTxErr _ _ _ err) =
case err of
@ -136,11 +136,9 @@ migrateCatalog env migrationTime = do
<> " https://hasura.io/docs/1.0/graphql/manual/deployment/postgres-permissions.html"
-- migrates an existing catalog to the latest version from an existing verion
migrateFrom :: Text -> m (MigrationResult, RebuildableSchemaCache m)
migrateFrom :: Text -> m MigrationResult
migrateFrom previousVersion
| previousVersion == latestCatalogVersionString = do
schemaCache <- buildRebuildableSchemaCache env
pure (MRNothingToDo, schemaCache)
| previousVersion == latestCatalogVersionString = pure MRNothingToDo
| [] <- neededMigrations =
throw400 NotSupported $
"Cannot use database previously used with a newer version of graphql-engine (expected"
@ -148,9 +146,8 @@ migrateCatalog env migrationTime = do
<> " is " <> previousVersion <> ")."
| otherwise = do
traverse_ (mpMigrate . snd) neededMigrations
schemaCache <- buildRebuildableSchemaCache env
updateCatalogVersion
pure (MRMigrated previousVersion, schemaCache)
pure $ MRMigrated previousVersion
where
neededMigrations =
dropWhile ((/= previousVersion) . fst) (migrations False)

View File

@ -2,14 +2,16 @@
module Hasura.Server.SchemaUpdate
( startSchemaSyncListenerThread
, startSchemaSyncProcessorThread
, EventPayload(..)
, SchemaSyncCtx(..)
)
where
import Hasura.Backends.Postgres.Connection
import Hasura.Logging
import Hasura.Metadata.Class
import Hasura.Prelude
import Hasura.RQL.DDL.Schema (fetchMetadataFromCatalog, runCacheRWT)
import Hasura.RQL.DDL.Schema (runCacheRWT)
import Hasura.RQL.Types
import Hasura.RQL.Types.Run
import Hasura.Server.App (SchemaCacheRef (..), withSCUpdate)
@ -17,6 +19,7 @@ import Hasura.Server.Logging
import Hasura.Server.Types (InstanceId (..))
import Hasura.Session
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson
import Data.Aeson.Casing
import Data.Aeson.TH
@ -74,7 +77,7 @@ $(deriveJSON (aesonDrop 3 snakeCase) ''EventPayload)
data SchemaSyncEvent
= SSEListenStart !UTC.UTCTime
| SSEPayload !EventPayload
| SSEPayload !Value
instance ToJSON SchemaSyncEvent where
toJSON = \case
@ -82,7 +85,7 @@ instance ToJSON SchemaSyncEvent where
SSEPayload payload -> toJSON payload
data ThreadError
= TEJsonParse !Text
= TEPayloadParse !Text
| TEQueryError !QErr
$(deriveToJSON
defaultOptions { constructorTagModifier = snakeCase . drop 2
@ -177,7 +180,7 @@ startSchemaSyncListenerThread pool logger instanceId = do
-- | An async thread which processes the schema sync events
-- See Note [Schema Cache Sync]
startSchemaSyncProcessorThread
:: (MonadIO m)
:: (C.ForkableMonadIO m, MonadMetadataStorage (MetadataStorageT m))
=> SQLGenCtx
-> PG.PGPool
-> Logger Hasura
@ -190,7 +193,7 @@ startSchemaSyncProcessorThread
startSchemaSyncProcessorThread sqlGenCtx pool logger httpMgr
schemaSyncEventRef cacheRef instanceId cacheInitStartTime = do
-- Start processor thread
processorThread <- liftIO $ C.forkImmortal "SchemeUpdate.processor" logger $
processorThread <- C.forkImmortal "SchemeUpdate.processor" logger $
processor sqlGenCtx pool logger httpMgr schemaSyncEventRef cacheRef instanceId cacheInitStartTime
logThreadStarted logger instanceId TTProcessor processorThread
pure processorThread
@ -220,7 +223,7 @@ listener pool logger updateEventRef =
STM.atomically $ STM.writeTVar updateEventRef $ Just $ SSEListenStart time
PG.PNEPQNotify notif ->
case eitherDecodeStrict $ PQ.notifyExtra notif of
Left e -> logError logger threadType $ TEJsonParse $ T.pack e
Left e -> logError logger threadType $ TEPayloadParse $ T.pack e
Right payload -> do
logInfo logger threadType $ object ["received_event" .= payload]
#ifndef PROFILING
@ -239,7 +242,11 @@ listener pool logger updateEventRef =
-- | An IO action that processes events from Queue, in a loop forever.
processor
:: SQLGenCtx
:: forall m void.
( C.ForkableMonadIO m
, MonadMetadataStorage (MetadataStorageT m)
)
=> SQLGenCtx
-> PG.PGPool
-> Logger Hasura
-> HTTP.Manager
@ -247,15 +254,29 @@ processor
-> SchemaCacheRef
-> InstanceId
-> UTC.UTCTime
-> IO void
-> m void
processor sqlGenCtx pool logger httpMgr updateEventRef
cacheRef instanceId cacheInitStartTime =
-- Never exits
forever $ do
event <- STM.atomically getLatestEvent
event <- liftIO $ STM.atomically getLatestEvent
logInfo logger threadType $ object ["processed_event" .= event]
when (shouldReload event) $
refreshSchemaCache sqlGenCtx pool logger httpMgr cacheRef (getCacheInvalidations event)
(shouldReload, cacheInvalidations) <- case event of
SSEListenStart time ->
-- If listening started after cache initialization, just refresh the schema cache unconditionally.
-- See Note [Schema Cache Sync]
pure $ (time > cacheInitStartTime, mempty)
SSEPayload payload -> do
eitherResult <- runMetadataStorageT $ processSchemaSyncEventPayload instanceId payload
case eitherResult of
Left e -> do
logError logger threadType $ TEPayloadParse $ qeError e
pure (False, mempty)
Right SchemaSyncEventProcessResult{..} ->
pure (_sseprShouldReload, _sseprCacheInvalidations)
when shouldReload $
refreshSchemaCache sqlGenCtx pool logger httpMgr cacheRef cacheInvalidations
threadType "schema cache reloaded"
where
-- checks if there is an event
@ -269,36 +290,30 @@ processor sqlGenCtx pool logger httpMgr updateEventRef
Nothing -> STM.retry
threadType = TTProcessor
shouldReload = \case
SSEListenStart time ->
-- If listening started after cache initialization, just refresh the schema cache unconditionally.
-- See Note [Schema Cache Sync]
time > cacheInitStartTime
SSEPayload payload ->
-- When event is from other sever instance
_epInstanceId payload /= instanceId
getCacheInvalidations = \case
SSEListenStart _ -> mempty
SSEPayload payload -> _epInvalidations payload
refreshSchemaCache
:: SQLGenCtx
:: ( MonadIO m
, MonadBaseControl IO m
, MonadMetadataStorage (MetadataStorageT m)
)
=> SQLGenCtx
-> PG.PGPool
-> Logger Hasura
-> HTTP.Manager
-> SchemaCacheRef
-> CacheInvalidations
-> ThreadType
-> Text -> IO ()
-> Text -> m ()
refreshSchemaCache sqlGenCtx pool logger httpManager cacheRef invalidations threadType msg = do
-- Reload schema cache from catalog
resE <- liftIO $ runExceptT $ withSCUpdate cacheRef logger do
rebuildableCache <- fst <$> liftIO (readIORef $ _scrCache cacheRef)
((), cache, _) <- fetchMetadataAndBuildCache
& runCacheRWT rebuildableCache
& peelRun runCtx pgCtx PG.ReadWrite Nothing
pure ((), cache)
eitherMetadata <- runMetadataStorageT fetchMetadata
resE <- runExceptT $ do
metadata <- liftEither eitherMetadata
withSCUpdate cacheRef logger do
rebuildableCache <- fst <$> liftIO (readIORef $ _scrCache cacheRef)
((), cache, _) <- buildSchemaCacheWithOptions CatalogSync invalidations metadata
& runCacheRWT rebuildableCache
& peelRun runCtx pgCtx PG.ReadWrite Nothing
pure ((), cache)
case resE of
Left e -> logError logger threadType $ TEQueryError e
Right () -> logInfo logger threadType $ object ["message" .= msg]
@ -306,15 +321,11 @@ refreshSchemaCache sqlGenCtx pool logger httpManager cacheRef invalidations thre
runCtx = RunCtx adminUserInfo httpManager sqlGenCtx
pgCtx = mkPGExecCtx PG.Serializable pool
fetchMetadataAndBuildCache = do
metadata <- liftTx fetchMetadataFromCatalog
buildSchemaCacheWithOptions CatalogSync invalidations metadata
logInfo :: Logger Hasura -> ThreadType -> Value -> IO ()
logInfo :: (MonadIO m) => Logger Hasura -> ThreadType -> Value -> m ()
logInfo logger threadType val = unLogger logger $
SchemaSyncThreadLog LevelInfo threadType val
logError :: ToJSON a => Logger Hasura -> ThreadType -> a -> IO ()
logError :: (MonadIO m, ToJSON a) => Logger Hasura -> ThreadType -> a -> m ()
logError logger threadType err =
unLogger logger $ SchemaSyncThreadLog LevelError threadType $
object ["error" .= toJSON err]

View File

@ -28,11 +28,11 @@ import Hasura.Server.Version (HasVersion)
-- -- NOTE: downgrade test disabled for now (see #5273)
newtype CacheRefT m a
= CacheRefT { runCacheRefT :: MVar (RebuildableSchemaCache m) -> m a }
= CacheRefT { runCacheRefT :: MVar RebuildableSchemaCache -> m a }
deriving
( Functor, Applicative, Monad, MonadIO, MonadError e, MonadBase b, MonadBaseControl b
, MonadTx, MonadUnique, UserInfoM, HasHttpManager, HasSQLGenCtx )
via (ReaderT (MVar (RebuildableSchemaCache m)) m)
via (ReaderT (MVar RebuildableSchemaCache) m)
instance MonadTrans CacheRefT where
lift = CacheRefT . const
@ -41,7 +41,7 @@ instance (MonadBase IO m) => TableCoreInfoRM (CacheRefT m)
instance (MonadBase IO m) => CacheRM (CacheRefT m) where
askSchemaCache = CacheRefT (fmap lastBuiltSchemaCache . readMVar)
instance (MonadIO m, MonadBaseControl IO m, MonadTx m) => CacheRWM (CacheRefT m) where
instance (MonadIO m, MonadBaseControl IO m, MonadTx m, HasHttpManager m, HasSQLGenCtx m) => CacheRWM (CacheRefT m) where
buildSchemaCacheWithOptions reason invalidations metadata = CacheRefT $ flip modifyMVar \schemaCache -> do
((), cache, _) <- runCacheRWT schemaCache (buildSchemaCacheWithOptions reason invalidations metadata)
pure (cache, ())
@ -60,7 +60,6 @@ spec
, MonadIO m
, MonadBaseControl IO m
, MonadTx m
, MonadUnique m
, HasHttpManager m
, HasSQLGenCtx m
)

View File

@ -86,7 +86,7 @@ buildPostgresSpecs pgConnOptions = do
httpManager <- HTTP.newManager HTTP.tlsManagerSettings
let runContext = RunCtx adminUserInfo httpManager (SQLGenCtx False)
runAsAdmin :: Run a -> IO a
runAsAdmin :: RunT IO a -> IO a
runAsAdmin =
peelRun runContext pgContext Q.ReadWrite Nothing
>>> runExceptT