diff --git a/server/src-exec/Main.hs b/server/src-exec/Main.hs index 46ebb8252ee..69e09099b4a 100644 --- a/server/src-exec/Main.hs +++ b/server/src-exec/Main.hs @@ -10,6 +10,7 @@ import Data.Time.Clock.POSIX (getPOSIXTime) import Hasura.App import Hasura.Logging (Hasura, LogLevel (..), defaultEnabledEngineLogTypes) +import Hasura.Metadata.Class import Hasura.Prelude import Hasura.RQL.DDL.Schema import Hasura.RQL.Types @@ -85,14 +86,16 @@ runApp env (HGEOptionsG rci hgeCmd) = do queryBs <- liftIO BL.getContents let sqlGenCtx = SQLGenCtx False pool <- mkMinimalPool _gcConnInfo - res <- runAsAdmin pool sqlGenCtx _gcHttpManager $ do - schemaCache <- buildRebuildableSchemaCache env - metadata <- liftTx fetchMetadataFromCatalog - execQuery env queryBs - & Tracing.runTraceTWithReporter Tracing.noReporter "execute" - & runMetadataT metadata - & runCacheRWT schemaCache - & fmap (\((res, _), _, _) -> res) + res <- flip runPGMetadataStorageApp pool $ + runMetadataStorageT $ liftEitherM $ + runAsAdmin pool sqlGenCtx _gcHttpManager $ do + metadata <- liftTx fetchMetadataFromCatalog + schemaCache <- buildRebuildableSchemaCache env metadata + execQuery env queryBs + & Tracing.runTraceTWithReporter Tracing.noReporter "execute" + & runMetadataT metadata + & runCacheRWT schemaCache + & fmap (\((res, _), _, _) -> res) either (printErrJExit ExecuteProcessError) (liftIO . BLC.putStrLn) res HCDowngrade opts -> do diff --git a/server/src-lib/Hasura/App.hs b/server/src-lib/Hasura/App.hs index 8124ca855f4..075457f37a8 100644 --- a/server/src-lib/Hasura/App.hs +++ b/server/src-lib/Hasura/App.hs @@ -13,7 +13,6 @@ import Control.Monad.Stateless import Control.Monad.STM (atomically) import Control.Monad.Trans.Control (MonadBaseControl (..)) import Control.Monad.Unique -import Data.Aeson ((.=)) import Data.Time.Clock (UTCTime) #ifndef PROFILING import GHC.AssertNF @@ -26,6 +25,7 @@ import System.Mem (performMajorGC) import qualified Control.Concurrent.Async.Lifted.Safe as LA import qualified Control.Concurrent.Extended as C import qualified Control.Immortal as Immortal +import qualified Data.Aeson as J import qualified Data.Aeson as A import qualified Data.ByteString.Char8 as BC import qualified Data.ByteString.Lazy.Char8 as BLC @@ -48,7 +48,7 @@ import Hasura.Eventing.EventTrigger import Hasura.Eventing.ScheduledTrigger import Hasura.GraphQL.Execute (MonadGQLExecutionCheck (..), checkQueryInAllowlist) -import Hasura.GraphQL.Execute.Action (asyncActionsProcessor) +import Hasura.GraphQL.Execute.Action import Hasura.GraphQL.Execute.Query (MonadQueryInstrumentation (..), noProfile) import Hasura.GraphQL.Logging (MonadQueryLog (..), QueryLog (..)) @@ -58,6 +58,7 @@ import Hasura.Logging import Hasura.Metadata.Class import Hasura.Prelude import Hasura.RQL.DDL.Schema.Cache +import Hasura.RQL.DDL.Schema.Catalog import Hasura.RQL.Types import Hasura.RQL.Types.Run import Hasura.Server.API.Query (requiresAdmin, runQueryM) @@ -178,7 +179,7 @@ data ServeCtx , _scConnInfo :: !Q.ConnInfo , _scPgPool :: !Q.PGPool , _scShutdownLatch :: !ShutdownLatch - , _scSchemaCache :: !(RebuildableSchemaCache Run) + , _scSchemaCache :: !RebuildableSchemaCache , _scSchemaSyncCtx :: !SchemaSyncCtx } @@ -202,7 +203,7 @@ newtype PGMetadataStorageApp a -- | Initializes or migrates the catalog and returns the context required to start the server. initialiseServeCtx - :: (HasVersion, MonadIO m, MonadCatch m) + :: (HasVersion, MonadIO m, MonadBaseControl IO m, MonadCatch m) => Env.Environment -> GlobalCtx -> ServeOptions Hasura @@ -243,9 +244,9 @@ mkLoggers enabledLogs logLevel = do -- | helper function to initialize or migrate the @hdb_catalog@ schema (used by pro as well) migrateCatalogSchema - :: (HasVersion, MonadIO m) + :: (HasVersion, MonadIO m, MonadBaseControl IO m) => Env.Environment -> Logger Hasura -> Q.PGPool -> HTTP.Manager -> SQLGenCtx - -> m (RebuildableSchemaCache Run, UTCTime) + -> m (RebuildableSchemaCache, UTCTime) migrateCatalogSchema env logger pool httpManager sqlGenCtx = do let pgExecCtx = mkPGExecCtx Q.Serializable pool adminRunCtx = RunCtx adminUserInfo httpManager sqlGenCtx @@ -413,7 +414,7 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po -- start a backgroud thread to handle async actions asyncActionsThread <- C.forkImmortal "asyncActionsProcessor" logger $ - asyncActionsProcessor env logger (_scrCache cacheRef) _scPgPool _scHttpManager + asyncActionsProcessor env logger (_scrCache cacheRef) _scHttpManager -- start a background thread to create new cron events cronEventsThread <- C.forkImmortal "runCronEventsGenerator" logger $ @@ -461,16 +462,16 @@ runHGEServer env ServeOptions{..} ServeCtx{..} pgExecCtx initTime shutdownApp po shutdownHandler' <- liftWithStateless $ \lowerIO -> pure $ shutdownHandler _scLoggers immortalThreads stopWsServer lockedEventsCtx _scPgPool $ \a b -> hoist lowerIO $ unlockScheduledEvents a b - + -- Install a variant of forkIOWithUnmask which tracks Warp threads using an EKG metric let setForkIOWithMetrics :: Warp.Settings -> Warp.Settings setForkIOWithMetrics = Warp.setFork \f -> do void $ C.forkIOWithUnmask (\unmask -> - bracket_ + bracket_ (EKG.Gauge.inc $ smWarpThreads serverMetrics) (EKG.Gauge.dec $ smWarpThreads serverMetrics) (f unmask)) - + let warpSettings = Warp.setPort soPort . Warp.setHost soHost . Warp.setGracefulShutdownTimeout (Just 30) -- 30s graceful shutdown @@ -625,11 +626,11 @@ ourIdleGC (Logger logger) idleInterval minGCInterval maxNoGCInterval = go gcs major_gcs timerSinceLastMajorGC runAsAdmin - :: (MonadIO m) + :: (MonadIO m, MonadBaseControl IO m) => Q.PGPool -> SQLGenCtx -> HTTP.Manager - -> Run a + -> RunT m a -> m (Either QErr a) runAsAdmin pool sqlGenCtx httpManager m = do let runCtx = RunCtx adminUserInfo httpManager sqlGenCtx @@ -647,6 +648,7 @@ execQuery , UserInfoM m , Tracing.MonadTrace m , MetadataM m + , MonadScheduledEvents m ) => Env.Environment -> BLC.ByteString @@ -684,8 +686,8 @@ instance UserAuthentication (Tracing.TraceT PGMetadataStorageApp) where runExceptT $ getUserInfoWithExpTime logger manager headers authMode instance MetadataApiAuthorization PGMetadataStorageApp where - authorizeMetadataApi query userInfo = do - let currRole = _uiRole userInfo + authorizeMetadataApi query handlerCtx = runExceptT do + let currRole = _uiRole $ hcUser handlerCtx when (requiresAdmin query && currRole /= adminRoleName) $ withPathK "args" $ throw400 AccessDenied errMsg where @@ -716,11 +718,33 @@ runInSeparateTx tx = do pool <- lift ask liftEitherM $ liftIO $ runExceptT $ Q.runTx pool (Q.RepeatableRead, Nothing) tx +-- | Using @pg_notify@ function to publish schema sync events to other server +-- instances via 'hasura_schema_update' channel. +-- See Note [Schema Cache Sync] +notifySchemaCacheSyncTx :: InstanceId -> CacheInvalidations -> Q.TxE QErr () +notifySchemaCacheSyncTx instanceId invalidations = do + Q.Discard () <- Q.withQE defaultTxErrorHandler [Q.sql| + SELECT pg_notify('hasura_schema_update', json_build_object( + 'instance_id', $1, + 'occurred_at', NOW(), + 'invalidations', $2 + )::text + ) + |] (instanceId, Q.AltJ invalidations) True + pure () + -- | Each of the function in the type class is executed in a totally separate transaction. -- -- To learn more about why the instance is derived as following, see Note [Generic MetadataStorageT transformer] instance MonadMetadataStorage (MetadataStorageT PGMetadataStorageApp) where + fetchMetadata = runInSeparateTx fetchMetadataFromCatalog + setMetadata = runInSeparateTx . setMetadataInCatalog + notifySchemaCacheSync a b = runInSeparateTx $ notifySchemaCacheSyncTx a b + processSchemaSyncEventPayload instanceId payload = do + EventPayload{..} <- decodeValue payload + pure $ SchemaSyncEventProcessResult (instanceId /= _epInstanceId) _epInvalidations + getDeprivedCronTriggerStats = runInSeparateTx getDeprivedCronTriggerStatsTx getScheduledEventsForDelivery = runInSeparateTx getScheduledEventsForDeliveryTx insertScheduledEvent = runInSeparateTx . insertScheduledEventTx @@ -728,6 +752,12 @@ instance MonadMetadataStorage (MetadataStorageT PGMetadataStorageApp) where setScheduledEventOp a b c = runInSeparateTx $ setScheduledEventOpTx a b c unlockScheduledEvents a b = runInSeparateTx $ unlockScheduledEventsTx a b unlockAllLockedScheduledEvents = runInSeparateTx unlockAllLockedScheduledEventsTx + clearFutureCronEvents = runInSeparateTx . dropFutureCronEventsTx + + insertAction a b c d = runInSeparateTx $ insertActionTx a b c d + fetchUndeliveredActionEvents = runInSeparateTx fetchUndeliveredActionEventsTx + setActionStatus a b = runInSeparateTx $ setActionStatusTx a b + fetchActionResponse = runInSeparateTx . fetchActionResponseTx --- helper functions --- @@ -735,12 +765,12 @@ mkConsoleHTML :: HasVersion => Text -> AuthMode -> Bool -> Maybe Text -> Either mkConsoleHTML path authMode enableTelemetry consoleAssetsDir = renderHtmlTemplate consoleTmplt $ -- variables required to render the template - A.object [ "isAdminSecretSet" .= isAdminSecretSet authMode - , "consolePath" .= consolePath - , "enableTelemetry" .= boolToText enableTelemetry - , "cdnAssets" .= boolToText (isNothing consoleAssetsDir) - , "assetsVersion" .= consoleAssetsVersion - , "serverVersion" .= currentVersion + A.object [ "isAdminSecretSet" J..= isAdminSecretSet authMode + , "consolePath" J..= consolePath + , "enableTelemetry" J..= boolToText enableTelemetry + , "cdnAssets" J..= boolToText (isNothing consoleAssetsDir) + , "assetsVersion" J..= consoleAssetsVersion + , "serverVersion" J..= currentVersion ] where consolePath = case path of diff --git a/server/src-lib/Hasura/Backends/Postgres/Connection.hs b/server/src-lib/Hasura/Backends/Postgres/Connection.hs index 7c1e4bed1ad..5fa8e3ca752 100644 --- a/server/src-lib/Hasura/Backends/Postgres/Connection.hs +++ b/server/src-lib/Hasura/Backends/Postgres/Connection.hs @@ -47,8 +47,8 @@ import Hasura.Backends.Postgres.SQL.Error import Hasura.Backends.Postgres.SQL.Types import Hasura.EncJSON import Hasura.RQL.Types.Error -import Hasura.SQL.Types import Hasura.Session +import Hasura.SQL.Types type RunTx = forall m a. (MonadIO m, MonadBaseControl IO m) => Q.TxET QErr m a -> ExceptT QErr m a diff --git a/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs b/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs index 6b1f502eba3..7d5e1a7392f 100644 --- a/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs +++ b/server/src-lib/Hasura/Eventing/ScheduledTrigger.hs @@ -82,6 +82,7 @@ module Hasura.Eventing.ScheduledTrigger , unlockScheduledEventsTx , unlockAllLockedScheduledEventsTx , insertScheduledEventTx + , dropFutureCronEventsTx ) where import Hasura.Prelude @@ -147,7 +148,7 @@ runCronEventsGenerator logger getSC = do insertCronEventsFor cronTriggersForHydrationWithStats onLeft eitherRes $ L.unLogger logger . - ScheduledTriggerInternalErr . err500 Unexpected . T.pack . show + ScheduledTriggerInternalErr . err500 Unexpected . tshow liftIO $ sleep (minutes 1) where @@ -660,6 +661,15 @@ insertScheduledEventTx = \case toArr (CronEventSeed n t) = [(triggerNameToTxt n), (formatTime' t)] toTupleExp = TupleExp . map SELit +dropFutureCronEventsTx :: TriggerName -> Q.TxE QErr () +dropFutureCronEventsTx name = + Q.unitQE defaultTxErrorHandler + [Q.sql| + DELETE FROM hdb_catalog.hdb_cron_events + WHERE trigger_name = $1 AND scheduled_time > now() AND tries = 0 + |] (Identity name) False + + cronEventsTable :: QualifiedTable cronEventsTable = QualifiedObject "hdb_catalog" $ TableName "hdb_cron_events" diff --git a/server/src-lib/Hasura/GraphQL/Execute.hs b/server/src-lib/Hasura/GraphQL/Execute.hs index 58b2aa984ad..5bf09b581f5 100644 --- a/server/src-lib/Hasura/GraphQL/Execute.hs +++ b/server/src-lib/Hasura/GraphQL/Execute.hs @@ -41,6 +41,7 @@ import Hasura.GraphQL.RemoteServer (execRemoteGQ') import Hasura.GraphQL.Transport.HTTP.Protocol import Hasura.GraphQL.Utils (showName) import Hasura.HTTP +import Hasura.Metadata.Class import Hasura.RQL.Types import Hasura.Server.Types (RequestId) import Hasura.Server.Version (HasVersion) @@ -107,6 +108,10 @@ instance MonadGQLExecutionCheck m => MonadGQLExecutionCheck (Tracing.TraceT m) w checkGQLExecution ui det enableAL sc req = lift $ checkGQLExecution ui det enableAL sc req +instance MonadGQLExecutionCheck m => MonadGQLExecutionCheck (MetadataStorageT m) where + checkGQLExecution ui det enableAL sc req = + lift $ checkGQLExecution ui det enableAL sc req + getExecPlanPartial :: (MonadError QErr m) => UserInfo @@ -208,6 +213,7 @@ getResolvedExecPlan :: forall m tx . ( HasVersion , MonadError QErr m + , MonadMetadataStorage (MetadataStorageT m) , MonadIO m , Tracing.MonadTrace m , EQ.MonadQueryInstrumentation m diff --git a/server/src-lib/Hasura/GraphQL/Execute/Action.hs b/server/src-lib/Hasura/GraphQL/Execute/Action.hs index ca49506b456..835fd0605f9 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/Action.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/Action.hs @@ -1,10 +1,14 @@ module Hasura.GraphQL.Execute.Action - ( ActionExecuteTx + ( ActionExecuteTx(..) , ActionExecuteResult(..) - , resolveAsyncActionQuery , asyncActionsProcessor , resolveActionExecution , resolveActionMutationAsync + , resolveAsyncActionQuery + , insertActionTx + , fetchUndeliveredActionEventsTx + , setActionStatusTx + , fetchActionResponseTx ) where import Hasura.Prelude @@ -19,7 +23,6 @@ import qualified Data.Environment as Env import qualified Data.HashMap.Strict as Map import qualified Data.HashSet as Set import qualified Data.Text as T -import qualified Data.UUID as UUID import qualified Database.PG.Query as Q import qualified Language.GraphQL.Draft.Syntax as G import qualified Network.HTTP.Client as HTTP @@ -31,11 +34,12 @@ import Control.Exception (try) import Control.Lens import Control.Monad.Trans.Control (MonadBaseControl) import Data.Has -import Data.IORef import Data.Int (Int64) +import Data.IORef import Data.Text.Extended import qualified Hasura.Backends.Postgres.Execute.RemoteJoin as RJ +import qualified Hasura.Backends.Postgres.SQL.DML as S import qualified Hasura.Backends.Postgres.Translate.Select as RS import qualified Hasura.Logging as L import qualified Hasura.RQL.IR.Select as RS @@ -43,26 +47,30 @@ import qualified Hasura.Tracing as Tracing import Hasura.Backends.Postgres.SQL.Types import Hasura.Backends.Postgres.SQL.Value (PGScalarValue (..)) -import Hasura.Backends.Postgres.Translate.Select (asSingleRowJsonResp) import Hasura.Backends.Postgres.Translate.Column (toTxtValue) +import Hasura.Backends.Postgres.Translate.Select (asSingleRowJsonResp) import Hasura.EncJSON import Hasura.GraphQL.Execute.Prepare import Hasura.GraphQL.Parser import Hasura.GraphQL.Utils (showNames) import Hasura.HTTP +import Hasura.Metadata.Class import Hasura.RQL.DDL.Headers import Hasura.RQL.DDL.Schema.Cache import Hasura.RQL.Types -import Hasura.RQL.Types.Run -import Hasura.SQL.Types import Hasura.Server.Utils (mkClientHeadersForward, mkSetCookieHeaders) import Hasura.Server.Version (HasVersion) import Hasura.Session +import Hasura.SQL.Types -type ActionExecuteTx = - forall tx. (MonadIO tx, MonadTx tx, Tracing.MonadTrace tx) => tx EncJSON +newtype ActionExecuteTx = + ActionExecuteTx { + unActionExecuteTx + :: forall tx + . (MonadIO tx, MonadTx tx, Tracing.MonadTrace tx) => tx EncJSON + } newtype ActionContext = ActionContext {_acName :: ActionName} @@ -93,7 +101,7 @@ instance J.FromJSON ActionWebhookResponse where parseJSON v = case v of J.Array{} -> AWRArray <$> J.parseJSON v J.Object{} -> AWRObject <$> J.parseJSON v - _ -> fail $ "expecting object or array of objects for action webhook response" + _ -> fail "expecting object or array of objects for action webhook response" instance J.ToJSON ActionWebhookResponse where toJSON (AWRArray objects) = J.toJSON objects @@ -137,8 +145,8 @@ instance L.ToEngineLog ActionHandlerLog L.Hasura where data ActionExecuteResult = ActionExecuteResult - { _aerTransaction :: !ActionExecuteTx - , _aerHeaders :: !HTTP.ResponseHeaders + { _aerExecution :: !ActionExecuteTx + , _aerHeaders :: !HTTP.ResponseHeaders } -- | Synchronously execute webhook handler and resolve response to action "output" @@ -173,7 +181,7 @@ resolveActionExecution env logger userInfo annAction execContext = do executeAction :: RS.AnnSimpleSel 'Postgres -> ActionExecuteTx - executeAction astResolved = do + executeAction astResolved = ActionExecuteTx do let (astResolvedWithoutRemoteJoins,maybeRemoteJoins) = RJ.getRemoteJoins astResolved jsonAggType = mkJsonAggSelect outputType case maybeRemoteJoins of @@ -199,29 +207,17 @@ table provides the action response. See Note [Resolving async action query/subsc -- | Resolve asynchronous action mutation which returns only the action uuid resolveActionMutationAsync - :: ( MonadError QErr m - , MonadTx tx - ) + :: (MonadMetadataStorage m) => AnnActionMutationAsync -> [HTTP.Header] -> SessionVariables - -> m (tx EncJSON) -resolveActionMutationAsync annAction reqHeaders sessionVariables = - pure $ liftTx do - actionId <- runIdentity . Q.getRow <$> Q.withQE defaultTxErrorHandler [Q.sql| - INSERT INTO - "hdb_catalog"."hdb_action_log" - ("action_name", "session_variables", "request_headers", "input_payload", "status") - VALUES - ($1, $2, $3, $4, $5) - RETURNING "id" - |] - (actionName, Q.AltJ sessionVariables, Q.AltJ $ toHeadersMap reqHeaders, Q.AltJ inputArgs, "created"::Text) False - - pure $ encJFromJValue $ UUID.toText actionId + -> m ActionExecuteTx +resolveActionMutationAsync annAction reqHeaders sessionVariables = do + actionId <- insertAction actionName sessionVariables reqHeaders inputArgs + pure $ ActionExecuteTx $ + pure $ encJFromJValue $ actionIdToText actionId where AnnActionMutationAsync actionName inputArgs = annAction - toHeadersMap = Map.fromList . map ((bsToTxt . CI.original) *** bsToTxt) {- Note: [Resolving async action query/subscription] ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -234,13 +230,14 @@ action's type. Here, we treat the "output" field as a computed field to hdb_acti `jsonb_to_record` as custom SQL function. -} - -- TODO: Add tracing here? Avoided now because currently the function is pure resolveAsyncActionQuery - :: UserInfo + :: (MonadMetadataStorage m) + => UserInfo -> AnnActionAsyncQuery 'Postgres (UnpreparedValue 'Postgres) - -> RS.AnnSimpleSelG 'Postgres (UnpreparedValue 'Postgres) -resolveAsyncActionQuery userInfo annAction = + -> m (RS.AnnSimpleSelG 'Postgres (UnpreparedValue 'Postgres)) +resolveAsyncActionQuery userInfo annAction = do + actionLogResponse <- fetchActionResponse actionId let annotatedFields = asyncFields <&> second \case AsyncTypename t -> RS.AFExpression t AsyncOutput annFields -> @@ -251,32 +248,41 @@ resolveAsyncActionQuery userInfo annAction = processOutputSelectionSet inputTableArgument outputType definitionList annFields stringifyNumerics - AsyncId -> mkAnnFldFromPGCol "id" PGUUID - AsyncCreatedAt -> mkAnnFldFromPGCol "created_at" PGTimeStampTZ - AsyncErrors -> mkAnnFldFromPGCol "errors" PGJSONB + AsyncId -> mkAnnFldFromPGCol idColumn + AsyncCreatedAt -> mkAnnFldFromPGCol createdAtColumn + AsyncErrors -> mkAnnFldFromPGCol errorsColumn - tableFromExp = RS.FromTable actionLogTable + jsonbToRecordSet = QualifiedObject "pg_catalog" $ FunctionName "jsonb_to_recordset" + actionLogInput = UVLiteral $ S.SELit $ lbsToTxt $ J.encode [actionLogResponse] + functionArgs = RS.FunctionArgsExp [RS.AEInput actionLogInput] mempty + tableFromExp = RS.FromFunction jsonbToRecordSet functionArgs $ Just + [idColumn, createdAtColumn, responsePayloadColumn, errorsColumn, sessionVarsColumn] tableArguments = RS.noSelectArgs { RS._saWhere = Just tableBoolExpression} tablePermissions = RS.TablePerm annBoolExpTrue Nothing - in RS.AnnSelectG annotatedFields tableFromExp tablePermissions - tableArguments stringifyNumerics + pure $ RS.AnnSelectG annotatedFields tableFromExp tablePermissions + tableArguments stringifyNumerics where - AnnActionAsyncQuery actionName actionId outputType asyncFields definitionList stringifyNumerics = annAction - actionLogTable = QualifiedObject (SchemaName "hdb_catalog") (TableName "hdb_action_log") + AnnActionAsyncQuery _ actionId outputType asyncFields definitionList stringifyNumerics = annAction + + idColumn = (unsafePGCol "id", PGUUID) + responsePayloadColumn = (unsafePGCol "response_payload", PGJSONB) + createdAtColumn = (unsafePGCol "created_at", PGTimeStampTZ) + errorsColumn = (unsafePGCol "errors", PGJSONB) + sessionVarsColumn = (unsafePGCol "session_variables", PGJSONB) -- TODO (from master):- Avoid using ColumnInfo - mkAnnFldFromPGCol column' columnType = - flip RS.mkAnnColumnField Nothing $ - ColumnInfo (unsafePGCol column') (G.unsafeMkName column') 0 (ColumnScalar columnType) True Nothing + mkAnnFldFromPGCol = flip RS.mkAnnColumnField Nothing . mkPGColumnInfo + + mkPGColumnInfo (column', columnType) = + ColumnInfo column' (G.unsafeMkName $ getPGColTxt column') 0 (ColumnScalar columnType) True Nothing tableBoolExpression = let actionIdColumnInfo = ColumnInfo (unsafePGCol "id") $$(G.litName "id") 0 (ColumnScalar PGUUID) False Nothing - actionIdColumnEq = BoolFld $ AVCol actionIdColumnInfo [AEQ True actionId] - sessionVarsColumnInfo = ColumnInfo (unsafePGCol "session_variables") $$(G.litName "session_variables") - 0 (ColumnScalar PGJSONB) False Nothing + actionIdColumnEq = BoolFld $ AVCol actionIdColumnInfo [AEQ True $ UVLiteral $ S.SELit $ actionIdToText actionId] + sessionVarsColumnInfo = mkPGColumnInfo sessionVarsColumn sessionVarValue = UVParameter Nothing $ ColumnValue (ColumnScalar PGJSONB) $ PGValJSONB $ Q.JSONB $ J.toJSON $ _uiSession userInfo sessionVarsColumnEq = BoolFld $ AVCol sessionVarsColumnInfo [AEQ True sessionVarValue] @@ -287,14 +293,6 @@ resolveAsyncActionQuery userInfo annAction = in if isAdmin (_uiRole userInfo) then actionIdColumnEq else BoolAnd [actionIdColumnEq, sessionVarsColumnEq] -data ActionLogItem - = ActionLogItem - { _aliId :: !UUID.UUID - , _aliActionName :: !ActionName - , _aliRequestHeaders :: ![HTTP.Header] - , _aliSessionVariables :: !SessionVariables - , _aliInputPayload :: !J.Value - } deriving (Show, Eq) -- | Process async actions from hdb_catalog.hdb_action_log table. This functions is executed in a background thread. -- See Note [Async action architecture] above @@ -305,24 +303,20 @@ asyncActionsProcessor , MonadBaseControl IO m , LA.Forall (LA.Pure m) , Tracing.HasReporter m + , MonadMetadataStorage (MetadataStorageT m) ) => Env.Environment -> L.Logger L.Hasura - -> IORef (RebuildableSchemaCache Run, SchemaCacheVer) - -> Q.PGPool + -> IORef (RebuildableSchemaCache, SchemaCacheVer) -> HTTP.Manager -> m void -asyncActionsProcessor env logger cacheRef pgPool httpManager = forever $ do - asyncInvocations <- liftIO getUndeliveredEvents +asyncActionsProcessor env logger cacheRef httpManager = forever $ do + asyncInvocationsE <- runMetadataStorageT fetchUndeliveredActionEvents + asyncInvocations <- liftIO $ onLeft asyncInvocationsE mempty actionCache <- scActions . lastBuiltSchemaCache . fst <$> liftIO (readIORef cacheRef) LA.mapConcurrently_ (callHandler actionCache) asyncInvocations liftIO $ threadDelay (1 * 1000 * 1000) where - runTx :: (Monoid a) => Q.TxE QErr a -> IO a - runTx q = do - res <- runExceptT $ Q.runTx' pgPool q - onLeft res mempty - callHandler :: ActionCache -> ActionLogItem -> m () callHandler actionCache actionLogItem = Tracing.runTraceT "async actions processor" do let ActionLogItem actionId actionName reqHeaders @@ -340,61 +334,14 @@ asyncActionsProcessor env logger cacheRef pgPool httpManager = forever $ do actionContext = ActionContext actionName eitherRes <- runExceptT $ flip runReaderT logger $ callWebhook env httpManager outputType outputFields reqHeaders confHeaders - forwardClientHeaders webhookUrl (ActionWebhookPayload actionContext sessionVariables inputPayload) + forwardClientHeaders webhookUrl + (ActionWebhookPayload actionContext sessionVariables inputPayload) timeout + resE <- runMetadataStorageT $ setActionStatus actionId $ case eitherRes of + Left e -> AASError e + Right (responsePayload, _) -> AASCompleted $ J.toJSON responsePayload - liftIO $ case eitherRes of - Left e -> setError actionId e - Right (responsePayload, _) -> setCompleted actionId $ J.toJSON responsePayload - - setError :: UUID.UUID -> QErr -> IO () - setError actionId e = - runTx $ setErrorQuery actionId e - - setErrorQuery - :: UUID.UUID -> QErr -> Q.TxE QErr () - setErrorQuery actionId e = - Q.unitQE defaultTxErrorHandler [Q.sql| - update hdb_catalog.hdb_action_log - set errors = $1, status = 'error' - where id = $2 - |] (Q.AltJ e, actionId) False - - setCompleted :: UUID.UUID -> J.Value -> IO () - setCompleted actionId responsePayload = - runTx $ setCompletedQuery actionId responsePayload - - setCompletedQuery - :: UUID.UUID -> J.Value -> Q.TxE QErr () - setCompletedQuery actionId responsePayload = - Q.unitQE defaultTxErrorHandler [Q.sql| - update hdb_catalog.hdb_action_log - set response_payload = $1, status = 'completed' - where id = $2 - |] (Q.AltJ responsePayload, actionId) False - - undeliveredEventsQuery - :: Q.TxE QErr [ActionLogItem] - undeliveredEventsQuery = - map mapEvent <$> Q.listQE defaultTxErrorHandler [Q.sql| - update hdb_catalog.hdb_action_log set status = 'processing' - where - id in ( - select id from hdb_catalog.hdb_action_log - where status = 'created' - for update skip locked limit 10 - ) - returning - id, action_name, request_headers::json, session_variables::json, input_payload::json - |] () False - where - mapEvent (actionId, actionName, Q.AltJ headersMap, - Q.AltJ sessionVariables, Q.AltJ inputPayload) = - ActionLogItem actionId actionName (fromHeadersMap headersMap) sessionVariables inputPayload - - fromHeadersMap = map ((CI.mk . txtToBs) *** txtToBs) . Map.toList - - getUndeliveredEvents = runTx undeliveredEventsQuery + liftIO $ onLeft resE mempty callWebhook :: forall m r. @@ -428,7 +375,7 @@ callWebhook env manager outputType outputFields reqHeaders confHeaders requestBody = J.encode postPayload requestBodySize = BL.length requestBody url = unResolvedWebhook resolvedWebhook - responseTimeout = HTTP.responseTimeoutMicro $ unTimeout timeoutSeconds * 1000000 + responseTimeout = HTTP.responseTimeoutMicro $ (unTimeout timeoutSeconds) * 1000000 httpResponse <- do initReq <- liftIO $ HTTP.parseRequest (T.unpack url) let req = initReq { HTTP.method = "POST" @@ -515,14 +462,10 @@ callWebhook env manager outputType outputFields reqHeaders confHeaders Just v -> when (v == J.Null) $ throwUnexpected $ "expecting not null value for field " <>> fieldName -mkJsonAggSelect :: GraphQLType -> RS.JsonAggSelect -mkJsonAggSelect = - bool RS.JASSingleObject RS.JASMultipleRows . isListType - processOutputSelectionSet :: RS.ArgumentExp 'Postgres v -> GraphQLType - -> [(Column 'Postgres, ScalarType 'Postgres)] + -> [(PGCol, PGScalarType)] -> RS.AnnFieldsG 'Postgres v -> Bool -> RS.AnnSimpleSelG 'Postgres v @@ -537,3 +480,74 @@ processOutputSelectionSet tableRowInput actionOutputType definitionList annotate functionArgs = RS.FunctionArgsExp [tableRowInput] mempty selectFrom = RS.FromFunction jsonbToPostgresRecordFunction functionArgs $ Just definitionList + +mkJsonAggSelect :: GraphQLType -> RS.JsonAggSelect +mkJsonAggSelect = + bool RS.JASSingleObject RS.JASMultipleRows . isListType + +insertActionTx + :: ActionName -> SessionVariables -> [HTTP.Header] -> J.Value + -> Q.TxE QErr ActionId +insertActionTx actionName sessionVariables httpHeaders inputArgsPayload = + runIdentity . Q.getRow <$> Q.withQE defaultTxErrorHandler [Q.sql| + INSERT INTO + "hdb_catalog"."hdb_action_log" + ("action_name", "session_variables", "request_headers", "input_payload", "status") + VALUES + ($1, $2, $3, $4, $5) + RETURNING "id" + |] + ( actionName + , Q.AltJ sessionVariables + , Q.AltJ $ toHeadersMap httpHeaders + , Q.AltJ inputArgsPayload + , "created"::Text + ) False + where + toHeadersMap = Map.fromList . map ((bsToTxt . CI.original) *** bsToTxt) + +fetchUndeliveredActionEventsTx :: Q.TxE QErr [ActionLogItem] +fetchUndeliveredActionEventsTx = + map mapEvent <$> Q.listQE defaultTxErrorHandler [Q.sql| + update hdb_catalog.hdb_action_log set status = 'processing' + where + id in ( + select id from hdb_catalog.hdb_action_log + where status = 'created' + for update skip locked limit 10 + ) + returning + id, action_name, request_headers::json, session_variables::json, input_payload::json + |] () False + where + mapEvent (actionId, actionName, Q.AltJ headersMap, + Q.AltJ sessionVariables, Q.AltJ inputPayload) = + ActionLogItem actionId actionName (fromHeadersMap headersMap) sessionVariables inputPayload + + fromHeadersMap = map ((CI.mk . txtToBs) *** txtToBs) . Map.toList + +setActionStatusTx :: ActionId -> AsyncActionStatus -> Q.TxE QErr () +setActionStatusTx actionId = \case + AASCompleted responsePayload -> + Q.unitQE defaultTxErrorHandler [Q.sql| + update hdb_catalog.hdb_action_log + set response_payload = $1, status = 'completed' + where id = $2 + |] (Q.AltJ responsePayload, actionId) False + + AASError qerr -> + Q.unitQE defaultTxErrorHandler [Q.sql| + update hdb_catalog.hdb_action_log + set errors = $1, status = 'error' + where id = $2 + |] (Q.AltJ qerr, actionId) False + +fetchActionResponseTx :: ActionId -> Q.TxE QErr ActionLogResponse +fetchActionResponseTx actionId = do + (ca, rp, errs, Q.AltJ sessVars) <- + Q.getRow <$> Q.withQE defaultTxErrorHandler [Q.sql| + SELECT created_at, response_payload::json, errors::json, session_variables::json + FROM hdb_catalog.hdb_action_log + WHERE id = $1 + |] (Identity actionId) True + pure $ ActionLogResponse actionId ca (Q.getAltJ <$> rp) (Q.getAltJ <$> errs) sessVars diff --git a/server/src-lib/Hasura/GraphQL/Execute/Common.hs b/server/src-lib/Hasura/GraphQL/Execute/Common.hs index c22e046aee3..36cda50e33d 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/Common.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/Common.hs @@ -85,7 +85,7 @@ mkCurPlanTx env manager reqHdrs userInfo instrument ep = \case asSingleRowJsonResp (instrument q) prepArgs Just remoteJoins -> executeQueryWithRemoteJoins env manager reqHdrs userInfo q prepArgs remoteJoins - RFPActionQuery atx -> (atx, Nothing) + RFPActionQuery atx -> (unActionExecuteTx atx, Nothing) -- convert a query from an intermediate representation to... another irToRootFieldPlan diff --git a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs index 9fca7848bb7..7f953e3ed07 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/LiveQuery/Plan.hs @@ -33,7 +33,6 @@ import qualified Data.ByteString as B import qualified Data.HashMap.Strict as Map import qualified Data.HashMap.Strict.InsOrd as OMap import qualified Data.HashSet as Set -import qualified Data.Text as T import qualified Data.UUID.V4 as UUID import qualified Database.PG.Query as Q import qualified Database.PG.Query.PTI as PTI @@ -59,9 +58,10 @@ import Hasura.GraphQL.Context import Hasura.GraphQL.Execute.Action import Hasura.GraphQL.Execute.Query import Hasura.GraphQL.Parser.Column +import Hasura.Metadata.Class import Hasura.RQL.Types -import Hasura.SQL.Types import Hasura.Session +import Hasura.SQL.Types -- ------------------------------------------------------------------------------------------------- @@ -298,7 +298,7 @@ resolveMultiplexedValue = \case Nothing -> do syntheticVarIndex <- use (qpiSyntheticVariableValues . to length) modifying qpiSyntheticVariableValues (|> colVal) - pure ["synthetic", T.pack $ show syntheticVarIndex] + pure ["synthetic", tshow syntheticVarIndex] pure $ fromResVars (CollectableTypeScalar $ unsafePGColumnToBackend $ cvType colVal) varJsonPath UVSessionVar ty sessVar -> do modifying qpiReferencedSessionVariables (Set.insert sessVar) @@ -346,6 +346,7 @@ $(J.deriveToJSON (J.aesonDrop 4 J.snakeCase) ''ReusableLiveQueryPlan) -- of the plan if possible. buildLiveQueryPlan :: ( MonadError QErr m + , MonadMetadataStorage (MetadataStorageT m) , MonadIO m ) => PGExecCtx @@ -366,7 +367,9 @@ buildLiveQueryPlan pgExecCtx userInfo unpreparedAST = do when (remoteJoins /= mempty) $ throw400 NotSupported "Remote relationships are not allowed in subscriptions" _ -> pure () - traverseAction (DS.traverseAnnSimpleSelect resolveMultiplexedValue . resolveAsyncActionQuery userInfo) resolvedRootField + flip traverseAction resolvedRootField $ + (lift . liftEitherM . runMetadataStorageT . resolveAsyncActionQuery userInfo) + >=> DS.traverseAnnSimpleSelect resolveMultiplexedValue let multiplexedQuery = mkMultiplexedQuery preparedAST roleName = _uiRole userInfo diff --git a/server/src-lib/Hasura/GraphQL/Execute/Mutation.hs b/server/src-lib/Hasura/GraphQL/Execute/Mutation.hs index eb854874642..9251651fce5 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/Mutation.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/Mutation.hs @@ -34,6 +34,7 @@ import Hasura.GraphQL.Execute.Prepare import Hasura.GraphQL.Execute.Remote import Hasura.GraphQL.Execute.Resolve import Hasura.GraphQL.Parser +import Hasura.Metadata.Class import Hasura.RQL.Types import Hasura.Server.Version (HasVersion) import Hasura.Session @@ -102,6 +103,7 @@ convertMutationAction ::( HasVersion , MonadIO m , MonadError QErr m + , MonadMetadataStorage (MetadataStorageT m) , Tracing.MonadTrace m , Tracing.MonadTrace tx , MonadIO tx @@ -115,9 +117,10 @@ convertMutationAction -> ActionMutation 'Postgres (UnpreparedValue 'Postgres) -> m (tx EncJSON, HTTP.ResponseHeaders) convertMutationAction env logger userInfo manager reqHeaders = \case - AMSync s -> (_aerTransaction &&& _aerHeaders) <$> + AMSync s -> ((unActionExecuteTx . _aerExecution) &&& _aerHeaders) <$> resolveActionExecution env logger userInfo s actionExecContext - AMAsync s -> noResponseHeaders <$> resolveActionMutationAsync s reqHeaders userSession + AMAsync s -> (noResponseHeaders . unActionExecuteTx) <$> + liftEitherM (runMetadataStorageT $ resolveActionMutationAsync s reqHeaders userSession) where userSession = _uiSession userInfo actionExecContext = ActionExecContext manager reqHeaders $ _uiSession userInfo @@ -128,6 +131,7 @@ convertMutationSelectionSet , Tracing.MonadTrace m , MonadIO m , MonadError QErr m + , MonadMetadataStorage (MetadataStorageT m) , MonadTx tx , Tracing.MonadTrace tx , MonadIO tx diff --git a/server/src-lib/Hasura/GraphQL/Execute/Query.hs b/server/src-lib/Hasura/GraphQL/Execute/Query.hs index 092cfd2f190..533941981b1 100644 --- a/server/src-lib/Hasura/GraphQL/Execute/Query.hs +++ b/server/src-lib/Hasura/GraphQL/Execute/Query.hs @@ -38,6 +38,7 @@ import Hasura.GraphQL.Execute.Prepare import Hasura.GraphQL.Execute.Remote import Hasura.GraphQL.Execute.Resolve import Hasura.GraphQL.Parser +import Hasura.Metadata.Class import Hasura.RQL.Types import Hasura.Server.Version (HasVersion) import Hasura.Session @@ -146,10 +147,12 @@ class Monad m => MonadQueryInstrumentation m where instance MonadQueryInstrumentation m => MonadQueryInstrumentation (ReaderT r m) instance MonadQueryInstrumentation m => MonadQueryInstrumentation (ExceptT e m) instance MonadQueryInstrumentation m => MonadQueryInstrumentation (Tracing.TraceT m) +instance MonadQueryInstrumentation m => MonadQueryInstrumentation (MetadataStorageT m) convertQuerySelSet :: forall m tx . ( MonadError QErr m + , MonadMetadataStorage (MetadataStorageT m) , HasVersion , MonadIO m , Tracing.MonadTrace m @@ -208,13 +211,16 @@ convertQuerySelSet env logger gqlContext userInfo manager reqHeaders directives usrVars = _uiSession userInfo convertActionQuery - :: ActionQuery 'Postgres (UnpreparedValue 'Postgres) -> StateT PlanningSt m (ActionQueryPlan 'Postgres) + :: ActionQuery 'Postgres (UnpreparedValue 'Postgres) + -> StateT PlanningSt m (ActionQueryPlan 'Postgres) convertActionQuery = \case AQQuery s -> lift $ do result <- resolveActionExecution env logger userInfo s $ ActionExecContext manager reqHeaders usrVars - pure $ AQPQuery $ _aerTransaction result - AQAsync s -> AQPAsyncQuery <$> - DS.traverseAnnSimpleSelect prepareWithPlan (resolveAsyncActionQuery userInfo s) + pure $ AQPQuery $ _aerExecution result + AQAsync s -> do + unpreparedAst <- lift $ liftEitherM $ runMetadataStorageT $ + resolveAsyncActionQuery userInfo s + AQPAsyncQuery <$> DS.traverseAnnSimpleSelect prepareWithPlan unpreparedAst -- See Note [Temporarily disabling query plan caching] -- use the existing plan and new variables to create a pg query diff --git a/server/src-lib/Hasura/GraphQL/Explain.hs b/server/src-lib/Hasura/GraphQL/Explain.hs index a9ea9f3bddd..2dc3566a7d0 100644 --- a/server/src-lib/Hasura/GraphQL/Explain.hs +++ b/server/src-lib/Hasura/GraphQL/Explain.hs @@ -30,6 +30,7 @@ import Hasura.Backends.Postgres.Translate.Column (toTxtValue) import Hasura.EncJSON import Hasura.GraphQL.Context import Hasura.GraphQL.Parser +import Hasura.Metadata.Class import Hasura.RQL.DML.Internal import Hasura.RQL.Types import Hasura.SQL.Types @@ -111,6 +112,7 @@ explainGQLQuery :: forall m . ( MonadError QErr m , MonadIO m + , MonadMetadataStorage (MetadataStorageT m) ) => PGExecCtx -> SchemaCache diff --git a/server/src-lib/Hasura/GraphQL/Logging.hs b/server/src-lib/Hasura/GraphQL/Logging.hs index 8faefb6aba1..a76e8cc3301 100644 --- a/server/src-lib/Hasura/GraphQL/Logging.hs +++ b/server/src-lib/Hasura/GraphQL/Logging.hs @@ -12,6 +12,7 @@ import qualified Data.Aeson as J import qualified Language.GraphQL.Draft.Syntax as G import Hasura.GraphQL.Transport.HTTP.Protocol (GQLReqUnparsed) +import Hasura.Metadata.Class import Hasura.Prelude import Hasura.Server.Types (RequestId) import Hasura.Tracing (TraceT) @@ -59,3 +60,6 @@ instance MonadQueryLog m => MonadQueryLog (ReaderT r m) where instance MonadQueryLog m => MonadQueryLog (TraceT m) where logQueryLog l req sqlMap reqId = lift $ logQueryLog l req sqlMap reqId + +instance MonadQueryLog m => MonadQueryLog (MetadataStorageT m) where + logQueryLog l req sqlMap reqId = lift $ logQueryLog l req sqlMap reqId diff --git a/server/src-lib/Hasura/GraphQL/Parser/Internal/Parser.hs b/server/src-lib/Hasura/GraphQL/Parser/Internal/Parser.hs index bbea169bd79..a39c815c6b9 100644 --- a/server/src-lib/Hasura/GraphQL/Parser/Internal/Parser.hs +++ b/server/src-lib/Hasura/GraphQL/Parser/Internal/Parser.hs @@ -18,7 +18,7 @@ import qualified Data.HashMap.Strict.Extended as M import qualified Data.HashMap.Strict.InsOrd as OMap import qualified Data.HashSet as S import qualified Data.List.Extended as LE -import qualified Data.Text as T +import qualified Data.UUID as UUID import Control.Lens.Extended hiding (enum, index) import Data.Int (Int32, Int64) @@ -162,6 +162,19 @@ float = scalar floatScalar Nothing SRFloat string :: MonadParse m => Parser 'Both m Text string = scalar stringScalar Nothing SRString +uuid :: MonadParse m => Parser 'Both m UUID.UUID +uuid = Parser + { pType = schemaType + , pParser = peelVariable (Just $ toGraphQLType schemaType) >=> \case + GraphQLValue (VString s) -> parseUUID $ A.String s + JSONValue v -> parseUUID v + v -> typeMismatch name "a UUID" v + } + where + name = $$(litName "uuid") + schemaType = NonNullable $ TNamed $ mkDefinition name Nothing TIScalar + parseUUID = either (parseErrorWith ParseFailed . qeError) pure . runAesonParser A.parseJSON + -- | As an input type, any string or integer input value should be coerced to ID as Text -- https://spec.graphql.org/June2018/#sec-ID identifier :: MonadParse m => Parser 'Both m Text @@ -169,7 +182,7 @@ identifier = Parser { pType = schemaType , pParser = peelVariable (Just $ toGraphQLType schemaType) >=> \case GraphQLValue (VString s) -> pure s - GraphQLValue (VInt i) -> pure $ T.pack $ show i + GraphQLValue (VInt i) -> pure $ tshow i JSONValue (A.String s) -> pure s JSONValue (A.Number n) -> parseScientific n v -> typeMismatch idName "a String or a 32-bit integer" v @@ -178,7 +191,7 @@ identifier = Parser idName = idScalar schemaType = NonNullable $ TNamed $ mkDefinition idName Nothing TIScalar parseScientific = either (parseErrorWith ParseFailed . qeError) - (pure . T.pack . show @Int) . runAesonParser scientificToInteger + (pure . tshow @Int) . runAesonParser scientificToInteger namedJSON :: MonadParse m => Name -> Maybe Description -> Parser 'Both m A.Value namedJSON name description = Parser @@ -221,9 +234,8 @@ enum name description values = Parser where schemaType = NonNullable $ TNamed $ mkDefinition name description $ TIEnum (fst <$> values) valuesMap = M.fromList $ over (traverse._1) dName $ toList values - validate value = case M.lookup value valuesMap of - Just result -> pure result - Nothing -> parseError $ "expected one of the values " + validate value = onNothing (M.lookup value valuesMap) $ + parseError $ "expected one of the values " <> englishList "or" (toTxt . dName . fst <$> values) <> " for type " <> name <<> ", but found " <>> value diff --git a/server/src-lib/Hasura/GraphQL/Schema/Action.hs b/server/src-lib/Hasura/GraphQL/Schema/Action.hs index df4cc87640f..4befefbec66 100644 --- a/server/src-lib/Hasura/GraphQL/Schema/Action.hs +++ b/server/src-lib/Hasura/GraphQL/Schema/Action.hs @@ -77,7 +77,7 @@ actionExecute nonObjectTypeMap actionInfo = runMaybeT do -- -- > action_name(action_input_arguments) actionAsyncMutation - :: forall m n r. (BackendSchema 'Postgres, MonadSchema n m, MonadTableInfo 'Postgres r m, MonadRole r m) + :: forall m n r. (MonadSchema n m, MonadTableInfo 'Postgres r m, MonadRole r m) => NonObjectTypeMap -> ActionInfo 'Postgres -> m (Maybe (FieldParser n AnnActionMutationAsync)) @@ -85,10 +85,9 @@ actionAsyncMutation nonObjectTypeMap actionInfo = runMaybeT do roleName <- lift askRoleName guard $ roleName == adminRoleName || roleName `Map.member` permissions inputArguments <- lift $ actionInputArguments nonObjectTypeMap $ _adArguments definition - actionId <- lift actionIdParser let fieldName = unActionName actionName description = G.Description <$> comment - pure $ P.selection fieldName description inputArguments actionId + pure $ P.selection fieldName description inputArguments actionIdParser <&> AnnActionMutationAsync actionName where ActionInfo actionName _ definition permissions comment = actionInfo @@ -113,7 +112,6 @@ actionAsyncQuery actionAsyncQuery actionInfo = runMaybeT do roleName <- lift askRoleName guard $ roleName == adminRoleName || roleName `Map.member` permissions - actionId <- lift actionIdParser actionOutputParser <- lift $ actionOutputFields outputObject createdAtFieldParser <- lift $ columnParser @'Postgres (ColumnScalar PGTimeStampTZ) (G.Nullability False) @@ -123,9 +121,9 @@ actionAsyncQuery actionInfo = runMaybeT do let fieldName = unActionName actionName description = G.Description <$> comment actionIdInputField = - P.field idFieldName (Just idFieldDescription) actionId + P.field idFieldName (Just idFieldDescription) actionIdParser allFieldParsers = - let idField = P.selection_ idFieldName (Just idFieldDescription) actionId $> AsyncId + let idField = P.selection_ idFieldName (Just idFieldDescription) actionIdParser $> AsyncId createdAtField = P.selection_ $$(G.litName "created_at") (Just "the time at which this action was created") createdAtFieldParser $> AsyncCreatedAt @@ -159,10 +157,8 @@ actionAsyncQuery actionInfo = runMaybeT do -- | Async action's unique id actionIdParser - :: (BackendSchema 'Postgres, MonadSchema n m, MonadError QErr m) - => m (Parser 'Both n (UnpreparedValue 'Postgres)) -actionIdParser = - fmap P.mkParameter <$> columnParser (ColumnScalar PGUUID) (G.Nullability False) + :: MonadParse n => Parser 'Both n ActionId +actionIdParser = ActionId <$> P.uuid actionOutputFields :: forall m n r. (BackendSchema 'Postgres, MonadSchema n m, MonadTableInfo 'Postgres r m, MonadRole r m, Has QueryContext r) diff --git a/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs b/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs index a43c100b63a..8080c37c02d 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/HTTP.hs @@ -17,39 +17,40 @@ module Hasura.GraphQL.Transport.HTTP , ResultsFragment(..) ) where -import Control.Monad.Morph (hoist) +import Control.Monad.Morph (hoist) import Hasura.EncJSON import Hasura.GraphQL.Context -import Hasura.GraphQL.Execute.Prepare (ExecutionPlan) -import Hasura.GraphQL.Logging (MonadQueryLog (..)) -import Hasura.GraphQL.Parser.Column (UnpreparedValue) +import Hasura.GraphQL.Execute.Prepare (ExecutionPlan) +import Hasura.GraphQL.Logging (MonadQueryLog (..)) +import Hasura.GraphQL.Parser.Column (UnpreparedValue) import Hasura.GraphQL.Transport.HTTP.Protocol import Hasura.HTTP +import Hasura.Metadata.Class import Hasura.Prelude import Hasura.RQL.Types import Hasura.Server.Init.Config -import Hasura.Server.Types (RequestId) -import Hasura.Server.Version (HasVersion) +import Hasura.Server.Types (RequestId) +import Hasura.Server.Version (HasVersion) import Hasura.Session -import Hasura.Tracing (MonadTrace, TraceT, trace) +import Hasura.Tracing (MonadTrace, TraceT, trace) -import qualified Data.Aeson as J -import qualified Data.Aeson.Ordered as JO -import qualified Data.ByteString.Lazy as LBS -import qualified Data.Environment as Env -import qualified Data.HashMap.Strict.InsOrd as OMap -import qualified Data.Text as T -import qualified Database.PG.Query as Q +import qualified Data.Aeson as J +import qualified Data.Aeson.Ordered as JO +import qualified Data.ByteString.Lazy as LBS +import qualified Data.Environment as Env +import qualified Data.HashMap.Strict.InsOrd as OMap +import qualified Data.Text as T +import qualified Database.PG.Query as Q import qualified Hasura.Backends.Postgres.Execute.RemoteJoin as RJ -import qualified Hasura.GraphQL.Execute as E -import qualified Hasura.GraphQL.Execute.Query as EQ -import qualified Hasura.Logging as L -import qualified Hasura.Server.Telemetry.Counters as Telem -import qualified Hasura.Tracing as Tracing -import qualified Language.GraphQL.Draft.Syntax as G -import qualified Network.HTTP.Types as HTTP -import qualified Network.Wai.Extended as Wai +import qualified Hasura.GraphQL.Execute as E +import qualified Hasura.GraphQL.Execute.Query as EQ +import qualified Hasura.Logging as L +import qualified Hasura.Server.Telemetry.Counters as Telem +import qualified Hasura.Tracing as Tracing +import qualified Language.GraphQL.Draft.Syntax as G +import qualified Network.HTTP.Types as HTTP +import qualified Network.Wai.Extended as Wai data QueryCacheKey = QueryCacheKey { qckQueryString :: !GQLReqParsed @@ -108,6 +109,10 @@ instance MonadExecuteQuery m => MonadExecuteQuery (TraceT m) where cacheLookup a b c = hoist (hoist lift) $ cacheLookup a b c cacheStore a b = hoist (hoist lift) $ cacheStore a b +instance MonadExecuteQuery m => MonadExecuteQuery (MetadataStorageT m) where + cacheLookup a b c = hoist (hoist lift) $ cacheLookup a b c + cacheStore a b = hoist (hoist lift) $ cacheStore a b + data ResultsFragment = ResultsFragment { rfTimeIO :: DiffTime , rfLocality :: Telem.Locality @@ -127,6 +132,7 @@ runGQ , MonadTrace m , MonadExecuteQuery m , EQ.MonadQueryInstrumentation m + , MonadMetadataStorage (MetadataStorageT m) ) => Env.Environment -> L.Logger L.Hasura @@ -254,6 +260,7 @@ runGQBatched , MonadTrace m , MonadExecuteQuery m , EQ.MonadQueryInstrumentation m + , MonadMetadataStorage (MetadataStorageT m) ) => Env.Environment -> L.Logger L.Hasura diff --git a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs index 4cc02140257..2d1d1cfc52d 100644 --- a/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs +++ b/server/src-lib/Hasura/GraphQL/Transport/WebSocket.hs @@ -56,6 +56,7 @@ import Hasura.GraphQL.Transport.HTTP (MonadExecuteQuery import Hasura.GraphQL.Transport.HTTP.Protocol import Hasura.GraphQL.Transport.WebSocket.Protocol import Hasura.HTTP +import Hasura.Metadata.Class import Hasura.Prelude import Hasura.RQL.Types import Hasura.Server.Auth (AuthMode, UserAuthentication, @@ -72,9 +73,9 @@ import qualified Hasura.GraphQL.Execute.LiveQuery.Poll as LQ import qualified Hasura.GraphQL.Execute.Query as EQ import qualified Hasura.GraphQL.Transport.WebSocket.Server as WS import qualified Hasura.Logging as L +import Hasura.Server.Init.Config (KeepAliveDelay (..)) import qualified Hasura.Server.Telemetry.Counters as Telem import qualified Hasura.Tracing as Tracing -import Hasura.Server.Init.Config (KeepAliveDelay (..)) -- | 'LQ.LiveQueryId' comes from 'Hasura.GraphQL.Execute.LiveQuery.State.addLiveQuery'. We use -- this to track a connection's operations so we can remove them from 'LiveQueryState', and @@ -332,6 +333,7 @@ onStart , Tracing.MonadTrace m , MonadExecuteQuery m , EQ.MonadQueryInstrumentation m + , MonadMetadataStorage (MetadataStorageT m) ) => Env.Environment -> WSServerEnv -> WSConn -> StartMsg -> m () onStart env serverEnv wsConn (StartMsg opId q) = catchAndIgnore $ do @@ -531,6 +533,7 @@ onMessage , Tracing.HasReporter m , MonadExecuteQuery m , EQ.MonadQueryInstrumentation m + , MonadMetadataStorage (MetadataStorageT m) ) => Env.Environment -> AuthMode @@ -722,6 +725,7 @@ createWSServerApp , Tracing.HasReporter m , MonadExecuteQuery m , EQ.MonadQueryInstrumentation m + , MonadMetadataStorage (MetadataStorageT m) ) => Env.Environment -> AuthMode diff --git a/server/src-lib/Hasura/Metadata/Class.hs b/server/src-lib/Hasura/Metadata/Class.hs index 6ca4443ff30..795c2434be6 100644 --- a/server/src-lib/Hasura/Metadata/Class.hs +++ b/server/src-lib/Hasura/Metadata/Class.hs @@ -1,21 +1,35 @@ -- | This module has type class and types which implements the Metadata Storage Abstraction {-# LANGUAGE UndecidableInstances #-} module Hasura.Metadata.Class - ( MetadataStorageT(..) + ( SchemaSyncEventProcessResult(..) + , MetadataStorageT(..) , runMetadataStorageT , MonadMetadataStorage(..) + , MonadScheduledEvents(..) ) where -import Control.Monad.Morph (MFunctor) +import Control.Monad.Morph (MFunctor, hoist) +import Control.Monad.Trans.Control (MonadBaseControl) +import Data.Aeson + +import qualified Network.HTTP.Types as HTTP import Hasura.Eventing.HTTP import Hasura.Eventing.ScheduledTrigger.Types import Hasura.Prelude import Hasura.RQL.Types +import Hasura.Server.Types +import Hasura.Session import qualified Hasura.Tracing as Tracing +data SchemaSyncEventProcessResult + = SchemaSyncEventProcessResult + { _sseprShouldReload :: !Bool + , _sseprCacheInvalidations :: !CacheInvalidations + } + {- Note [Todo: Common interface for eventing sub-system] ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Postgres tables' event triggers and scheduled event triggers are similar in the @@ -36,10 +50,11 @@ TODO: Reference to open issue or rfc? -- -- This class has functions broadly related to: -- --- 1. Metadata Management (TODO: Need to be added to the type class) +-- 1. Metadata Management -- ---------------------- -- Basic metadata management functions such as retrieving metadata from storage -- database and replacing the given metadata. +-- TODO: Console specific operations -- -- 2. Scheduled Triggers -- --------------------- @@ -51,7 +66,7 @@ TODO: Reference to open issue or rfc? -- - Deleting an scheduled event -- - Creating an one-off scheduled event -- --- 3. Async Actions (TODO: Need to be added to the type class) +-- 3. Async Actions -- ---------------- -- Operations to implement async actions sub-system. This includes recording an -- async action event and retreiving the details of action delivery to the webhook. @@ -63,6 +78,12 @@ TODO: Reference to open issue or rfc? class (MonadError QErr m) => MonadMetadataStorage m where + -- Metadata + fetchMetadata :: m Metadata + setMetadata :: Metadata -> m () + notifySchemaCacheSync :: InstanceId -> CacheInvalidations -> m () + processSchemaSyncEventPayload :: InstanceId -> Value -> m SchemaSyncEventProcessResult + -- Scheduled triggers -- TODO:- -- Ideally we would've liked to avoid having functions that are specific to @@ -77,8 +98,22 @@ class (MonadError QErr m) => MonadMetadataStorage m where setScheduledEventOp :: ScheduledEventId -> ScheduledEventOp -> ScheduledEventType -> m () unlockScheduledEvents :: ScheduledEventType -> [ScheduledEventId] -> m Int unlockAllLockedScheduledEvents :: m () + clearFutureCronEvents :: TriggerName -> m () + + -- Async actions + insertAction + :: ActionName -> SessionVariables -> [HTTP.Header] -> Value + -> m ActionId + fetchUndeliveredActionEvents :: m [ActionLogItem] + setActionStatus :: ActionId -> AsyncActionStatus -> m () + fetchActionResponse :: ActionId -> m ActionLogResponse instance (MonadMetadataStorage m) => MonadMetadataStorage (ReaderT r m) where + fetchMetadata = lift fetchMetadata + setMetadata = lift . setMetadata + notifySchemaCacheSync a b = lift $ notifySchemaCacheSync a b + processSchemaSyncEventPayload a b = lift $ processSchemaSyncEventPayload a b + getDeprivedCronTriggerStats = lift getDeprivedCronTriggerStats getScheduledEventsForDelivery = lift getScheduledEventsForDelivery insertScheduledEvent = lift . insertScheduledEvent @@ -86,8 +121,39 @@ instance (MonadMetadataStorage m) => MonadMetadataStorage (ReaderT r m) where setScheduledEventOp a b c = lift $ setScheduledEventOp a b c unlockScheduledEvents a b = lift $ unlockScheduledEvents a b unlockAllLockedScheduledEvents = lift unlockAllLockedScheduledEvents + clearFutureCronEvents = lift . clearFutureCronEvents + + insertAction a b c d = lift $ insertAction a b c d + fetchUndeliveredActionEvents = lift fetchUndeliveredActionEvents + setActionStatus a b = lift $ setActionStatus a b + fetchActionResponse = lift . fetchActionResponse + +instance (MonadMetadataStorage m) => MonadMetadataStorage (StateT s m) where + fetchMetadata = lift fetchMetadata + setMetadata = lift . setMetadata + notifySchemaCacheSync a b = lift $ notifySchemaCacheSync a b + processSchemaSyncEventPayload a b = lift $ processSchemaSyncEventPayload a b + + getDeprivedCronTriggerStats = lift getDeprivedCronTriggerStats + getScheduledEventsForDelivery = lift getScheduledEventsForDelivery + insertScheduledEvent = lift . insertScheduledEvent + insertScheduledEventInvocation a b = lift $ insertScheduledEventInvocation a b + setScheduledEventOp a b c = lift $ setScheduledEventOp a b c + unlockScheduledEvents a b = lift $ unlockScheduledEvents a b + unlockAllLockedScheduledEvents = lift unlockAllLockedScheduledEvents + clearFutureCronEvents = lift . clearFutureCronEvents + + insertAction a b c d = lift $ insertAction a b c d + fetchUndeliveredActionEvents = lift fetchUndeliveredActionEvents + setActionStatus a b = lift $ setActionStatus a b + fetchActionResponse = lift . fetchActionResponse instance (MonadMetadataStorage m) => MonadMetadataStorage (Tracing.TraceT m) where + fetchMetadata = lift fetchMetadata + setMetadata = lift . setMetadata + notifySchemaCacheSync a b = lift $ notifySchemaCacheSync a b + processSchemaSyncEventPayload a b = lift $ processSchemaSyncEventPayload a b + getDeprivedCronTriggerStats = lift getDeprivedCronTriggerStats getScheduledEventsForDelivery = lift getScheduledEventsForDelivery insertScheduledEvent = lift . insertScheduledEvent @@ -95,6 +161,52 @@ instance (MonadMetadataStorage m) => MonadMetadataStorage (Tracing.TraceT m) whe setScheduledEventOp a b c = lift $ setScheduledEventOp a b c unlockScheduledEvents a b = lift $ unlockScheduledEvents a b unlockAllLockedScheduledEvents = lift unlockAllLockedScheduledEvents + clearFutureCronEvents = lift . clearFutureCronEvents + + insertAction a b c d = lift $ insertAction a b c d + fetchUndeliveredActionEvents = lift fetchUndeliveredActionEvents + setActionStatus a b = lift $ setActionStatus a b + fetchActionResponse = lift . fetchActionResponse + +instance (MonadMetadataStorage m) => MonadMetadataStorage (LazyTxT QErr m) where + fetchMetadata = lift fetchMetadata + setMetadata = lift . setMetadata + notifySchemaCacheSync a b = lift $ notifySchemaCacheSync a b + processSchemaSyncEventPayload a b = lift $ processSchemaSyncEventPayload a b + + getDeprivedCronTriggerStats = lift getDeprivedCronTriggerStats + getScheduledEventsForDelivery = lift getScheduledEventsForDelivery + insertScheduledEvent = lift . insertScheduledEvent + insertScheduledEventInvocation a b = lift $ insertScheduledEventInvocation a b + setScheduledEventOp a b c = lift $ setScheduledEventOp a b c + unlockScheduledEvents a b = lift $ unlockScheduledEvents a b + unlockAllLockedScheduledEvents = lift unlockAllLockedScheduledEvents + clearFutureCronEvents = lift . clearFutureCronEvents + + insertAction a b c d = lift $ insertAction a b c d + fetchUndeliveredActionEvents = lift fetchUndeliveredActionEvents + setActionStatus a b = lift $ setActionStatus a b + fetchActionResponse = lift . fetchActionResponse + +instance (MonadMetadataStorage m) => MonadMetadataStorage (MetadataT m) where + fetchMetadata = lift fetchMetadata + setMetadata = lift . setMetadata + notifySchemaCacheSync a b = lift $ notifySchemaCacheSync a b + processSchemaSyncEventPayload a b = lift $ processSchemaSyncEventPayload a b + + getDeprivedCronTriggerStats = lift getDeprivedCronTriggerStats + getScheduledEventsForDelivery = lift getScheduledEventsForDelivery + insertScheduledEvent = lift . insertScheduledEvent + insertScheduledEventInvocation a b = lift $ insertScheduledEventInvocation a b + setScheduledEventOp a b c = lift $ setScheduledEventOp a b c + unlockScheduledEvents a b = lift $ unlockScheduledEvents a b + unlockAllLockedScheduledEvents = lift unlockAllLockedScheduledEvents + clearFutureCronEvents = lift . clearFutureCronEvents + + insertAction a b c d = lift $ insertAction a b c d + fetchUndeliveredActionEvents = lift fetchUndeliveredActionEvents + setActionStatus a b = lift $ setActionStatus a b + fetchActionResponse = lift . fetchActionResponse {- Note [Generic MetadataStorageT transformer] ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -147,13 +259,55 @@ newtype MetadataStorageT m a = MetadataStorageT {unMetadataStorageT :: ExceptT QErr m a} deriving ( Functor, Applicative, Monad , MonadError QErr + , MonadReader r + , MonadState s , MonadTrans , MonadIO , MFunctor , Tracing.HasReporter + , Tracing.MonadTrace ) +deriving instance (MonadBase IO m) => MonadBase IO (MetadataStorageT m) +deriving instance (MonadBaseControl IO m) => MonadBaseControl IO (MetadataStorageT m) + runMetadataStorageT :: MetadataStorageT m a -> m (Either QErr a) runMetadataStorageT = runExceptT . unMetadataStorageT + +instance (Monad m, Monad (t m), MonadTrans t, MonadMetadataStorage (MetadataStorageT m)) + => MonadMetadataStorage (MetadataStorageT (t m)) where + + fetchMetadata = hoist lift fetchMetadata + setMetadata = hoist lift . setMetadata + notifySchemaCacheSync a b = hoist lift $ notifySchemaCacheSync a b + processSchemaSyncEventPayload a b = hoist lift $ processSchemaSyncEventPayload a b + + getDeprivedCronTriggerStats = hoist lift getDeprivedCronTriggerStats + getScheduledEventsForDelivery = hoist lift getScheduledEventsForDelivery + insertScheduledEvent = hoist lift . insertScheduledEvent + insertScheduledEventInvocation a b = hoist lift $ insertScheduledEventInvocation a b + setScheduledEventOp a b c = hoist lift $ setScheduledEventOp a b c + unlockScheduledEvents a b = hoist lift $ unlockScheduledEvents a b + unlockAllLockedScheduledEvents = hoist lift unlockAllLockedScheduledEvents + clearFutureCronEvents = hoist lift . clearFutureCronEvents + + insertAction a b c d = hoist lift $ insertAction a b c d + fetchUndeliveredActionEvents = hoist lift fetchUndeliveredActionEvents + setActionStatus a b = hoist lift $ setActionStatus a b + fetchActionResponse = hoist lift . fetchActionResponse + +class (MonadMetadataStorage m) => MonadScheduledEvents m where + -- | Record a cron/one-off event + createScheduledEvent :: ScheduledEventSeed -> m () + createScheduledEvent = insertScheduledEvent + + -- | Clear cron events + dropFutureCronEvents :: TriggerName -> m () + dropFutureCronEvents = clearFutureCronEvents + +instance (MonadScheduledEvents m) => MonadScheduledEvents (ReaderT r m) +instance (MonadScheduledEvents m) => MonadScheduledEvents (StateT s m) +instance (MonadScheduledEvents m) => MonadScheduledEvents (Tracing.TraceT m) +instance (MonadScheduledEvents m) => MonadScheduledEvents (MetadataT m) diff --git a/server/src-lib/Hasura/Prelude.hs b/server/src-lib/Hasura/Prelude.hs index ebda762d220..4e628b50269 100644 --- a/server/src-lib/Hasura/Prelude.hs +++ b/server/src-lib/Hasura/Prelude.hs @@ -11,6 +11,7 @@ module Hasura.Prelude , choice , afold , bsToTxt + , lbsToTxt , txtToBs , base64Decode , spanMaybeM @@ -119,6 +120,9 @@ afold = getAlt . foldMap pure bsToTxt :: B.ByteString -> Text bsToTxt = TE.decodeUtf8With TE.lenientDecode +lbsToTxt :: BL.ByteString -> Text +lbsToTxt = bsToTxt . BL.toStrict + txtToBs :: Text -> B.ByteString txtToBs = TE.encodeUtf8 diff --git a/server/src-lib/Hasura/RQL/DDL/ScheduledTrigger.hs b/server/src-lib/Hasura/RQL/DDL/ScheduledTrigger.hs index 9e50571e714..ffa5fa633d4 100644 --- a/server/src-lib/Hasura/RQL/DDL/ScheduledTrigger.hs +++ b/server/src-lib/Hasura/RQL/DDL/ScheduledTrigger.hs @@ -6,24 +6,26 @@ module Hasura.RQL.DDL.ScheduledTrigger , runCreateScheduledEvent ) where -import Hasura.Backends.Postgres.Connection import Hasura.EncJSON import Hasura.Eventing.ScheduledTrigger +import Hasura.Metadata.Class import Hasura.Prelude -import Hasura.RQL.DDL.EventTrigger (getHeaderInfosFromConf) +import Hasura.RQL.DDL.EventTrigger (getHeaderInfosFromConf) import Hasura.RQL.Types -import qualified Data.Environment as Env -import qualified Data.HashMap.Strict as Map -import qualified Data.HashMap.Strict.InsOrd as OMap -import qualified Data.Time.Clock as C -import qualified Database.PG.Query as Q +import qualified Data.Environment as Env +import qualified Data.HashMap.Strict as Map +import qualified Data.HashMap.Strict.InsOrd as OMap +import qualified Data.Time.Clock as C -- | runCreateCronTrigger will update a existing cron trigger when the 'replace' -- value is set to @true@ and when replace is @false@ a new cron trigger will -- be created runCreateCronTrigger - :: (CacheRWM m, MonadTx m, MonadIO m, MetadataM m) => CreateCronTrigger -> m EncJSON + :: ( CacheRWM m, MonadIO m + , MetadataM m, MonadScheduledEvents m + ) + => CreateCronTrigger -> m EncJSON runCreateCronTrigger CreateCronTrigger {..} = do let q = CronTriggerMetadata cctName cctWebhook @@ -53,7 +55,7 @@ runCreateCronTrigger CreateCronTrigger {..} = do $ metaCronTriggers %~ OMap.insert cctName metadata currentTime <- liftIO C.getCurrentTime let scheduleTimes = generateScheduleTimes currentTime 100 cctCronSchedule -- generate next 100 events - liftTx $ insertScheduledEventTx $ SESCron $ map (CronEventSeed cctName) scheduleTimes + createScheduledEvent $ SESCron $ map (CronEventSeed cctName) scheduleTimes return successMsg resolveCronTrigger @@ -75,9 +77,9 @@ resolveCronTrigger env CronTriggerMetadata{..} = do updateCronTrigger :: ( CacheRWM m - , MonadTx m , MonadIO m , MetadataM m + , MonadScheduledEvents m ) => CronTriggerMetadata -> m EncJSON updateCronTrigger cronTriggerMetadata = do @@ -86,16 +88,16 @@ updateCronTrigger cronTriggerMetadata = do buildSchemaCacheFor (MOCronTrigger triggerName) $ MetadataModifier $ metaCronTriggers %~ OMap.insert triggerName cronTriggerMetadata - liftTx $ dropFutureCronEvents triggerName + dropFutureCronEvents triggerName currentTime <- liftIO C.getCurrentTime let scheduleTimes = generateScheduleTimes currentTime 100 $ ctSchedule cronTriggerMetadata - liftTx $ insertScheduledEventTx $ SESCron $ map (CronEventSeed triggerName) scheduleTimes + createScheduledEvent $ SESCron $ map (CronEventSeed triggerName) scheduleTimes pure successMsg runDeleteCronTrigger :: ( CacheRWM m - , MonadTx m , MetadataM m + , MonadScheduledEvents m ) => ScheduledTriggerName -> m EncJSON runDeleteCronTrigger (ScheduledTriggerName stName) = do @@ -103,37 +105,17 @@ runDeleteCronTrigger (ScheduledTriggerName stName) = do withNewInconsistentObjsCheck $ buildSchemaCache $ dropCronTriggerInMetadata stName - liftTx $ dropFutureCronEvents stName + dropFutureCronEvents stName return successMsg -dropFutureCronEvents :: TriggerName -> Q.TxE QErr () -dropFutureCronEvents name = - Q.unitQE defaultTxErrorHandler - [Q.sql| - DELETE FROM hdb_catalog.hdb_cron_events - WHERE trigger_name = $1 AND scheduled_time > now() AND tries = 0 - |] (Identity name) False - dropCronTriggerInMetadata :: TriggerName -> MetadataModifier dropCronTriggerInMetadata name = MetadataModifier $ metaCronTriggers %~ OMap.delete name -runCreateScheduledEvent :: (MonadTx m) => CreateScheduledEvent -> m EncJSON -runCreateScheduledEvent CreateScheduledEvent {..} = do - liftTx $ Q.unitQE defaultTxErrorHandler - [Q.sql| - INSERT INTO hdb_catalog.hdb_scheduled_events - (webhook_conf,scheduled_time,payload,retry_conf,header_conf,comment) - VALUES - ($1, $2, $3, $4, $5, $6) - |] ( Q.AltJ cseWebhook - , cseScheduleAt - , Q.AltJ csePayload - , Q.AltJ cseRetryConf - , Q.AltJ cseHeaders - , cseComment) - False - pure successMsg +runCreateScheduledEvent + :: (MonadScheduledEvents m) => CreateScheduledEvent -> m EncJSON +runCreateScheduledEvent = + (createScheduledEvent . SESOneOff) >=> \() -> pure successMsg checkExists :: (CacheRM m, MonadError QErr m) => TriggerName -> m () checkExists name = do diff --git a/server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs b/server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs index 22e4051308a..34cbdda4edc 100644 --- a/server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs +++ b/server/src-lib/Hasura/RQL/DDL/Schema/Cache.hs @@ -38,6 +38,7 @@ import Hasura.Backends.Postgres.Connection import Hasura.Backends.Postgres.SQL.Types import Hasura.GraphQL.Execute.Types import Hasura.GraphQL.Schema (buildGQLContext) +import Hasura.Metadata.Class import Hasura.RQL.DDL.Action import Hasura.RQL.DDL.CustomTypes import Hasura.RQL.DDL.Deps @@ -48,7 +49,6 @@ import Hasura.RQL.DDL.Schema.Cache.Common import Hasura.RQL.DDL.Schema.Cache.Dependencies import Hasura.RQL.DDL.Schema.Cache.Fields import Hasura.RQL.DDL.Schema.Cache.Permission -import Hasura.RQL.DDL.Schema.Catalog import Hasura.RQL.DDL.Schema.Common import Hasura.RQL.DDL.Schema.Diff import Hasura.RQL.DDL.Schema.Function @@ -57,12 +57,12 @@ import Hasura.RQL.Types hiding (fmFunction, tm import Hasura.Server.Version (HasVersion) buildRebuildableSchemaCache - :: (HasVersion, MonadIO m, MonadUnique m, MonadTx m, HasHttpManager m, HasSQLGenCtx m) + :: (HasVersion, MonadIO m, MonadTx m, HasHttpManager m, HasSQLGenCtx m) => Env.Environment - -> m (RebuildableSchemaCache m) -buildRebuildableSchemaCache env = do - metadata <- liftTx fetchMetadataFromCatalog - result <- flip runReaderT CatalogSync $ + -> Metadata + -> m RebuildableSchemaCache +buildRebuildableSchemaCache env metadata = do + result <- runCacheBuild $ flip runReaderT CatalogSync $ Inc.build (buildSchemaCacheRule env) (metadata, initialInvalidationKeys) pure $ RebuildableSchemaCache (Inc.result result) initialInvalidationKeys (Inc.rebuildRule result) @@ -70,14 +70,15 @@ newtype CacheRWT m a -- The CacheInvalidations component of the state could actually be collected using WriterT, but -- WriterT implementations prior to transformers-0.5.6.0 (which added -- Control.Monad.Trans.Writer.CPS) are leaky, and we don’t have that yet. - = CacheRWT (StateT (RebuildableSchemaCache m, CacheInvalidations) m a) + = CacheRWT (StateT (RebuildableSchemaCache, CacheInvalidations) m a) deriving ( Functor, Applicative, Monad, MonadIO, MonadUnique, MonadReader r, MonadError e, MonadTx - , UserInfoM, HasHttpManager, HasSQLGenCtx, HasSystemDefined) + , UserInfoM, HasHttpManager, HasSQLGenCtx, HasSystemDefined, MonadMetadataStorage + , MonadScheduledEvents) runCacheRWT :: Functor m - => RebuildableSchemaCache m -> CacheRWT m a -> m (a, RebuildableSchemaCache m, CacheInvalidations) + => RebuildableSchemaCache -> CacheRWT m a -> m (a, RebuildableSchemaCache, CacheInvalidations) runCacheRWT cache (CacheRWT m) = runStateT m (cache, mempty) <&> \(v, (newCache, invalidations)) -> (v, newCache, invalidations) @@ -88,11 +89,12 @@ instance (Monad m) => TableCoreInfoRM (CacheRWT m) instance (Monad m) => CacheRM (CacheRWT m) where askSchemaCache = CacheRWT $ gets (lastBuiltSchemaCache . fst) -instance (MonadIO m, MonadTx m) => CacheRWM (CacheRWT m) where +instance (MonadIO m, MonadTx m, HasHttpManager m, HasSQLGenCtx m) => CacheRWM (CacheRWT m) where buildSchemaCacheWithOptions buildReason invalidations metadata = CacheRWT do (RebuildableSchemaCache _ invalidationKeys rule, oldInvalidations) <- get let newInvalidationKeys = invalidateKeys invalidations invalidationKeys - result <- lift $ flip runReaderT buildReason $ Inc.build rule (metadata, newInvalidationKeys) + result <- lift $ runCacheBuild $ flip runReaderT buildReason $ + Inc.build rule (metadata, newInvalidationKeys) let schemaCache = Inc.result result prunedInvalidationKeys = pruneInvalidationKeys schemaCache newInvalidationKeys !newCache = RebuildableSchemaCache schemaCache prunedInvalidationKeys (Inc.rebuildRule result) diff --git a/server/src-lib/Hasura/RQL/DDL/Schema/Cache/Common.hs b/server/src-lib/Hasura/RQL/DDL/Schema/Cache/Common.hs index e5a9c14a597..182d25eb239 100644 --- a/server/src-lib/Hasura/RQL/DDL/Schema/Cache/Common.hs +++ b/server/src-lib/Hasura/RQL/DDL/Schema/Cache/Common.hs @@ -1,5 +1,6 @@ -{-# LANGUAGE Arrows #-} -{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE Arrows #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE UndecidableInstances #-} -- | Types/functions shared between modules that implement "Hasura.RQL.DDL.Schema.Cache". Other -- modules should not import this module directly. @@ -11,16 +12,18 @@ import qualified Data.HashMap.Strict.Extended as M import qualified Data.HashMap.Strict.InsOrd as OMap import qualified Data.HashSet as HS import qualified Data.Sequence as Seq +import qualified Network.HTTP.Client as HTTP import Control.Arrow.Extended import Control.Lens +import Control.Monad.Trans.Control (MonadBaseControl) +import Control.Monad.Unique import Data.Text.Extended import qualified Hasura.Incremental as Inc import Hasura.Backends.Postgres.SQL.Types import Hasura.RQL.Types -import Hasura.RQL.Types.Run -- | 'InvalidationKeys' used to apply requested 'CacheInvalidations'. data InvalidationKeys = InvalidationKeys @@ -105,17 +108,53 @@ data BuildOutputs } $(makeLenses ''BuildOutputs) -data RebuildableSchemaCache m +-- | Parameters required for schema cache build +data CacheBuildParams + = CacheBuildParams + { _cbpManager :: !HTTP.Manager + , _cbpSqlGenCtx :: !SQLGenCtx + } + +-- | The monad in which @'RebuildableSchemaCache' is being run +newtype CacheBuild a + = CacheBuild {unCacheBuild :: ReaderT CacheBuildParams (LazyTxT QErr IO) a} + deriving ( Functor, Applicative, Monad + , MonadError QErr + , MonadReader CacheBuildParams + , MonadIO + , MonadTx + , MonadBase IO + , MonadBaseControl IO + , MonadUnique + ) + +instance HasHttpManager CacheBuild where + askHttpManager = asks _cbpManager + +instance HasSQLGenCtx CacheBuild where + askSQLGenCtx = asks _cbpSqlGenCtx + +runCacheBuild + :: ( MonadIO m + , HasHttpManager m + , HasSQLGenCtx m + , MonadTx m + ) + => CacheBuild a -> m a +runCacheBuild (CacheBuild m) = do + httpManager <- askHttpManager + sqlGenCtx <- askSQLGenCtx + let params = CacheBuildParams httpManager sqlGenCtx + liftTx $ lazyTxToQTx (runReaderT m params) + +data RebuildableSchemaCache = RebuildableSchemaCache { lastBuiltSchemaCache :: !SchemaCache , _rscInvalidationMap :: !InvalidationKeys - , _rscRebuild :: !(Inc.Rule (ReaderT BuildReason m) (Metadata, InvalidationKeys) SchemaCache) + , _rscRebuild :: !(Inc.Rule (ReaderT BuildReason CacheBuild) (Metadata, InvalidationKeys) SchemaCache) } $(makeLenses ''RebuildableSchemaCache) -type CacheBuildM = ReaderT BuildReason Run -type CacheBuildA = WriterA (Seq CollectedInfo) (Inc.Rule CacheBuildM) - bindErrorA :: (ArrowChoice arr, ArrowKleisli m arr, ArrowError e arr, MonadError e m) => arr (m a) a diff --git a/server/src-lib/Hasura/RQL/Types/Action.hs b/server/src-lib/Hasura/RQL/Types/Action.hs index b8d8a2c1747..f33a64fa0ff 100644 --- a/server/src-lib/Hasura/RQL/Types/Action.hs +++ b/server/src-lib/Hasura/RQL/Types/Action.hs @@ -50,6 +50,12 @@ module Hasura.RQL.Types.Action , ActionExecContext(..) , AsyncActionQueryFieldG(..) , AnnActionAsyncQuery(..) + + , ActionId(..) + , actionIdToText + , ActionLogItem(..) + , ActionLogResponse(..) + , AsyncActionStatus(..) ) where @@ -59,6 +65,8 @@ import qualified Data.Aeson as J import qualified Data.Aeson.Casing as J import qualified Data.Aeson.TH as J import qualified Data.HashMap.Strict as Map +import qualified Data.Time.Clock as UTC +import qualified Data.UUID as UUID import qualified Database.PG.Query as Q import qualified Language.GraphQL.Draft.Syntax as G import qualified Network.HTTP.Client as HTTP @@ -72,6 +80,7 @@ import Hasura.RQL.DDL.Headers import Hasura.RQL.IR.Select import Hasura.RQL.Types.Common import Hasura.RQL.Types.CustomTypes +import Hasura.RQL.Types.Error import Hasura.Session import Hasura.SQL.Backend @@ -300,7 +309,7 @@ type AsyncActionQueryFieldsG b v = Fields (AsyncActionQueryFieldG b v) data AnnActionAsyncQuery (b :: BackendType) v = AnnActionAsyncQuery { _aaaqName :: !ActionName - , _aaaqActionId :: !v + , _aaaqActionId :: !ActionId , _aaaqOutputType :: !GraphQLType , _aaaqFields :: !(AsyncActionQueryFieldsG b v) , _aaaqDefinitionList :: ![(Column b, ScalarType b)] @@ -313,3 +322,32 @@ data ActionExecContext , _aecHeaders :: !HTTP.RequestHeaders , _aecSessionVariables :: !SessionVariables } + +newtype ActionId = ActionId {unActionId :: UUID.UUID} + deriving (Show, Eq, Q.ToPrepArg, Q.FromCol, J.ToJSON, J.FromJSON) + +actionIdToText :: ActionId -> Text +actionIdToText = UUID.toText . unActionId + +data ActionLogItem + = ActionLogItem + { _aliId :: !ActionId + , _aliActionName :: !ActionName + , _aliRequestHeaders :: ![HTTP.Header] + , _aliSessionVariables :: !SessionVariables + , _aliInputPayload :: !J.Value + } deriving (Show, Eq) + +data ActionLogResponse + = ActionLogResponse + { _alrId :: !ActionId + , _alrCreatedAt :: !UTC.UTCTime + , _alrResponsePayload :: !(Maybe J.Value) + , _alrErrors :: !(Maybe J.Value) + , _alrSessionVariables :: !SessionVariables + } deriving (Show, Eq) +$(J.deriveJSON (J.aesonDrop 4 J.snakeCase) ''ActionLogResponse) + +data AsyncActionStatus + = AASCompleted !J.Value + | AASError !QErr diff --git a/server/src-lib/Hasura/RQL/Types/Run.hs b/server/src-lib/Hasura/RQL/Types/Run.hs index 61993934e92..17b93e40eb9 100644 --- a/server/src-lib/Hasura/RQL/Types/Run.hs +++ b/server/src-lib/Hasura/RQL/Types/Run.hs @@ -1,7 +1,7 @@ {-# LANGUAGE UndecidableInstances #-} module Hasura.RQL.Types.Run - ( Run(..) + ( RunT(..) , RunCtx(..) , peelRun ) where @@ -14,6 +14,7 @@ import qualified Network.HTTP.Client as HTTP import Control.Monad.Trans.Control (MonadBaseControl) import Control.Monad.Unique +import Hasura.Metadata.Class import Hasura.RQL.Types import qualified Hasura.Tracing as Tracing @@ -25,37 +26,43 @@ data RunCtx , _rcSqlGenCtx :: !SQLGenCtx } -newtype Run a - = Run { unRun :: ReaderT RunCtx (LazyTxT QErr IO) a } +newtype RunT m a + = RunT { unRunT :: ReaderT RunCtx (LazyTxT QErr m) a } deriving ( Functor, Applicative, Monad , MonadError QErr , MonadReader RunCtx , MonadTx , MonadIO - , MonadBase IO - , MonadBaseControl IO , MonadUnique + , MonadMetadataStorage ) -instance UserInfoM Run where +instance (MonadMetadataStorage m) => MonadScheduledEvents (RunT m) + +deriving instance (MonadIO m, MonadBase IO m) => MonadBase IO (RunT m) +deriving instance (MonadIO m, MonadBaseControl IO m) => MonadBaseControl IO (RunT m) + +instance (Monad m) => UserInfoM (RunT m) where askUserInfo = asks _rcUserInfo -instance HasHttpManager Run where +instance (Monad m) => HasHttpManager (RunT m) where askHttpManager = asks _rcHttpMgr -instance HasSQLGenCtx Run where +instance (Monad m) => HasSQLGenCtx (RunT m) where askSQLGenCtx = asks _rcSqlGenCtx peelRun - :: (MonadIO m) + :: ( MonadIO m + , MonadBaseControl IO m + ) => RunCtx -> PGExecCtx -> Q.TxAccess -> Maybe Tracing.TraceContext - -> Run a + -> RunT m a -> ExceptT QErr m a -peelRun runCtx pgExecCtx txAccess ctx (Run m) = - mapExceptT liftIO $ runLazyTx pgExecCtx txAccess $ +peelRun runCtx pgExecCtx txAccess ctx (RunT m) = + runLazyTx pgExecCtx txAccess $ maybe id withTraceContext ctx $ withUserInfo userInfo $ runReaderT m runCtx where userInfo = _rcUserInfo runCtx diff --git a/server/src-lib/Hasura/Server/API/Query.hs b/server/src-lib/Hasura/Server/API/Query.hs index cc29e17cb59..7f9b7048009 100644 --- a/server/src-lib/Hasura/Server/API/Query.hs +++ b/server/src-lib/Hasura/Server/API/Query.hs @@ -3,6 +3,7 @@ module Hasura.Server.API.Query where import Control.Lens +import Control.Monad.Trans.Control (MonadBaseControl) import Control.Monad.Unique import Data.Aeson import Data.Aeson.Casing @@ -14,6 +15,7 @@ import qualified Database.PG.Query as Q import qualified Network.HTTP.Client as HTTP import Hasura.EncJSON +import Hasura.Metadata.Class import Hasura.Prelude import Hasura.RQL.DDL.Action import Hasura.RQL.DDL.ComputedField @@ -175,55 +177,36 @@ $(deriveJSON ''RQLQueryV2 ) --- | Using @pg_notify@ function to publish schema sync events to other server --- instances via 'hasura_schema_update' channel. --- See Note [Schema Cache Sync] -notifySchemaCacheSync :: InstanceId -> CacheInvalidations -> Q.TxE QErr () -notifySchemaCacheSync instanceId invalidations = do - Q.Discard () <- Q.withQE defaultTxErrorHandler [Q.sql| - SELECT pg_notify('hasura_schema_update', json_build_object( - 'instance_id', $1, - 'occurred_at', NOW(), - 'invalidations', $2 - )::text - ) - |] (instanceId, Q.AltJ invalidations) True - pure () - runQuery - :: (HasVersion, MonadIO m, MonadError QErr m, Tracing.MonadTrace m) + :: ( HasVersion, MonadIO m, Tracing.MonadTrace m + , MonadBaseControl IO m, MonadMetadataStorage m + ) => Env.Environment -> PGExecCtx -> InstanceId - -> UserInfo -> RebuildableSchemaCache Run -> HTTP.Manager - -> SQLGenCtx -> RQLQuery -> m (EncJSON, RebuildableSchemaCache Run) + -> UserInfo -> RebuildableSchemaCache -> HTTP.Manager + -> SQLGenCtx -> RQLQuery -> m (EncJSON, RebuildableSchemaCache) runQuery env pgExecCtx instanceId userInfo sc hMgr sqlGenCtx query = do accessMode <- getQueryAccessMode query traceCtx <- Tracing.currentContext + metadata <- fetchMetadata result <- runQueryM env query & Tracing.interpTraceT \x -> do - ((js, meta), rsc, ci) <- - x & withMetadata + (((js, tracemeta), meta), rsc, ci) <- + x & runMetadataT metadata & runCacheRWT sc & peelRun runCtx pgExecCtx accessMode (Just traceCtx) & runExceptT & liftEitherM - pure ((js, rsc, ci), meta) + pure ((js, rsc, ci, meta), tracemeta) withReload result where runCtx = RunCtx userInfo hMgr sqlGenCtx - withMetadata :: (MonadTx m) => MetadataT m a -> m a - withMetadata m = do - metadata <- liftTx fetchMetadataFromCatalog - (r, modifiedMetadata) <- runMetadataT metadata m - when (queryModifiesSchemaCache query) $ - liftTx $ setMetadataInCatalog modifiedMetadata - pure r - - withReload (result, updatedCache, invalidations) = do + withReload (result, updatedCache, invalidations, updatedMetadata) = do when (queryModifiesSchemaCache query) $ do - e <- liftIO $ runExceptT $ runLazyTx pgExecCtx Q.ReadWrite $ liftTx $ - notifySchemaCacheSync instanceId invalidations - liftEither e - return (result, updatedCache) + -- set modified metadata in storage + setMetadata updatedMetadata + -- notify schema cache sync + notifySchemaCacheSync instanceId invalidations + pure (result, updatedCache) -- | A predicate that determines whether the given query might modify/rebuild the schema cache. If -- so, it needs to acquire the global lock on the schema cache so that other queries do not modify @@ -360,6 +343,7 @@ runQueryM , MonadIO m, MonadUnique m, HasHttpManager m, HasSQLGenCtx m , Tracing.MonadTrace m , MetadataM m + , MonadScheduledEvents m ) => Env.Environment -> RQLQuery diff --git a/server/src-lib/Hasura/Server/App.hs b/server/src-lib/Hasura/Server/App.hs index 836ab4222b6..e714ad017d1 100644 --- a/server/src-lib/Hasura/Server/App.hs +++ b/server/src-lib/Hasura/Server/App.hs @@ -26,7 +26,6 @@ import qualified Web.Spock.Core as Spock import Control.Concurrent.MVar.Lifted import Control.Exception (IOException, try) -import Control.Monad.Morph (hoist) import Control.Monad.Stateless import Control.Monad.Trans.Control (MonadBaseControl) import Data.Aeson hiding (json) @@ -54,9 +53,9 @@ import qualified Hasura.Tracing as Tracing import Hasura.EncJSON import Hasura.GraphQL.Logging (MonadQueryLog (..)) import Hasura.HTTP +import Hasura.Metadata.Class import Hasura.RQL.DDL.Schema import Hasura.RQL.Types -import Hasura.RQL.Types.Run import Hasura.Server.API.Config (runGetConfig) import Hasura.Server.API.Query import Hasura.Server.Auth (AuthMode (..), UserAuthentication (..)) @@ -87,7 +86,7 @@ data SchemaCacheRef -- the IORef) where we serve a request with a stale schemacache but I guess -- it is an okay trade-off to pay for a higher throughput (I remember doing a -- bunch of benchmarks to test this hypothesis). - , _scrCache :: IORef (RebuildableSchemaCache Run, SchemaCacheVer) + , _scrCache :: IORef (RebuildableSchemaCache, SchemaCacheVer) , _scrOnChange :: IO () -- ^ an action to run when schemacache changes } @@ -120,7 +119,7 @@ data HandlerCtx , hcSourceIpAddress :: !Wai.IpAddress } -type Handler m = ExceptT QErr (ReaderT HandlerCtx m) +type Handler m = ReaderT HandlerCtx (MetadataStorageT m) data APIResp = JSONResp !(HttpResponse EncJSON) @@ -130,7 +129,6 @@ data APIHandler m a = AHGet !(Handler m APIResp) | AHPost !(a -> Handler m APIResp) - boolToText :: Bool -> Text boolToText = bool "false" "true" @@ -147,7 +145,7 @@ logInconsObjs logger objs = withSCUpdate :: (MonadIO m, MonadBaseControl IO m) - => SchemaCacheRef -> L.Logger L.Hasura -> m (a, RebuildableSchemaCache Run) -> m a + => SchemaCacheRef -> L.Logger L.Hasura -> m (a, RebuildableSchemaCache) -> m a withSCUpdate scr logger action = withMVarMasked lk $ \() -> do (!res, !newSC) <- action @@ -194,13 +192,13 @@ parseBody reqBody = Left e -> throw400 InvalidJSON (T.pack e) Right jVal -> decodeValue jVal -onlyAdmin :: (Monad m) => Handler m () +onlyAdmin :: (MonadError QErr m, MonadReader HandlerCtx m) => m () onlyAdmin = do uRole <- asks (_uiRole . hcUser) when (uRole /= adminRoleName) $ throw400 AccessDenied "You have to be an admin to access this endpoint" -buildQCtx :: (MonadIO m) => Handler m QCtx +buildQCtx :: (MonadIO m, MonadReader HandlerCtx m) => m QCtx buildQCtx = do scRef <- asks (scCacheRef . hcServerCtx) userInfo <- asks hcUser @@ -213,11 +211,18 @@ setHeader (headerName, headerValue) = Spock.setHeader (bsToTxt $ CI.original headerName) (bsToTxt headerValue) -- | Typeclass representing the metadata API authorization effect -class Monad m => MetadataApiAuthorization m where - authorizeMetadataApi :: HasVersion => RQLQuery -> UserInfo -> Handler m () +class (Monad m) => MetadataApiAuthorization m where + authorizeMetadataApi + :: HasVersion => RQLQuery -> HandlerCtx -> m (Either QErr ()) + +instance MetadataApiAuthorization m => MetadataApiAuthorization (ReaderT r m) where + authorizeMetadataApi q hc = lift $ authorizeMetadataApi q hc + +instance MetadataApiAuthorization m => MetadataApiAuthorization (MetadataStorageT m) where + authorizeMetadataApi q hc = lift $ authorizeMetadataApi q hc instance MetadataApiAuthorization m => MetadataApiAuthorization (Tracing.TraceT m) where - authorizeMetadataApi q ui = hoist (hoist lift) $ authorizeMetadataApi q ui + authorizeMetadataApi q hc = lift $ authorizeMetadataApi q hc -- | The config API (/v1alpha1/config) handler class Monad m => MonadConfigApiHandler m where @@ -307,15 +312,16 @@ mkSpockAction serverCtx qErrEncoder qErrModifier apiHandler = do scResponseInternalErrorsConfig serverCtx limits <- lift askResourceLimits + let runHandler = runMetadataStorageT . flip runReaderT handlerState . runResourceLimits limits (serviceTime, (result, q)) <- withElapsedTime $ case apiHandler of AHGet handler -> do - res <- lift $ runReaderT (runExceptT (runResourceLimits limits handler)) handlerState + res <- lift $ runHandler handler return (res, Nothing) AHPost handler -> do parsedReqE <- runExceptT $ parseBody reqBody parsedReq <- onLeft parsedReqE (logErrorAndResp (Just userInfo) requestId req (reqBody, Nothing) includeInternal headers . qErrModifier) - res <- lift $ runReaderT (runExceptT . runResourceLimits limits $ handler parsedReq) handlerState + res <- lift $ runHandler $ handler parsedReq return (res, Just parsedReq) -- apply the error modifier @@ -364,12 +370,14 @@ mkSpockAction serverCtx qErrEncoder qErrModifier apiHandler = do v1QueryHandler - :: (HasVersion, MonadIO m, MonadBaseControl IO m, MetadataApiAuthorization m, Tracing.MonadTrace m) + :: ( HasVersion, MonadIO m, MonadBaseControl IO m, MetadataApiAuthorization m, Tracing.MonadTrace m + , MonadReader HandlerCtx m + , MonadMetadataStorage m + ) => RQLQuery - -> Handler m (HttpResponse EncJSON) + -> m (HttpResponse EncJSON) v1QueryHandler query = do - userInfo <- asks hcUser - authorizeMetadataApi query userInfo + (liftEitherM . authorizeMetadataApi query) =<< ask scRef <- asks (scCacheRef . hcServerCtx) logger <- asks (scLogger . hcServerCtx) res <- bool (fst <$> dbAction) (withSCUpdate scRef logger dbAction) $ queryModifiesSchemaCache query @@ -395,9 +403,12 @@ v1Alpha1GQHandler , Tracing.MonadTrace m , GH.MonadExecuteQuery m , EQ.MonadQueryInstrumentation m + , MonadError QErr m + , MonadReader HandlerCtx m + , MonadMetadataStorage (MetadataStorageT m) ) => E.GraphQLQueryType -> GH.GQLBatchedReqs GH.GQLQueryText - -> Handler m (HttpResponse EncJSON) + -> m (HttpResponse EncJSON) v1Alpha1GQHandler queryType query = do userInfo <- asks hcUser reqHeaders <- asks hcReqHeaders @@ -428,9 +439,12 @@ v1GQHandler , Tracing.MonadTrace m , GH.MonadExecuteQuery m , EQ.MonadQueryInstrumentation m + , MonadError QErr m + , MonadReader HandlerCtx m + , MonadMetadataStorage (MetadataStorageT m) ) => GH.GQLBatchedReqs GH.GQLQueryText - -> Handler m (HttpResponse EncJSON) + -> m (HttpResponse EncJSON) v1GQHandler = v1Alpha1GQHandler E.QueryHasura v1GQRelayHandler @@ -441,15 +455,22 @@ v1GQRelayHandler , Tracing.MonadTrace m , GH.MonadExecuteQuery m , EQ.MonadQueryInstrumentation m + , MonadError QErr m + , MonadReader HandlerCtx m + , MonadMetadataStorage (MetadataStorageT m) ) => GH.GQLBatchedReqs GH.GQLQueryText - -> Handler m (HttpResponse EncJSON) + -> m (HttpResponse EncJSON) v1GQRelayHandler = v1Alpha1GQHandler E.QueryRelay gqlExplainHandler - :: forall m. (MonadIO m) + :: forall m. ( MonadIO m + , MonadError QErr m + , MonadReader HandlerCtx m + , MonadMetadataStorage (MetadataStorageT m) + ) => GE.GQLExplain - -> Handler (Tracing.TraceT m) (HttpResponse EncJSON) + -> m (HttpResponse EncJSON) gqlExplainHandler query = do onlyAdmin scRef <- asks (scCacheRef . hcServerCtx) @@ -468,7 +489,7 @@ gqlExplainHandler query = do res <- GE.explainGQLQuery pgExecCtx sc query return $ HttpResponse res [] -v1Alpha1PGDumpHandler :: (MonadIO m) => PGD.PGDumpReqBody -> Handler m APIResp +v1Alpha1PGDumpHandler :: (MonadIO m, MonadError QErr m, MonadReader HandlerCtx m) => PGD.PGDumpReqBody -> m APIResp v1Alpha1PGDumpHandler b = do onlyAdmin ci <- asks (scConnInfo . hcServerCtx) @@ -518,9 +539,9 @@ renderHtmlTemplate template jVal = newtype LegacyQueryParser m = LegacyQueryParser - { getLegacyQueryParser :: PG.QualifiedTable -> Object -> Handler m RQLQueryV1 } + { getLegacyQueryParser :: PG.QualifiedTable -> Object -> m RQLQueryV1 } -queryParsers :: (Monad m) => M.HashMap Text (LegacyQueryParser m) +queryParsers :: (MonadError QErr m) => M.HashMap Text (LegacyQueryParser m) queryParsers = M.fromList [ ("select", mkLegacyQueryParser RQSelect) @@ -537,9 +558,12 @@ queryParsers = return $ f q legacyQueryHandler - :: (HasVersion, MonadIO m, MonadBaseControl IO m, MetadataApiAuthorization m, Tracing.MonadTrace m) + :: ( HasVersion, MonadIO m, MonadBaseControl IO m, MetadataApiAuthorization m, Tracing.MonadTrace m + , MonadReader HandlerCtx m + , MonadMetadataStorage m + ) => PG.TableName -> Text -> Object - -> Handler m (HttpResponse EncJSON) + -> m (HttpResponse EncJSON) legacyQueryHandler tn queryType req = case M.lookup queryType queryParsers of Just queryParser -> getLegacyQueryParser queryParser qt req >>= v1QueryHandler . RQV1 @@ -587,6 +611,7 @@ mkWaiApp , GH.MonadExecuteQuery m , EQ.MonadQueryInstrumentation m , HasResourceLimits m + , MonadMetadataStorage (MetadataStorageT m) ) => Env.Environment -- ^ Set of environment variables for reference in UIs @@ -621,7 +646,7 @@ mkWaiApp -> E.PlanCacheOptions -> ResponseInternalErrorsConfig -> Maybe EL.LiveQueryPostPollHook - -> RebuildableSchemaCache Run + -> RebuildableSchemaCache -> EKG.Store -> WS.ConnectionOptions -> KeepAliveDelay @@ -699,6 +724,7 @@ httpApp , Tracing.HasReporter m , GH.MonadExecuteQuery m , EQ.MonadQueryInstrumentation m + , MonadMetadataStorage (MetadataStorageT m) , HasResourceLimits m ) => CorsConfig diff --git a/server/src-lib/Hasura/Server/Migrate.hs b/server/src-lib/Hasura/Server/Migrate.hs index 1b71900449e..cadb2933752 100644 --- a/server/src-lib/Hasura/Server/Migrate.hs +++ b/server/src-lib/Hasura/Server/Migrate.hs @@ -32,7 +32,6 @@ import qualified Database.PG.Query.Connection as Q import qualified Language.Haskell.TH.Lib as TH import qualified Language.Haskell.TH.Syntax as TH -import Control.Monad.Unique import Data.Time.Clock (UTCTime) import System.Directory (doesFileExist) @@ -85,22 +84,24 @@ migrateCatalog . ( HasVersion , MonadIO m , MonadTx m - , MonadUnique m , HasHttpManager m , HasSQLGenCtx m ) => Env.Environment -> UTCTime - -> m (MigrationResult, RebuildableSchemaCache m) + -> m (MigrationResult, RebuildableSchemaCache) migrateCatalog env migrationTime = do - doesSchemaExist (SchemaName "hdb_catalog") >>= \case + migrationResult <- doesSchemaExist (SchemaName "hdb_catalog") >>= \case False -> initialize True True -> doesTableExist (SchemaName "hdb_catalog") (TableName "hdb_version") >>= \case False -> initialize False True -> migrateFrom =<< getCatalogVersion + metadata <- liftTx fetchMetadataFromCatalog + schemaCache <- buildRebuildableSchemaCache env metadata + pure (migrationResult, schemaCache) where -- initializes the catalog, creating the schema if necessary - initialize :: Bool -> m (MigrationResult, RebuildableSchemaCache m) + initialize :: Bool -> m MigrationResult initialize createSchema = do liftTx $ Q.catchE defaultTxErrorHandler $ when createSchema $ Q.unitQ "CREATE SCHEMA hdb_catalog" () False @@ -114,9 +115,8 @@ migrateCatalog env migrationTime = do <> "PostgreSQL server. Please make sure this extension is available." runTx $(Q.sqlFromFile "src-rsr/initialise.sql") - schemaCache <- buildRebuildableSchemaCache env updateCatalogVersion - pure (MRInitialized, schemaCache) + pure MRInitialized where needsPGCryptoError e@(Q.PGTxErr _ _ _ err) = case err of @@ -136,11 +136,9 @@ migrateCatalog env migrationTime = do <> " https://hasura.io/docs/1.0/graphql/manual/deployment/postgres-permissions.html" -- migrates an existing catalog to the latest version from an existing verion - migrateFrom :: Text -> m (MigrationResult, RebuildableSchemaCache m) + migrateFrom :: Text -> m MigrationResult migrateFrom previousVersion - | previousVersion == latestCatalogVersionString = do - schemaCache <- buildRebuildableSchemaCache env - pure (MRNothingToDo, schemaCache) + | previousVersion == latestCatalogVersionString = pure MRNothingToDo | [] <- neededMigrations = throw400 NotSupported $ "Cannot use database previously used with a newer version of graphql-engine (expected" @@ -148,9 +146,8 @@ migrateCatalog env migrationTime = do <> " is " <> previousVersion <> ")." | otherwise = do traverse_ (mpMigrate . snd) neededMigrations - schemaCache <- buildRebuildableSchemaCache env updateCatalogVersion - pure (MRMigrated previousVersion, schemaCache) + pure $ MRMigrated previousVersion where neededMigrations = dropWhile ((/= previousVersion) . fst) (migrations False) diff --git a/server/src-lib/Hasura/Server/SchemaUpdate.hs b/server/src-lib/Hasura/Server/SchemaUpdate.hs index 7cb8f4e9069..f42114cbe38 100644 --- a/server/src-lib/Hasura/Server/SchemaUpdate.hs +++ b/server/src-lib/Hasura/Server/SchemaUpdate.hs @@ -2,14 +2,16 @@ module Hasura.Server.SchemaUpdate ( startSchemaSyncListenerThread , startSchemaSyncProcessorThread + , EventPayload(..) , SchemaSyncCtx(..) ) where import Hasura.Backends.Postgres.Connection import Hasura.Logging +import Hasura.Metadata.Class import Hasura.Prelude -import Hasura.RQL.DDL.Schema (fetchMetadataFromCatalog, runCacheRWT) +import Hasura.RQL.DDL.Schema (runCacheRWT) import Hasura.RQL.Types import Hasura.RQL.Types.Run import Hasura.Server.App (SchemaCacheRef (..), withSCUpdate) @@ -17,6 +19,7 @@ import Hasura.Server.Logging import Hasura.Server.Types (InstanceId (..)) import Hasura.Session +import Control.Monad.Trans.Control (MonadBaseControl) import Data.Aeson import Data.Aeson.Casing import Data.Aeson.TH @@ -74,7 +77,7 @@ $(deriveJSON (aesonDrop 3 snakeCase) ''EventPayload) data SchemaSyncEvent = SSEListenStart !UTC.UTCTime - | SSEPayload !EventPayload + | SSEPayload !Value instance ToJSON SchemaSyncEvent where toJSON = \case @@ -82,7 +85,7 @@ instance ToJSON SchemaSyncEvent where SSEPayload payload -> toJSON payload data ThreadError - = TEJsonParse !Text + = TEPayloadParse !Text | TEQueryError !QErr $(deriveToJSON defaultOptions { constructorTagModifier = snakeCase . drop 2 @@ -177,7 +180,7 @@ startSchemaSyncListenerThread pool logger instanceId = do -- | An async thread which processes the schema sync events -- See Note [Schema Cache Sync] startSchemaSyncProcessorThread - :: (MonadIO m) + :: (C.ForkableMonadIO m, MonadMetadataStorage (MetadataStorageT m)) => SQLGenCtx -> PG.PGPool -> Logger Hasura @@ -190,7 +193,7 @@ startSchemaSyncProcessorThread startSchemaSyncProcessorThread sqlGenCtx pool logger httpMgr schemaSyncEventRef cacheRef instanceId cacheInitStartTime = do -- Start processor thread - processorThread <- liftIO $ C.forkImmortal "SchemeUpdate.processor" logger $ + processorThread <- C.forkImmortal "SchemeUpdate.processor" logger $ processor sqlGenCtx pool logger httpMgr schemaSyncEventRef cacheRef instanceId cacheInitStartTime logThreadStarted logger instanceId TTProcessor processorThread pure processorThread @@ -220,7 +223,7 @@ listener pool logger updateEventRef = STM.atomically $ STM.writeTVar updateEventRef $ Just $ SSEListenStart time PG.PNEPQNotify notif -> case eitherDecodeStrict $ PQ.notifyExtra notif of - Left e -> logError logger threadType $ TEJsonParse $ T.pack e + Left e -> logError logger threadType $ TEPayloadParse $ T.pack e Right payload -> do logInfo logger threadType $ object ["received_event" .= payload] #ifndef PROFILING @@ -239,7 +242,11 @@ listener pool logger updateEventRef = -- | An IO action that processes events from Queue, in a loop forever. processor - :: SQLGenCtx + :: forall m void. + ( C.ForkableMonadIO m + , MonadMetadataStorage (MetadataStorageT m) + ) + => SQLGenCtx -> PG.PGPool -> Logger Hasura -> HTTP.Manager @@ -247,15 +254,29 @@ processor -> SchemaCacheRef -> InstanceId -> UTC.UTCTime - -> IO void + -> m void processor sqlGenCtx pool logger httpMgr updateEventRef cacheRef instanceId cacheInitStartTime = -- Never exits forever $ do - event <- STM.atomically getLatestEvent + event <- liftIO $ STM.atomically getLatestEvent logInfo logger threadType $ object ["processed_event" .= event] - when (shouldReload event) $ - refreshSchemaCache sqlGenCtx pool logger httpMgr cacheRef (getCacheInvalidations event) + (shouldReload, cacheInvalidations) <- case event of + SSEListenStart time -> + -- If listening started after cache initialization, just refresh the schema cache unconditionally. + -- See Note [Schema Cache Sync] + pure $ (time > cacheInitStartTime, mempty) + SSEPayload payload -> do + eitherResult <- runMetadataStorageT $ processSchemaSyncEventPayload instanceId payload + case eitherResult of + Left e -> do + logError logger threadType $ TEPayloadParse $ qeError e + pure (False, mempty) + Right SchemaSyncEventProcessResult{..} -> + pure (_sseprShouldReload, _sseprCacheInvalidations) + + when shouldReload $ + refreshSchemaCache sqlGenCtx pool logger httpMgr cacheRef cacheInvalidations threadType "schema cache reloaded" where -- checks if there is an event @@ -269,36 +290,30 @@ processor sqlGenCtx pool logger httpMgr updateEventRef Nothing -> STM.retry threadType = TTProcessor - shouldReload = \case - SSEListenStart time -> - -- If listening started after cache initialization, just refresh the schema cache unconditionally. - -- See Note [Schema Cache Sync] - time > cacheInitStartTime - SSEPayload payload -> - -- When event is from other sever instance - _epInstanceId payload /= instanceId - - getCacheInvalidations = \case - SSEListenStart _ -> mempty - SSEPayload payload -> _epInvalidations payload - refreshSchemaCache - :: SQLGenCtx + :: ( MonadIO m + , MonadBaseControl IO m + , MonadMetadataStorage (MetadataStorageT m) + ) + => SQLGenCtx -> PG.PGPool -> Logger Hasura -> HTTP.Manager -> SchemaCacheRef -> CacheInvalidations -> ThreadType - -> Text -> IO () + -> Text -> m () refreshSchemaCache sqlGenCtx pool logger httpManager cacheRef invalidations threadType msg = do -- Reload schema cache from catalog - resE <- liftIO $ runExceptT $ withSCUpdate cacheRef logger do - rebuildableCache <- fst <$> liftIO (readIORef $ _scrCache cacheRef) - ((), cache, _) <- fetchMetadataAndBuildCache - & runCacheRWT rebuildableCache - & peelRun runCtx pgCtx PG.ReadWrite Nothing - pure ((), cache) + eitherMetadata <- runMetadataStorageT fetchMetadata + resE <- runExceptT $ do + metadata <- liftEither eitherMetadata + withSCUpdate cacheRef logger do + rebuildableCache <- fst <$> liftIO (readIORef $ _scrCache cacheRef) + ((), cache, _) <- buildSchemaCacheWithOptions CatalogSync invalidations metadata + & runCacheRWT rebuildableCache + & peelRun runCtx pgCtx PG.ReadWrite Nothing + pure ((), cache) case resE of Left e -> logError logger threadType $ TEQueryError e Right () -> logInfo logger threadType $ object ["message" .= msg] @@ -306,15 +321,11 @@ refreshSchemaCache sqlGenCtx pool logger httpManager cacheRef invalidations thre runCtx = RunCtx adminUserInfo httpManager sqlGenCtx pgCtx = mkPGExecCtx PG.Serializable pool - fetchMetadataAndBuildCache = do - metadata <- liftTx fetchMetadataFromCatalog - buildSchemaCacheWithOptions CatalogSync invalidations metadata - -logInfo :: Logger Hasura -> ThreadType -> Value -> IO () +logInfo :: (MonadIO m) => Logger Hasura -> ThreadType -> Value -> m () logInfo logger threadType val = unLogger logger $ SchemaSyncThreadLog LevelInfo threadType val -logError :: ToJSON a => Logger Hasura -> ThreadType -> a -> IO () +logError :: (MonadIO m, ToJSON a) => Logger Hasura -> ThreadType -> a -> m () logError logger threadType err = unLogger logger $ SchemaSyncThreadLog LevelError threadType $ object ["error" .= toJSON err] diff --git a/server/src-test/Hasura/Server/MigrateSpec.hs b/server/src-test/Hasura/Server/MigrateSpec.hs index de5f4b087b1..ea895bf4657 100644 --- a/server/src-test/Hasura/Server/MigrateSpec.hs +++ b/server/src-test/Hasura/Server/MigrateSpec.hs @@ -28,11 +28,11 @@ import Hasura.Server.Version (HasVersion) -- -- NOTE: downgrade test disabled for now (see #5273) newtype CacheRefT m a - = CacheRefT { runCacheRefT :: MVar (RebuildableSchemaCache m) -> m a } + = CacheRefT { runCacheRefT :: MVar RebuildableSchemaCache -> m a } deriving ( Functor, Applicative, Monad, MonadIO, MonadError e, MonadBase b, MonadBaseControl b , MonadTx, MonadUnique, UserInfoM, HasHttpManager, HasSQLGenCtx ) - via (ReaderT (MVar (RebuildableSchemaCache m)) m) + via (ReaderT (MVar RebuildableSchemaCache) m) instance MonadTrans CacheRefT where lift = CacheRefT . const @@ -41,7 +41,7 @@ instance (MonadBase IO m) => TableCoreInfoRM (CacheRefT m) instance (MonadBase IO m) => CacheRM (CacheRefT m) where askSchemaCache = CacheRefT (fmap lastBuiltSchemaCache . readMVar) -instance (MonadIO m, MonadBaseControl IO m, MonadTx m) => CacheRWM (CacheRefT m) where +instance (MonadIO m, MonadBaseControl IO m, MonadTx m, HasHttpManager m, HasSQLGenCtx m) => CacheRWM (CacheRefT m) where buildSchemaCacheWithOptions reason invalidations metadata = CacheRefT $ flip modifyMVar \schemaCache -> do ((), cache, _) <- runCacheRWT schemaCache (buildSchemaCacheWithOptions reason invalidations metadata) pure (cache, ()) @@ -60,7 +60,6 @@ spec , MonadIO m , MonadBaseControl IO m , MonadTx m - , MonadUnique m , HasHttpManager m , HasSQLGenCtx m ) diff --git a/server/src-test/Main.hs b/server/src-test/Main.hs index 33619ab486d..816fbce8b31 100644 --- a/server/src-test/Main.hs +++ b/server/src-test/Main.hs @@ -86,7 +86,7 @@ buildPostgresSpecs pgConnOptions = do httpManager <- HTTP.newManager HTTP.tlsManagerSettings let runContext = RunCtx adminUserInfo httpManager (SQLGenCtx False) - runAsAdmin :: Run a -> IO a + runAsAdmin :: RunT IO a -> IO a runAsAdmin = peelRun runContext pgContext Q.ReadWrite Nothing >>> runExceptT