Merge branch 'master' into 5363-default-bounded-plan-cache

This commit is contained in:
Brandon Simmons 2020-07-28 20:23:26 -04:00 committed by GitHub
commit 2a0768d7ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 478 additions and 54 deletions

View File

@@ -9,6 +9,7 @@
- server: bugfix to allow HASURA_GRAPHQL_QUERY_PLAN_CACHE_SIZE of 0 (#5363)
- server: support only a bounded plan cache, with a default size of 4000 (closes #5363)
- console: update sidebar icons for different action and trigger types
- server: add request/response sizes in event triggers (and scheduled trigger) logs
## `v1.3.0`

View File

@@ -17,6 +17,212 @@ import (
type CustomQuery linq.Query
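// MergeCustomTypes collapses all set_custom_types items in the list into the earliest
// entry, which ends up holding the most recent definition; later entries are removed.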
func (q CustomQuery) MergeCustomTypes(squashList *database.CustomList) error {
actionPermissionsTransition := transition.New(&cronTriggerConfig{})
actionPermissionsTransition.Initial("new")
actionPermissionsTransition.State("created")
actionPermissionsTransition.Event(setCustomTypes).To("created").From("new", "created")
next := q.Iterate()
for item, ok := next(); ok; item, ok = next() {
g := item.(linq.Group)
if g.Key == "" {
continue
}
var first *list.Element
for ind, val := range g.Group {
element := val.(*list.Element)
switch obj := element.Value.(type) {
case *setCustomTypesInput:
if ind == 0 {
first = element
continue
}
first.Value = obj
squashList.Remove(element)
}
}
}
return nil
}
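// MergeActionPermissions squashes the permission entries for each action: a
// create_action_permission that is later dropped is removed together with the matching
// drop, while a drop of a permission created outside this migration set is kept.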
func (q CustomQuery) MergeActionPermissions(squashList *database.CustomList) error {
actionPermissionsTransition := transition.New(&actionPermissionConfig{})
actionPermissionsTransition.Initial("new")
actionPermissionsTransition.State("created")
actionPermissionsTransition.State("deleted")
actionPermissionsTransition.Event(createActionPermission).To("created").From("new", "deleted")
actionPermissionsTransition.Event(dropActionPermission).To("deleted").From("new", "created")
next := q.Iterate()
for item, ok := next(); ok; item, ok = next() {
g := item.(linq.Group)
if g.Key == "" {
continue
}
key := g.Key.(string)
cfg := actionPermissionConfig{
action: key,
}
prevElems := make([]*list.Element, 0)
for _, val := range g.Group {
element := val.(*list.Element)
switch element.Value.(type) {
case *createActionPermissionInput:
err := actionPermissionsTransition.Trigger(createActionPermission, &cfg, nil)
if err != nil {
return err
}
prevElems = append(prevElems, element)
case *dropActionPermissionInput:
if cfg.GetState() == "created" {
prevElems = append(prevElems, element)
}
err := actionPermissionsTransition.Trigger(dropActionPermission, &cfg, nil)
if err != nil {
return err
}
for _, e := range prevElems {
squashList.Remove(e)
}
}
}
}
return nil
}
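// MergeActions squashes the entries for each action: updates are folded back into the
// original create_action, and an action that is created and later dropped is removed
// entirely, along with any permissions that were defined for it.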
func (q CustomQuery) MergeActions(squashList *database.CustomList) error {
actionTransition := transition.New(&actionConfig{})
actionTransition.Initial("new")
actionTransition.State("created")
actionTransition.State("updated")
actionTransition.State("deleted")
actionTransition.Event(createAction).To("created").From("new", "deleted")
actionTransition.Event(updateAction).To("updated").From("new", "created", "updated", "deleted")
actionTransition.Event(dropAction).To("deleted").From("new", "created", "updated")
next := q.Iterate()
for item, ok := next(); ok; item, ok = next() {
g := item.(linq.Group)
if g.Key == "" {
continue
}
key, ok := g.Key.(string)
if !ok {
continue
}
cfg := actionConfig{
name: key,
}
prevElems := make([]*list.Element, 0)
for _, val := range g.Group {
element := val.(*list.Element)
switch obj := element.Value.(type) {
case *createActionInput:
err := actionTransition.Trigger(createAction, &cfg, nil)
if err != nil {
return errors.Wrapf(err, "error squashing action: %v", obj.Name)
}
prevElems = append(prevElems, element)
case *updateActionInput:
if len(prevElems) != 0 {
if _, ok := prevElems[0].Value.(*createActionInput); ok {
prevElems[0].Value = &createActionInput{
actionDefinition: obj.actionDefinition,
}
prevElems = prevElems[:1]
err := actionTransition.Trigger(dropAction, &cfg, nil)
if err != nil {
return errors.Wrapf(err, "error squashing action: %v", obj.Name)
}
squashList.Remove(element)
err = actionTransition.Trigger(createAction, &cfg, nil)
if err != nil {
return errors.Wrapf(err, "error squashing action: %v", obj.Name)
}
continue
}
for _, e := range prevElems {
squashList.Remove(e)
}
prevElems = prevElems[:0]
err := actionTransition.Trigger(dropAction, &cfg, nil)
if err != nil {
return errors.Wrapf(err, "error squashing action: %v", obj.Name)
}
}
prevElems = append(prevElems, element)
err := actionTransition.Trigger(updateAction, &cfg, nil)
if err != nil {
return errors.Wrapf(err, "error squashing: %v", obj.Name)
}
case *dropActionInput:
if cfg.GetState() == "created" {
prevElems = append(prevElems, element)
// drop action permissions as well
actionPermissionGroup := CustomQuery(linq.FromIterable(squashList).GroupByT(
func(element *list.Element) string {
switch args := element.Value.(type) {
case *createActionPermissionInput:
if v, ok := args.Action.(string); ok {
return v
}
case *dropActionPermissionInput:
if v, ok := args.Action.(string); ok {
return v
}
}
return ""
}, func(element *list.Element) *list.Element {
return element
},
))
next := actionPermissionGroup.Iterate()
for item, ok := next(); ok; item, ok = next() {
g := item.(linq.Group)
if g.Key == "" {
continue
}
key, ok := g.Key.(string)
if !ok {
continue
}
if key == obj.Name {
for _, val := range g.Group {
element := val.(*list.Element)
squashList.Remove(element)
}
}
}
}
err := actionTransition.Trigger(dropAction, &cfg, nil)
if err != nil {
return err
}
for _, e := range prevElems {
squashList.Remove(e)
}
prevElems = prevElems[:0]
}
}
}
return nil
}
func (q CustomQuery) MergeCronTriggers(squashList *database.CustomList) error {
cronTriggersTransition := transition.New(&cronTriggerConfig{})
cronTriggersTransition.Initial("new")
@@ -975,6 +1181,21 @@ func (h *HasuraDB) PushToList(migration io.Reader, fileType string, l *database.
}
l.PushBack(o)
}
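// push the concrete action input so that Squash can later group and merge it by action name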
case *createActionInput, *updateActionInput:
if v.Type == updateAction {
o, ok := v.Args.(*updateActionInput)
if !ok {
break
}
l.PushBack(o)
}
if v.Type == createAction {
o, ok := v.Args.(*createActionInput)
if !ok {
break
}
l.PushBack(o)
}
default:
l.PushBack(actionType)
}
@@ -1437,6 +1658,70 @@ func (h *HasuraDB) Squash(l *database.CustomList, ret chan<- interface{}) {
ret <- err
}
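// group all set_custom_types items under one key so they can be collapsed into a single entry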
customTypesGroup := CustomQuery(linq.FromIterable(l).GroupByT(
func(element *list.Element) string {
switch element.Value.(type) {
case *setCustomTypesInput:
return setCustomTypes
}
return ""
}, func(element *list.Element) *list.Element {
return element
},
))
err = customTypesGroup.MergeCustomTypes(l)
if err != nil {
ret <- err
}
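// group create/update/drop action items by action name for squashing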
actionGroup := CustomQuery(linq.FromIterable(l).GroupByT(
func(element *list.Element) string {
switch args := element.Value.(type) {
case *createActionInput:
if v, ok := args.Name.(string); ok {
return v
}
case *updateActionInput:
if v, ok := args.Name.(string); ok {
return v
}
case *dropActionInput:
if v, ok := args.Name.(string); ok {
return v
}
}
return ""
}, func(element *list.Element) *list.Element {
return element
},
))
err = actionGroup.MergeActions(l)
if err != nil {
ret <- err
}
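// group create/drop action permission items by the action they refer to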
actionPermissionGroup := CustomQuery(linq.FromIterable(l).GroupByT(
func(element *list.Element) string {
switch args := element.Value.(type) {
case *createActionPermissionInput:
if v, ok := args.Action.(string); ok {
return v
}
case *dropActionPermissionInput:
if v, ok := args.Action.(string); ok {
return v
}
}
return ""
}, func(element *list.Element) *list.Element {
return element
},
))
err = actionPermissionGroup.MergeActionPermissions(l)
if err != nil {
ret <- err
}
for e := l.Front(); e != nil; e = e.Next() {
q := HasuraInterfaceQuery{
Args: e.Value,
@@ -1520,10 +1805,23 @@ func (h *HasuraDB) Squash(l *database.CustomList, ret chan<- interface{}) {
q.Type = createCronTrigger
case *deleteCronTriggerInput:
q.Type = deleteCronTrigger
case *createActionInput:
q.Type = createAction
case *updateActionInput:
q.Type = updateAction
case *dropActionInput:
q.Type = dropAction
case *createActionPermissionInput:
q.Type = createActionPermission
case *dropActionPermissionInput:
q.Type = dropActionPermission
case *setCustomTypesInput:
q.Type = setCustomTypes
case *RunSQLInput:
ret <- []byte(args.SQL)
continue
default:
h.logger.Debug("cannot find metadata type for:", args)
ret <- fmt.Errorf("invalid metadata action")
return
}

View File

@@ -80,6 +80,51 @@ type deleteCronTriggerInput struct {
Name string `json:"name" yaml:"name"`
}
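// inputs for the action, action permission and custom types metadata queries handled by the squash logic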
type actionDefinition struct {
Name interface{} `json:"name,omitempty" yaml:"name,omitempty"`
Definition interface{} `json:"definition,omitempty" yaml:"definition,omitempty"`
}
type createActionInput struct {
actionDefinition
Comment string `json:"comment,omitempty" yaml:"comment,omitempty"`
}
type actionAndPermission struct {
actionDefinition
Permissions []PermissionDefinition `json:"permissions" yaml:"permissions"`
}
type dropActionInput struct {
Name interface{} `json:"name,omitempty" yaml:"name,omitempty"`
ClearData bool `json:"clear_data,omitempty" yaml:"clear_data,omitempty"`
}
type updateActionInput struct {
actionDefinition
}
type PermissionDefinition struct {
Role interface{} `json:"role,omitempty" yaml:"role,omitempty"`
Comment string `json:"comment,omitempty" yaml:"comment,omitempty"`
}
type createActionPermissionInput struct {
Action interface{} `json:"action,omitempty" yaml:"action,omitempty"`
PermissionDefinition
}
type dropActionPermissionInput struct {
Action interface{} `json:"action,omitempty" yaml:"action,omitempty"`
PermissionDefinition
}
type setCustomTypesInput struct {
InputObjects interface{} `json:"input_objects,omitempty" yaml:"input_objects,omitempty"`
Objects interface{} `json:"objects,omitempty" yaml:"objects,omitempty"`
Scalars interface{} `json:"scalars,omitempty" yaml:"scalars,omitempty"`
Enums interface{} `json:"enums,omitempty" yaml:"enums,omitempty"`
}
func (h *newHasuraIntefaceQuery) UnmarshalJSON(b []byte) error {
type t newHasuraIntefaceQuery
var q t
@@ -175,6 +220,18 @@ func (h *newHasuraIntefaceQuery) UnmarshalJSON(b []byte) error {
q.Args = &createCronTriggerInput{}
case deleteCronTrigger:
q.Args = &deleteCronTriggerInput{}
case createAction:
q.Args = &createActionInput{}
case dropAction:
q.Args = &dropActionInput{}
case updateAction:
q.Args = &updateActionInput{}
case createActionPermission:
q.Args = &createActionPermissionInput{}
case dropActionPermission:
q.Args = &dropActionPermissionInput{}
case setCustomTypes:
q.Args = &setCustomTypesInput{}
default:
return fmt.Errorf("cannot squash type %s", q.Type)
}
@@ -357,6 +414,12 @@ const (
deleteRemoteRelationship = "delete_remote_relationship"
createCronTrigger = "create_cron_trigger"
deleteCronTrigger = "delete_cron_trigger"
createAction = "create_action"
dropAction = "drop_action"
updateAction = "update_action"
createActionPermission = "create_action_permission"
dropActionPermission = "drop_action_permission"
setCustomTypes = "set_custom_types"
)
type tableMap struct {
@@ -700,6 +763,8 @@ type replaceMetadataInput struct {
AllowList []*addCollectionToAllowListInput `json:"allowlist" yaml:"allowlist"`
RemoteSchemas []*addRemoteSchemaInput `json:"remote_schemas" yaml:"remote_schemas"`
CronTriggers []*createCronTriggerInput `json:"cron_triggers" yaml:"cron_triggers"`
Actions []*actionAndPermission `json:"actions" yaml:"actions"`
CustomTypes *setCustomTypesInput `json:"custom_types" yaml:"custom_types"`
}
func (rmi *replaceMetadataInput) convertToMetadataActions(l *database.CustomList) {
@@ -806,15 +871,17 @@ func (rmi *replaceMetadataInput) convertToMetadataActions(l *database.CustomList
}
for _, table := range rmi.Tables {
if table.RemoteRelationships != nil {
for _, remoteRelationship := range *table.RemoteRelationships {
r := createRemoteRelationshipInput{
remoteRelationshipDefinition: remoteRelationship.Definiton,
Table: tableSchema{
Name: table.Table.Name,
Schema: table.Table.Schema,
},
}
l.PushBack(r)
}
}
}
@@ -842,6 +909,26 @@ func (rmi *replaceMetadataInput) convertToMetadataActions(l *database.CustomList
for _, ct := range rmi.CronTriggers {
l.PushBack(ct)
}
// track actions
for _, action := range rmi.Actions {
// action definition
a := &createActionInput{
actionDefinition: action.actionDefinition,
}
l.PushBack(a)
// permission
for _, permission := range action.Permissions {
p := &createActionPermissionInput{
Action: action.Name,
PermissionDefinition: permission,
}
l.PushBack(p)
}
}
if rmi.CustomTypes != nil {
l.PushBack(rmi.CustomTypes)
}
}
type InconsistentMetadata struct {
@@ -1022,7 +1109,17 @@ type remoteRelationshipConfig struct {
tableName, schemaName, name string
transition.Transition
}
type cronTriggerConfig struct {
name string
transition.Transition
}
type actionConfig struct {
name string
transition.Transition
}
type actionPermissionConfig struct {
action string
transition.Transition
}

View File

@@ -38,6 +38,9 @@ func getLatestVersion() (*semver.Version, *semver.Version, error) {
if err != nil {
return nil, nil, errors.Wrap(err, "decoding update check response")
}
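// neither a stable release nor a pre-release was reported; treat the response as invalid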
if response.Latest == nil && response.PreRelease == nil {
return nil, nil, fmt.Errorf("expected version info not found at %s", updateCheckURL)
}
return response.Latest, response.PreRelease, nil
}

View File

@@ -64,6 +64,7 @@ import Hasura.SQL.Types
import qualified Hasura.Tracing as Tracing
import qualified Control.Concurrent.Async.Lifted.Safe as LA
import qualified Data.ByteString.Lazy as LBS
import qualified Data.HashMap.Strict as M
import qualified Data.TByteString as TBS
import qualified Data.Text as T
@@ -272,9 +273,11 @@ processEventQueue logger logenv httpMgr pool getSchemaCache eeCtx@EventEngineCtx
etHeaders = map encodeHeader headerInfos
headers = addDefaultHeaders etHeaders
ep = createEventPayload retryConf e
payload = encode $ toJSON ep
extraLogCtx = ExtraLogContext Nothing (epId ep) -- avoiding getting current time here to avoid another IO call with each event call
requestDetails = RequestDetails $ LBS.length payload
res <- runExceptT $ tryWebhook headers responseTimeout payload webhook
logHTTPForET res extraLogCtx requestDetails
let decodedHeaders = map (decodeHeader logenv headerInfos) headers
either
(processError pool e retryConf decodedHeaders ep)

View File

@@ -17,6 +17,7 @@ module Hasura.Eventing.HTTP
, logHTTPForET
, logHTTPForST
, ExtraLogContext(..)
, RequestDetails (..)
, EventId
, Invocation(..)
, InvocationVersion
@@ -46,9 +47,9 @@ import qualified Data.TByteString as TBS
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Encoding.Error as TE
import qualified Data.Time.Clock as Time
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTP
import Control.Exception (try)
import Data.Aeson
@@ -56,6 +57,7 @@ import Data.Aeson.Casing
import Data.Aeson.TH
import Data.Either
import Data.Has
import Data.Int (Int64)
import Hasura.Logging
import Hasura.Prelude
import Hasura.RQL.DDL.Headers
@@ -146,6 +148,7 @@ data HTTPResp (a :: TriggerTypes)
{ hrsStatus :: !Int
, hrsHeaders :: ![HeaderConf]
, hrsBody :: !TBS.TByteString
, hrsSize :: !Int64
} deriving (Show, Eq)
$(deriveToJSON (aesonDrop 3 snakeCase){omitNothingFields=True} ''HTTPResp)
@@ -189,28 +192,37 @@ mkHTTPResp resp =
HTTPResp
{ hrsStatus = HTTP.statusCode $ HTTP.responseStatus resp
, hrsHeaders = map decodeHeader $ HTTP.responseHeaders resp
, hrsBody = TBS.fromLBS respBody
, hrsSize = LBS.length respBody
}
where
respBody = HTTP.responseBody resp
decodeBS = TE.decodeUtf8With TE.lenientDecode
decodeHeader (hdrName, hdrVal)
= HeaderConf (decodeBS $ CI.original hdrName) (HVValue (decodeBS hdrVal))
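-- | Details about the outgoing webhook request that we log alongside the
-- response; currently just the size of the request body in bytes.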
newtype RequestDetails
= RequestDetails { _rdSize :: Int64 }
$(deriveToJSON (aesonDrop 3 snakeCase) ''RequestDetails)
data HTTPRespExtra (a :: TriggerTypes)
= HTTPRespExtra
{ _hreResponse :: !(Either (HTTPErr a) (HTTPResp a))
, _hreContext :: !ExtraLogContext
, _hreRequest :: !RequestDetails
}
instance ToJSON (HTTPRespExtra a) where
toJSON (HTTPRespExtra resp ctxt req) =
case resp of
Left errResp ->
object [ "response" .= toJSON errResp
, "request" .= toJSON req
, "context" .= toJSON ctxt
]
Right rsp ->
object [ "response" .= toJSON rsp
, "request" .= toJSON req
, "context" .= toJSON ctxt
]
@@ -260,20 +272,26 @@ logHTTPForET
, Has (Logger Hasura) r
, MonadIO m
)
=> Either (HTTPErr 'EventType) (HTTPResp 'EventType)
-> ExtraLogContext
-> RequestDetails
-> m ()
logHTTPForET eitherResp extraLogCtx reqDetails = do
logger :: Logger Hasura <- asks getter
unLogger logger $ HTTPRespExtra eitherResp extraLogCtx reqDetails
logHTTPForST
:: ( MonadReader r m
, Has (Logger Hasura) r
, MonadIO m
)
=> Either (HTTPErr 'ScheduledType) (HTTPResp 'ScheduledType)
-> ExtraLogContext
-> RequestDetails
-> m ()
logHTTPForST eitherResp extraLogCtx reqDetails = do
logger :: Logger Hasura <- asks getter
unLogger logger $ HTTPRespExtra eitherResp extraLogCtx reqDetails
runHTTP :: (MonadIO m) => HTTP.Manager -> HTTP.Request -> m (Either (HTTPErr a) (HTTPResp a))
runHTTP manager req = do
@@ -289,10 +307,13 @@ tryWebhook ::
)
=> [HTTP.Header]
-> HTTP.ResponseTimeout
-> LBS.ByteString
-- ^ the request body. It is passed as a 'BL.Bytestring' because we need to
-- log the request size. As the logging happens outside the function, we pass
-- it the final request body, instead of 'Value'
-> String
-> m (HTTPResp a)
tryWebhook headers timeout payload webhook = do
initReqE <- liftIO $ try $ HTTP.parseRequest webhook
manager <- asks getter
case initReqE of
@@ -302,10 +323,10 @@ tryWebhook headers timeout payload webhook = traceHttpRequest (T.pack webhook) d
initReq
{ HTTP.method = "POST"
, HTTP.requestHeaders = headers
, HTTP.requestBody = HTTP.RequestBodyLBS payload
, HTTP.responseTimeout = timeout
}
tracedHttpRequest req $ \req' -> do
eitherResp <- runHTTP manager req'
onLeft eitherResp throwError

View File

@@ -95,6 +95,7 @@ import System.Cron
import qualified Data.Aeson as J
import qualified Data.Aeson.Casing as J
import qualified Data.Aeson.TH as J
import qualified Data.ByteString.Lazy as BL
import qualified Data.Environment as Env
import qualified Data.HashMap.Strict as Map
import qualified Data.Set as Set
@@ -489,8 +490,10 @@ processScheduledEvent
webhookReqPayload =
ScheduledEventWebhookPayload sefId sefName sefScheduledTime sefPayload sefComment currentTime
webhookReqBodyJson = J.toJSON webhookReqPayload
webhookReqBody = J.encode webhookReqBodyJson
requestDetails = RequestDetails $ BL.length webhookReqBody
res <- runExceptT $ tryWebhook headers httpTimeout webhookReqBody (T.unpack sefWebhook)
logHTTPForST res extraLogCtx requestDetails
let decodedHeaders = map (decodeHeader logEnv sefHeaders) headers
either
(processError pgpool se decodedHeaders type' webhookReqBodyJson)

View File

@@ -359,7 +359,7 @@ execRemoteGQ'
-> RemoteSchemaInfo
-> G.OperationType
-> m (DiffTime, [N.Header], BL.ByteString)
execRemoteGQ' env manager userInfo reqHdrs q rsi opType = do
when (opType == G.OperationTypeSubscription) $
throw400 NotSupported "subscription to remote server is not supported"
confHdrs <- makeHeadersFromConf env hdrConf
@@ -380,7 +380,7 @@ execRemoteGQ' env manager userInfo reqHdrs q rsi opType = Tracing.traceHttpReque
, HTTP.requestBody = HTTP.RequestBodyLBS (J.encode q)
, HTTP.responseTimeout = HTTP.responseTimeoutMicro (timeout * 1000000)
}
Tracing.tracedHttpRequest req \req' -> do
(time, res) <- withElapsedTime $ liftIO $ try $ HTTP.httpLbs req' manager
resp <- either httpThrow return res
pure (time, mkSetCookieHeaders resp, resp ^. Wreq.responseBody)

View File

@@ -503,13 +503,13 @@ callWebhook env manager outputType outputFields reqHeaders confHeaders
hdrs = contentType : (Map.toList . Map.fromList) (resolvedConfHeaders <> clientHeaders)
postPayload = J.toJSON actionWebhookPayload
url = unResolvedWebhook resolvedWebhook
httpResponse <- do
initReq <- liftIO $ HTTP.parseRequest (T.unpack url)
let req = initReq { HTTP.method = "POST"
, HTTP.requestHeaders = addDefaultHeaders hdrs
, HTTP.requestBody = HTTP.RequestBodyLBS (J.encode postPayload)
}
Tracing.tracedHttpRequest req \req' ->
liftIO . try $ HTTP.httpLbs req' manager
let requestInfo = ActionRequestInfo url postPayload $
confHeaders <> toHeadersConf clientHeaders

View File

@@ -275,9 +275,13 @@ mkSpockAction serverCtx qErrEncoder qErrModifier apiHandler = do
tracingCtx
(fromString (B8.unpack pathInfo))
requestId <- getRequestId headers
mapActionT runTraceT $ do
-- Add the request ID to the tracing metadata so that we
-- can correlate requests and traces
lift $ Tracing.attachMetadata [("request_id", unRequestId requestId)]
userInfoE <- fmap fst <$> lift (resolveUserInfo logger manager headers authMode)
userInfo <- either (logErrorAndResp Nothing requestId req (Left reqBody) False headers . qErrModifier)
return userInfoE

View File

@@ -165,10 +165,10 @@ updateJwkRef (Logger logger) manager url jwkRef = do
let urlT = T.pack $ show url
infoMsg = "refreshing JWK from endpoint: " <> urlT
liftIO $ logger $ JwkRefreshLog LevelInfo (Just infoMsg) Nothing
res <- try $ do
initReq <- liftIO $ HTTP.parseRequest $ show url
let req = initReq { HTTP.requestHeaders = addDefaultHeaders (HTTP.requestHeaders initReq) }
Tracing.tracedHttpRequest req \req' -> do
liftIO $ HTTP.httpLbs req' manager
resp <- either logAndThrowHttp return res
let status = resp ^. Wreq.responseStatus

View File

@@ -75,10 +75,10 @@ userInfoFromAuthHook logger manager hook reqHeaders = do
mkUserInfoFromResp logger (ahUrl hook) (hookMethod hook) status respBody
where
performHTTPRequest :: m (Wreq.Response BL.ByteString)
performHTTPRequest = do
let url = T.unpack $ ahUrl hook
req <- liftIO $ H.parseRequest url
Tracing.tracedHttpRequest req \req' -> liftIO do
case ahType hook of
AHTGet -> do
let isCommonHeader = (`elem` commonClientHeadersIgnored)

View File

@@ -14,9 +14,8 @@ module Hasura.Tracing
, noReporter
, HasReporter(..)
, TracingMetadata
, extractHttpContext
, tracedHttpRequest
, injectEventContext
, extractEventContext
) where
@@ -198,17 +197,13 @@ instance MonadTrace m => MonadTrace (ExceptT e m) where
currentReporter = lift currentReporter
attachMetadata = lift . attachMetadata
-- | Inject the trace context as a set of HTTP headers.
injectHttpContext :: TraceContext -> [HTTP.Header]
injectHttpContext TraceContext{..} =
[ ("X-Hasura-TraceId", fromString (show tcCurrentTrace))
, ("X-Hasura-SpanId", fromString (show tcCurrentSpan))
]
-- | Extract the trace and parent span headers from a HTTP request
-- and create a new 'TraceContext'. The new context will contain
-- a fresh span ID, and the provided span ID will be assigned as
@@ -239,16 +234,15 @@ extractEventContext e = do
<*> pure freshSpanId
<*> pure (e ^? JL.key "trace_context" . JL.key "span_id" . JL._Integral)
-- | Perform HTTP request which supports Trace headers
tracedHttpRequest
  :: MonadTrace m
  => HTTP.Request
  -- ^ http request that needs to be made
  -> (HTTP.Request -> m a)
  -- ^ a function that takes the traced request and executes it
  -> m a
tracedHttpRequest req f = trace (bsToTxt (HTTP.path req)) do
let reqBytes = case HTTP.requestBody req of
HTTP.RequestBodyBS bs -> Just (fromIntegral (BS.length bs))
HTTP.RequestBodyLBS bs -> Just (BL.length bs)
@@ -261,4 +255,4 @@ traceHttpRequest name f = trace name do
let req' = req { HTTP.requestHeaders =
injectHttpContext ctx <> HTTP.requestHeaders req
}
f req'
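
For orientation, a minimal sketch of a caller of the new API (hypothetical helper, not part of this commit); it assumes MonadTrace and tracedHttpRequest are exported from Hasura.Tracing and that BlockArguments is enabled, as in the call sites updated above:

{-# LANGUAGE BlockArguments #-}
module Example where

import           Control.Monad.IO.Class (MonadIO, liftIO)
import qualified Data.ByteString.Lazy   as BL
import qualified Hasura.Tracing         as Tracing
import qualified Network.HTTP.Client    as HTTP

-- Hypothetical helper, for illustration only: build the request first, then let
-- tracedHttpRequest open a span named after the request path, inject the
-- X-Hasura-TraceId/SpanId headers, and run the supplied action on the
-- modified request.
fetchWithTracing
  :: (MonadIO m, Tracing.MonadTrace m)
  => HTTP.Manager -> String -> m (HTTP.Response BL.ByteString)
fetchWithTracing manager url = do
  req <- liftIO $ HTTP.parseRequest url
  Tracing.tracedHttpRequest req \req' -> liftIO $ HTTP.httpLbs req' manager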