frontends/graphql crate (#1049)

### What

This extracts a GraphQL frontend crate from the `execute` crate. The new
crate contains all the top-level functions, while what remains in
`execute` concerns itself with the creation and execution of plans.

This only moves code around and organises it a little more neatly, so it
makes no functional changes. The follow-up will be to create and expose a
neater API.

Next steps:

- Arguably what is left in `execute` is now better named `plan`, but
this PR was already big enough.
- Once we have more than one `frontend`, the functions in `steps.rs` in
this crate should probably be shared somewhere.

### How

Moving files around.
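
For a sense of what changes for callers, here is a minimal sketch of the new entry point as the engine now uses it. The `execute_query` signature is taken from the moved code below; the handler function wrapping it here is illustrative only, not part of this PR.

```rust
use execute::{ExposeInternalErrors, HttpContext};
use graphql_frontend::execute_query;
use hasura_authn_core::Session;
use lang_graphql::http::RawRequest;
use schema::GDS;

// Illustrative handler: in the real engine this call sits inside a tracing
// span in the axum request handler.
async fn handle_graphql(
    http_context: &HttpContext,
    schema: &lang_graphql::schema::Schema<GDS>,
    session: &Session,
    request_headers: &reqwest::header::HeaderMap,
    raw_request: RawRequest,
) -> lang_graphql::http::Response {
    let (_operation_type, response) = execute_query(
        ExposeInternalErrors::Expose, // or `Censor`, depending on deployment
        http_context,
        schema,
        session,
        request_headers,
        raw_request,
        None, // project_id
    )
    .await;
    // Unwrap the GraphQLResponse into the lang_graphql HTTP response.
    response.inner()
}
```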

V3_GIT_ORIGIN_REV_ID: 0e54310a27d06905ed1967395e97ab47751b65dc
Daniel Harvey, 2024-09-10 16:09:22 +01:00 (committed by hasura-bot)
parent a275966b7b, commit 50e1ab5a67
60 changed files with 845 additions and 743 deletions

v3/Cargo.lock (generated)

@ -1781,6 +1781,7 @@ dependencies = [
"execute",
"futures-util",
"goldenfile",
"graphql-frontend",
"hasura-authn-core",
"hasura-authn-jwt",
"hasura-authn-noauth",
@ -1868,10 +1869,8 @@ dependencies = [
"axum",
"base64 0.22.1",
"bytes",
"criterion",
"futures-ext",
"futures-util",
"goldenfile",
"hasura-authn-core",
"indexmap 2.5.0",
"ir",
@ -2148,6 +2147,32 @@ dependencies = [
"yansi 1.0.1",
]
[[package]]
name = "graphql-frontend"
version = "3.0.0"
dependencies = [
"async-recursion",
"axum",
"criterion",
"execute",
"goldenfile",
"hasura-authn-core",
"indexmap 2.5.0",
"ir",
"lang-graphql",
"metadata-resolve",
"ndc-models 0.1.6",
"ndc-models 0.2.0",
"nonempty",
"open-dds",
"reqwest",
"schema",
"serde",
"serde_json",
"thiserror",
"tracing-util",
]
[[package]]
name = "graphql-parser"
version = "0.4.0"


@ -11,6 +11,7 @@ members = [
"crates/custom-connector",
"crates/engine",
"crates/execute",
"crates/frontends/*",
"crates/ir",
"crates/jsonapi",
"crates/lang-graphql",


@ -23,6 +23,7 @@ harness = false
[dependencies]
axum-ext = { path = "../utils/axum-ext" }
execute = { path = "../execute" }
graphql-frontend = { path = "../frontends/graphql" }
hasura-authn-core = { path = "../auth/hasura-authn-core" }
hasura-authn-jwt = { path = "../auth/hasura-authn-jwt" }
hasura-authn-noauth = { path = "../auth/hasura-authn-noauth" }


@ -1,7 +1,8 @@
use core::time::Duration;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, SamplingMode};
use execute::HttpContext;
use execute::{execute_mutation_plan, execute_query_plan, generate_request_plan};
use execute::{execute_query_internal, generate_ir, HttpContext};
use graphql_frontend::{execute_query_internal, generate_ir};
use hasura_authn_core::Identity;
use lang_graphql::http::RawRequest;
use open_dds::permissions::Role;


@ -596,7 +596,7 @@ async fn handle_request(
|| {
{
Box::pin(
execute::execute_query(
graphql_frontend::execute_query(
state.expose_internal_errors,
&state.http_context,
&state.graphql_state,
@ -627,7 +627,7 @@ async fn handle_explain_request(
State(state): State<Arc<EngineState>>,
Extension(session): Extension<Session>,
Json(request): Json<gql::http::RawRequest>,
) -> execute::ExplainResponse {
) -> graphql_frontend::ExplainResponse {
let tracer = tracing_util::global_tracer();
let response = tracer
.in_span_async(
@ -636,7 +636,7 @@ async fn handle_explain_request(
SpanVisibility::User,
|| {
Box::pin(
execute::execute_explain(
graphql_frontend::execute_explain(
state.expose_internal_errors,
&state.http_context,
&state.graphql_state,


@ -18,7 +18,8 @@ use std::{
path::PathBuf,
};
use execute::{execute_query, HttpContext};
use execute::HttpContext;
use graphql_frontend::execute_query;
use schema::GDS;
extern crate json_value_merge;
@ -513,7 +514,7 @@ pub fn test_execute_explain(
query,
variables: None,
};
let (_, raw_response) = execute::execute_explain(
let (_, raw_response) = graphql_frontend::execute_explain(
execute::ExposeInternalErrors::Expose,
&test_ctx.http_context,
&schema,
@ -523,7 +524,7 @@ pub fn test_execute_explain(
)
.await;
let response = execute::redact_ndc_explain(raw_response);
let response = graphql_frontend::redact_ndc_explain(raw_response);
let mut expected = test_ctx.mint.new_goldenfile_with_differ(
expected_response_file,


@ -7,10 +7,6 @@ license.workspace = true
[lib]
bench = false
[[bench]]
name = "generate_ir"
harness = false
[dependencies]
hasura-authn-core = { path = "../auth/hasura-authn-core" }
ir = { path = "../ir" }
@ -41,8 +37,6 @@ transitive = { workspace = true }
url = { workspace = true }
[dev-dependencies]
criterion = { workspace = true, features = ["html_reports", "async_tokio"] }
goldenfile = { workspace = true }
mockito = { workspace = true }
pretty_assertions = { workspace = true }


@ -1,671 +1,21 @@
pub mod error;
mod explain;
mod error;
pub mod ndc;
pub mod plan;
mod process_response;
mod query_usage;
mod remote_joins;
use gql::normalized_ast::Operation;
use hasura_authn_core::Session;
use lang_graphql as gql;
use lang_graphql::ast::common as ast;
use lang_graphql::{
http::{RawRequest, Response},
schema::Schema,
};
use plan::ExecuteQueryResult;
use schema::{GDSRoleNamespaceGetter, GDS};
use tracing_util::{
set_attribute_on_active_span, AttributeVisibility, ErrorVisibility, SpanVisibility, Traceable,
TraceableError,
};
mod types;
// we explicitly export things used by other crates
pub use explain::execute_explain;
pub use explain::types::{redact_ndc_explain, ExplainResponse};
pub use error::{FieldError, QueryUsageAnalyzeError, RequestError};
pub use ndc::fetch_from_data_connector;
pub use plan::{execute_mutation_plan, execute_query_plan, generate_request_plan, RequestPlan};
/// Context for making HTTP requests
#[derive(Debug, Clone)]
pub struct HttpContext {
/// The HTTP client to use for making requests
pub client: reqwest::Client,
/// Response size limit for NDC requests
pub ndc_response_size_limit: Option<usize>,
}
#[derive(Debug)]
/// A simple wrapper around a reference of GraphQL errors
pub struct GraphQLErrors<'a>(pub &'a nonempty::NonEmpty<gql::http::GraphQLError>);
impl<'a> std::fmt::Display for GraphQLErrors<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let messages = self
.0
.iter()
.map(|e| e.message.as_str())
.collect::<Vec<_>>();
write!(f, "{}", messages.join(", "))
}
}
/// Implement traceable error for GraphQL Errors
impl<'a> TraceableError for GraphQLErrors<'a> {
fn visibility(&self) -> ErrorVisibility {
// Traces related to GraphQL errors are always visible to the user
ErrorVisibility::User
}
}
/// A simple wrapper around a GraphQL HTTP response
pub struct GraphQLResponse(gql::http::Response);
impl GraphQLResponse {
pub fn from_result(
result: ExecuteQueryResult,
expose_internal_errors: ExposeInternalErrors,
) -> Self {
Self(result.to_graphql_response(expose_internal_errors))
}
pub fn from_error(
err: &error::RequestError,
expose_internal_errors: ExposeInternalErrors,
) -> Self {
Self(Response::error(
err.to_graphql_error(expose_internal_errors),
axum::http::HeaderMap::default(),
))
}
pub fn from_response(response: gql::http::Response) -> Self {
Self(response)
}
pub fn does_contain_error(&self) -> bool {
self.0.does_contains_error()
}
pub fn inner(self) -> gql::http::Response {
self.0
}
}
/// Implement traceable for GraphQL Response
impl Traceable for GraphQLResponse {
type ErrorType<'a> = GraphQLErrors<'a>;
fn get_error(&self) -> Option<GraphQLErrors<'_>> {
self.0.errors.as_ref().map(GraphQLErrors)
}
}
#[derive(Clone, Debug)]
pub struct ProjectId(pub String);
pub async fn execute_query(
expose_internal_errors: ExposeInternalErrors,
http_context: &HttpContext,
schema: &Schema<GDS>,
session: &Session,
request_headers: &reqwest::header::HeaderMap,
request: RawRequest,
project_id: Option<&ProjectId>,
) -> (Option<ast::OperationType>, GraphQLResponse) {
execute_query_internal(
expose_internal_errors,
http_context,
schema,
session,
request_headers,
request,
project_id,
)
.await
.map_or_else(
|e| {
(
None,
GraphQLResponse::from_error(&e, expose_internal_errors),
)
},
|(op_type, response)| (Some(op_type), response),
)
}
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
struct GraphQlParseError(#[from] gql::ast::spanning::Positioned<gql::parser::Error>);
impl TraceableError for GraphQlParseError {
fn visibility(&self) -> ErrorVisibility {
ErrorVisibility::User
}
}
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
struct GraphQlValidationError(#[from] gql::validation::Error);
impl TraceableError for GraphQlValidationError {
fn visibility(&self) -> ErrorVisibility {
ErrorVisibility::User
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ExposeInternalErrors {
Expose,
Censor,
}
/// Executes a GraphQL query
pub async fn execute_query_internal(
expose_internal_errors: ExposeInternalErrors,
http_context: &HttpContext,
schema: &gql::schema::Schema<GDS>,
session: &Session,
request_headers: &reqwest::header::HeaderMap,
raw_request: gql::http::RawRequest,
project_id: Option<&ProjectId>,
) -> Result<(ast::OperationType, GraphQLResponse), error::RequestError> {
let tracer = tracing_util::global_tracer();
tracer
.in_span_async(
"execute_query",
"Execute query request",
SpanVisibility::User,
|| {
if let Some(name) = &raw_request.operation_name {
tracing_util::set_attribute_on_active_span(
AttributeVisibility::Default,
"operation_name",
name.to_string(),
);
}
tracing_util::set_attribute_on_active_span(
AttributeVisibility::Default,
"session.role",
session.role.to_string(),
);
tracing_util::set_attribute_on_active_span(
AttributeVisibility::Default,
"request.graphql_query",
raw_request.query.clone(),
);
Box::pin(async {
// parse the raw request into a GQL query
let query = parse_query(&raw_request.query)?;
// normalize the parsed GQL query
let normalized_request =
normalize_request(schema, session, query, raw_request)?;
// generate IR
let ir = build_ir(schema, session, request_headers, &normalized_request)?;
// construct a plan to execute the request
let request_plan = build_request_plan(&ir)?;
let display_name = match normalized_request.name {
Some(ref name) => std::borrow::Cow::Owned(format!("Execute {name}")),
None => std::borrow::Cow::Borrowed("Execute request plan"),
};
// execute the query plan
let response = tracer
.in_span_async("execute", display_name, SpanVisibility::User, || {
let all_usage_counts = ir::get_all_usage_counts_in_query(&ir);
let serialized_data = serde_json::to_string(&all_usage_counts).unwrap();
set_attribute_on_active_span(
AttributeVisibility::Default,
"usage_counts",
serialized_data,
);
let execute_response = Box::pin(async {
let execute_query_result = match request_plan {
plan::RequestPlan::MutationPlan(mutation_plan) => {
plan::execute_mutation_plan(
http_context,
mutation_plan,
project_id,
)
.await
}
plan::RequestPlan::QueryPlan(query_plan) => {
plan::execute_query_plan(
http_context,
query_plan,
project_id,
)
.await
}
};
GraphQLResponse::from_result(
execute_query_result,
expose_internal_errors,
)
});
// Analyze the query usage
// It is attached to this span as an attribute
match analyze_query_usage(&normalized_request) {
Err(analyze_error) => {
// Set query usage analytics error as a span attribute
set_attribute_on_active_span(
AttributeVisibility::Internal,
"query_usage_analytics_error",
analyze_error.to_string(),
);
}
Ok(query_usage_analytics) => {
// Set query usage analytics as a span attribute
set_attribute_on_active_span(
AttributeVisibility::Internal,
"query_usage_analytics",
query_usage_analytics,
);
}
}
execute_response
})
.await;
Ok((normalized_request.ty, response))
})
},
)
.await
}
/// Explains (query plan) a GraphQL query
pub async fn explain_query_internal(
expose_internal_errors: ExposeInternalErrors,
http_context: &HttpContext,
schema: &gql::schema::Schema<GDS>,
session: &Session,
request_headers: &reqwest::header::HeaderMap,
raw_request: gql::http::RawRequest,
) -> Result<(ast::OperationType, explain::types::ExplainResponse), error::RequestError> {
let tracer = tracing_util::global_tracer();
tracer
.in_span_async(
"explain_query",
"Execute explain request",
SpanVisibility::User,
|| {
tracing_util::set_attribute_on_active_span(
AttributeVisibility::Default,
"session.role",
session.role.to_string(),
);
tracing_util::set_attribute_on_active_span(
AttributeVisibility::Default,
"request.graphql_query",
raw_request.query.to_string(),
);
Box::pin(async {
// parse the raw request into a GQL query
let query = parse_query(&raw_request.query)?;
// normalize the parsed GQL query
let normalized_request =
normalize_request(schema, session, query, raw_request)?;
// generate IR
let ir = build_ir(schema, session, request_headers, &normalized_request)?;
// construct a plan to execute the request
let request_plan = build_request_plan(&ir)?;
// explain the query plan
let response = tracer
.in_span_async(
"explain",
"Explain request plan",
SpanVisibility::Internal,
|| {
Box::pin(async {
let request_result = match request_plan {
plan::RequestPlan::MutationPlan(mutation_plan) => {
crate::explain::explain_mutation_plan(
expose_internal_errors,
http_context,
mutation_plan,
)
.await
}
plan::RequestPlan::QueryPlan(query_plan) => {
crate::explain::explain_query_plan(
expose_internal_errors,
http_context,
query_plan,
)
.await
}
};
// convert the query plan to explain step
match request_result {
Ok(step) => step.make_explain_response(),
Err(e) => explain::types::ExplainResponse::error(
e.to_graphql_error(expose_internal_errors),
),
}
})
},
)
.await;
Ok((normalized_request.ty, response))
})
},
)
.await
}
/// Parses a raw GraphQL request into a GQL query AST
pub(crate) fn parse_query(
query: &str,
) -> Result<
gql::ast::executable::ExecutableDocument,
gql::ast::spanning::Positioned<gql::parser::Error>,
> {
let tracer = tracing_util::global_tracer();
let query = tracer
.in_span(
"parse",
"Parse the raw request into a GraphQL query",
SpanVisibility::Internal,
|| {
gql::parser::Parser::new(query)
.parse_executable_document()
.map_err(GraphQlParseError)
},
)
.map_err(|e| e.0)?;
Ok(query)
}
/// Normalize the parsed GQL query
pub(crate) fn normalize_request<'s>(
schema: &'s gql::schema::Schema<GDS>,
session: &Session,
query: gql::ast::executable::ExecutableDocument,
raw_request: gql::http::RawRequest,
) -> Result<Operation<'s, GDS>, gql::validation::Error> {
let tracer = tracing_util::global_tracer();
let normalized_request = tracer
.in_span(
"validate",
"Normalize the parsed GraphQL query",
SpanVisibility::Internal,
|| {
// add the operation name even if validation fails
if let Some(name) = &raw_request.operation_name {
set_attribute_on_active_span(
AttributeVisibility::Default,
"operation_name",
name.to_string(),
);
}
let request = gql::http::Request {
operation_name: raw_request.operation_name,
query,
variables: raw_request.variables.unwrap_or_default(),
};
gql::validation::normalize_request(
&GDSRoleNamespaceGetter {
scope: session.role.clone(),
},
schema,
&request,
)
.map_err(GraphQlValidationError)
},
)
.map_err(|e| e.0)?;
Ok(normalized_request)
}
/// Generate IR for the request
pub(crate) fn build_ir<'n, 's>(
schema: &'s gql::schema::Schema<GDS>,
session: &Session,
request_headers: &reqwest::header::HeaderMap,
normalized_request: &'s Operation<'s, GDS>,
) -> Result<ir::IR<'n, 's>, ir::Error> {
let tracer = tracing_util::global_tracer();
let ir = tracer.in_span(
"generate_ir",
"Generate IR for the request",
SpanVisibility::Internal,
|| generate_ir(schema, session, request_headers, normalized_request),
)?;
Ok(ir)
}
/// Build a plan to execute the request
pub(crate) fn build_request_plan<'n, 's, 'ir>(
ir: &'ir ir::IR<'n, 's>,
) -> Result<plan::RequestPlan<'n, 's, 'ir>, plan::error::Error> {
let tracer = tracing_util::global_tracer();
let plan = tracer.in_span(
"plan",
"Construct a plan to execute the request",
SpanVisibility::Internal,
|| plan::generate_request_plan(ir),
)?;
Ok(plan)
}
pub fn generate_ir<'n, 's>(
schema: &'s gql::schema::Schema<GDS>,
session: &Session,
request_headers: &reqwest::header::HeaderMap,
normalized_request: &'s Operation<'s, GDS>,
) -> Result<ir::IR<'n, 's>, ir::Error> {
match &normalized_request.ty {
ast::OperationType::Query => {
let query_ir = ir::generate_query_ir(
schema,
session,
request_headers,
&normalized_request.selection_set,
)?;
Ok(ir::IR::Query(query_ir))
}
ast::OperationType::Mutation => {
let mutation_ir = ir::generate_mutation_ir(
&normalized_request.selection_set,
&session.variables,
request_headers,
)?;
Ok(ir::IR::Mutation(mutation_ir))
}
ast::OperationType::Subscription => {
Err(ir::InternalDeveloperError::SubscriptionsNotSupported)?
}
}
}
fn analyze_query_usage<'s>(
normalized_request: &'s Operation<'s, GDS>,
) -> Result<String, error::QueryUsageAnalyzeError> {
let tracer = tracing_util::global_tracer();
tracer.in_span(
"analyze_query_usage",
"Analyze query usage",
SpanVisibility::Internal,
|| {
let query_usage_analytics = query_usage::analyze_query_usage(normalized_request);
Ok(serde_json::to_string(&query_usage_analytics)?)
},
)
}
#[cfg(test)]
mod tests {
use goldenfile::{differs::text_diff, Mint};
use hasura_authn_core::{Identity, Role, Session, SessionVariableValue};
use lang_graphql::http::Request;
use lang_graphql::{parser::Parser, validation::normalize_request};
use open_dds::session_variables::{SessionVariable, SESSION_VARIABLE_ROLE};
use serde_json as json;
use std::{
collections::HashMap,
fs::{self, File},
io::Write,
path::PathBuf,
};
use super::generate_ir;
use super::query_usage::analyze_query_usage;
use schema::GDS;
#[test]
fn test_generate_ir() -> Result<(), Box<dyn std::error::Error>> {
let test_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("tests");
let mut mint = Mint::new(&test_dir);
let schema = fs::read_to_string(test_dir.join("schema.json"))?;
let gds = GDS::new_with_default_flags(open_dds::Metadata::from_json_str(&schema)?)?;
let schema = GDS::build_schema(&gds)?;
for input_file in fs::read_dir(test_dir.join("generate_ir"))? {
let path = input_file?.path();
assert!(path.is_dir());
let request_headers = reqwest::header::HeaderMap::new();
let test_name = path
.file_name()
.ok_or_else(|| format!("{path:?} is not a normal file or directory"))?;
let raw_request = fs::read_to_string(path.join("request.gql"))?;
let expected_path = PathBuf::from("generate_ir")
.join(test_name)
.join("expected.json");
let session_vars_path = path.join("session_variables.json");
let session = resolve_session(session_vars_path);
let query = Parser::new(&raw_request).parse_executable_document()?;
let request = Request {
operation_name: None,
query,
variables: HashMap::new(),
};
let normalized_request = normalize_request(
&schema::GDSRoleNamespaceGetter {
scope: session.role.clone(),
},
&schema,
&request,
)?;
let ir = generate_ir(&schema, &session, &request_headers, &normalized_request)?;
let mut expected = mint.new_goldenfile_with_differ(
expected_path,
Box::new(|file1, file2| {
let json1: serde_json::Value =
serde_json::from_reader(File::open(file1).unwrap()).unwrap();
let json2: serde_json::Value =
serde_json::from_reader(File::open(file2).unwrap()).unwrap();
if json1 != json2 {
text_diff(file1, file2);
}
}),
)?;
write!(expected, "{}", serde_json::to_string_pretty(&ir)?)?;
}
Ok(())
}
#[test]
fn test_query_usage_analytics() -> Result<(), Box<dyn std::error::Error>> {
let test_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("tests")
.join("query_usage_analytics");
let mut mint = Mint::new(&test_dir);
let schema = fs::read_to_string(test_dir.join("schema.json"))?;
let gds = GDS::new_with_default_flags(open_dds::Metadata::from_json_str(&schema)?)?;
let schema = GDS::build_schema(&gds)?;
for test_dir in fs::read_dir(test_dir)? {
let path = test_dir?.path();
if !path.is_dir() {
continue;
}
let test_name = path
.file_name()
.ok_or_else(|| format!("{path:?} is not a normal file or directory"))?;
let raw_request = fs::read_to_string(path.join("request.gql"))?;
let expected_path = PathBuf::from(test_name).join("expected.json");
let session_vars_path = path.join("session_variables.json");
let session = resolve_session(session_vars_path);
let query = Parser::new(&raw_request).parse_executable_document()?;
let request = Request {
operation_name: None,
query,
variables: HashMap::new(),
};
let normalized_request = normalize_request(
&schema::GDSRoleNamespaceGetter {
scope: session.role.clone(),
},
&schema,
&request,
)?;
let query_usage = analyze_query_usage(&normalized_request);
let mut expected = mint.new_goldenfile_with_differ(
expected_path,
Box::new(|file1, file2| {
let json1: serde_json::Value =
serde_json::from_reader(File::open(file1).unwrap()).unwrap();
let json2: serde_json::Value =
serde_json::from_reader(File::open(file2).unwrap()).unwrap();
if json1 != json2 {
text_diff(file1, file2);
}
}),
)?;
write!(expected, "{}", serde_json::to_string_pretty(&query_usage)?)?;
}
Ok(())
}
// TODO: remove duplication between this function and 'add_session'
fn resolve_session(session_vars_path: PathBuf) -> Session {
let authorization = Identity::admin(Role::new("admin"));
let session_variables: HashMap<SessionVariable, SessionVariableValue> = {
if session_vars_path.exists() {
json::from_str(fs::read_to_string(session_vars_path).unwrap().as_ref()).unwrap()
} else {
HashMap::new()
}
};
let role = session_variables
.get(&SESSION_VARIABLE_ROLE)
.map(|v| Role::new(&v.0));
authorization
.get_role_authorization(role.as_ref())
.unwrap()
.build_session(session_variables)
}
}
pub use plan::error::Error as PlanError;
pub use plan::filter::plan_remote_predicate;
pub use plan::query::ResolvedQueryExecutionPlan;
pub use plan::{
execute_mutation_plan, execute_query_plan, generate_request_plan, ExecuteQueryResult,
RequestPlan,
};
pub use query_usage::analyze_query_usage;
pub use remote_joins::types::{JoinId, JoinLocations, JoinNode, RemoteJoin, RemoteJoinType};
pub use types::{ExposeInternalErrors, HttpContext, ProjectId};


@ -14,7 +14,7 @@ pub use arguments::{Argument, MutationArgument};
pub use field::ResolvedField;
pub use filter::{plan_expression, resolve_expression, ResolvedFilterExpression};
pub use mutation::ResolvedMutationExecutionPlan;
pub use query::{ResolvedQueryExecutionPlan, ResolvedQueryNode};
pub use query::{ResolvedQueryExecutionPlan, ResolvedQueryNode, UnresolvedQueryNode};
pub use relationships::Relationship;
use gql::normalized_ast;


@ -10,7 +10,7 @@ use query_usage_analytics::{
};
use schema::GDS;
pub(crate) fn analyze_query_usage<'s>(normalized_request: &'s Operation<'s, GDS>) -> GqlOperation {
pub fn analyze_query_usage<'s>(normalized_request: &'s Operation<'s, GDS>) -> GqlOperation {
let operation_name = normalized_request
.name
.as_ref()


@ -0,0 +1,17 @@
/// Context for making HTTP requests
#[derive(Debug, Clone)]
pub struct HttpContext {
/// The HTTP client to use for making requests
pub client: reqwest::Client,
/// Response size limit for NDC requests
pub ndc_response_size_limit: Option<usize>,
}
#[derive(Clone, Debug)]
pub struct ProjectId(pub String);
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ExposeInternalErrors {
Expose,
Censor,
}


@ -0,0 +1,40 @@
[package]
name = "graphql-frontend"
version.workspace = true
edition.workspace = true
license.workspace = true
[lib]
bench = false
[[bench]]
name = "generate_ir"
harness = false
[dependencies]
execute = { path = "../../execute" }
hasura-authn-core = { path = "../../auth/hasura-authn-core" }
ir = { path = "../../ir" }
lang-graphql = { path = "../../lang-graphql" }
open-dds = { path = "../../open-dds" }
schema = { path = "../../schema" }
tracing-util = { path = "../../utils/tracing-util" }
metadata-resolve = {path = "../../metadata-resolve" }
async-recursion = { workspace = true }
axum = { workspace = true }
indexmap = { workspace = true, features = ["serde"] }
ndc-models = { workspace = true }
ndc-models-v01 = { workspace = true }
nonempty = { workspace = true }
reqwest = { workspace = true, features = ["json", "multipart"] }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
[dev-dependencies]
criterion = { workspace = true, features = ["html_reports", "async_tokio"] }
goldenfile = { workspace = true }
[lints]
workspace = true


@ -64,8 +64,13 @@ pub fn bench_generate_ir(c: &mut Criterion) {
&(&schema, &normalized_request),
|b, (schema, normalized_request)| {
b.iter(|| {
execute::generate_ir(schema, &session, &request_headers, normalized_request)
.unwrap()
graphql_frontend::generate_ir(
schema,
&session,
&request_headers,
normalized_request,
)
.unwrap()
});
},
);


@ -0,0 +1,3 @@
# graphql frontend
The "front door" for executing a graphql query, as such


@ -1,32 +1,33 @@
mod predicate;
pub mod types;
use super::steps;
use std::borrow::Cow;
use super::remote_joins::types::{JoinNode, RemoteJoinType};
use super::HttpContext;
use crate::ndc::client as ndc_client;
use crate::plan::{ApolloFederationSelect, NDCQueryExecution, NodeQueryPlan, ProcessResponseAs};
use crate::remote_joins::types::{JoinId, JoinLocations, RemoteJoin};
use crate::{error, plan};
use async_recursion::async_recursion;
use execute::ndc::client as ndc_client;
use execute::plan::{
self, ApolloFederationSelect, NDCQueryExecution, NodeQueryPlan, ProcessResponseAs,
};
use execute::HttpContext;
use execute::{JoinId, JoinLocations, JoinNode, RemoteJoin, RemoteJoinType};
use hasura_authn_core::Session;
use lang_graphql as gql;
use lang_graphql::ast::common as ast;
use lang_graphql::{http::RawRequest, schema::Schema};
use nonempty::NonEmpty;
use schema::GDS;
use tracing_util::SpanVisibility;
use tracing_util::{AttributeVisibility, SpanVisibility};
pub async fn execute_explain(
expose_internal_errors: crate::ExposeInternalErrors,
expose_internal_errors: execute::ExposeInternalErrors,
http_context: &HttpContext,
schema: &Schema<GDS>,
session: &Session,
request_headers: &reqwest::header::HeaderMap,
request: RawRequest,
) -> (Option<ast::OperationType>, types::ExplainResponse) {
super::explain_query_internal(
explain_query_internal(
expose_internal_errors,
http_context,
schema,
@ -46,12 +47,97 @@ pub async fn execute_explain(
)
}
/// Explains (query plan) a GraphQL query
async fn explain_query_internal(
expose_internal_errors: execute::ExposeInternalErrors,
http_context: &HttpContext,
schema: &gql::schema::Schema<GDS>,
session: &Session,
request_headers: &reqwest::header::HeaderMap,
raw_request: gql::http::RawRequest,
) -> Result<(ast::OperationType, types::ExplainResponse), execute::RequestError> {
let tracer = tracing_util::global_tracer();
tracer
.in_span_async(
"explain_query",
"Execute explain request",
SpanVisibility::User,
|| {
tracing_util::set_attribute_on_active_span(
AttributeVisibility::Default,
"session.role",
session.role.to_string(),
);
tracing_util::set_attribute_on_active_span(
AttributeVisibility::Default,
"request.graphql_query",
raw_request.query.to_string(),
);
Box::pin(async {
// parse the raw request into a GQL query
let query = steps::parse_query(&raw_request.query)?;
// normalize the parsed GQL query
let normalized_request =
steps::normalize_request(schema, session, query, raw_request)?;
// generate IR
let ir =
steps::build_ir(schema, session, request_headers, &normalized_request)?;
// construct a plan to execute the request
let request_plan = steps::build_request_plan(&ir)?;
// explain the query plan
let response = tracer
.in_span_async(
"explain",
"Explain request plan",
SpanVisibility::Internal,
|| {
Box::pin(async {
let request_result = match request_plan {
plan::RequestPlan::MutationPlan(mutation_plan) => {
explain_mutation_plan(
expose_internal_errors,
http_context,
mutation_plan,
)
.await
}
plan::RequestPlan::QueryPlan(query_plan) => {
explain_query_plan(
expose_internal_errors,
http_context,
query_plan,
)
.await
}
};
// convert the query plan to explain step
match request_result {
Ok(step) => step.make_explain_response(),
Err(e) => types::ExplainResponse::error(
e.to_graphql_error(expose_internal_errors),
),
}
})
},
)
.await;
Ok((normalized_request.ty, response))
})
},
)
.await
}
/// Produce an /explain plan for a given GraphQL query.
pub(crate) async fn explain_query_plan(
expose_internal_errors: crate::ExposeInternalErrors,
expose_internal_errors: execute::ExposeInternalErrors,
http_context: &HttpContext,
query_plan: plan::QueryPlan<'_, '_, '_>,
) -> Result<types::Step, error::RequestError> {
) -> Result<types::Step, execute::RequestError> {
let mut parallel_root_steps = vec![];
// Here, we are assuming that all root fields are executed in parallel.
for (alias, node) in query_plan {
@ -77,12 +163,12 @@ pub(crate) async fn explain_query_plan(
.query_execution_plan
.resolve(http_context)
.await
.map_err(|e| error::RequestError::ExplainError(e.to_string()))?;
.map_err(|e| execute::RequestError::ExplainError(e.to_string()))?;
let data_connector = resolved_execution_plan.data_connector;
let ndc_request =
plan::ndc_request::make_ndc_query_request(resolved_execution_plan)
.map_err(|e| error::RequestError::ExplainError(e.to_string()))?;
.map_err(|e| execute::RequestError::ExplainError(e.to_string()))?;
let sequence_steps = get_execution_steps(
expose_internal_errors,
@ -122,12 +208,12 @@ pub(crate) async fn explain_query_plan(
.query_execution_plan
.resolve(http_context)
.await
.map_err(|e| error::RequestError::ExplainError(e.to_string()))?;
.map_err(|e| execute::RequestError::ExplainError(e.to_string()))?;
let data_connector = resolved_execution_plan.data_connector;
let ndc_request =
plan::ndc_request::make_ndc_query_request(resolved_execution_plan)
.map_err(|e| error::RequestError::ExplainError(e.to_string()))?;
.map_err(|e| execute::RequestError::ExplainError(e.to_string()))?;
let sequence_steps = get_execution_steps(
expose_internal_errors,
@ -157,12 +243,12 @@ pub(crate) async fn explain_query_plan(
| NodeQueryPlan::ApolloFederationSelect(ApolloFederationSelect::ServiceField {
..
}) => {
return Err(error::RequestError::ExplainError(
return Err(execute::RequestError::ExplainError(
"cannot explain introspection queries".to_string(),
));
}
NodeQueryPlan::RelayNodeSelect(None) => {
return Err(error::RequestError::ExplainError(
return Err(execute::RequestError::ExplainError(
"cannot explain relay queries with no execution plan".to_string(),
));
}
@ -175,7 +261,7 @@ pub(crate) async fn explain_query_plan(
simplify_step(Box::new(types::Step::Parallel(parallel_root_steps)));
Ok(*simplified_step)
}
None => Err(error::RequestError::ExplainError(
None => Err(execute::RequestError::ExplainError(
"cannot explain query as there are no explainable root field".to_string(),
)),
}
@ -183,14 +269,14 @@ pub(crate) async fn explain_query_plan(
/// Produce an /explain plan for a given GraphQL mutation.
pub(crate) async fn explain_mutation_plan(
expose_internal_errors: crate::ExposeInternalErrors,
expose_internal_errors: execute::ExposeInternalErrors,
http_context: &HttpContext,
mutation_plan: plan::MutationPlan<'_, '_, '_>,
) -> Result<types::Step, error::RequestError> {
) -> Result<types::Step, execute::RequestError> {
let mut root_steps = vec![];
if !mutation_plan.type_names.is_empty() {
return Err(error::RequestError::ExplainError(
return Err(execute::RequestError::ExplainError(
"cannot explain introspection queries".to_string(),
));
}
@ -213,11 +299,11 @@ pub(crate) async fn explain_mutation_plan(
.execution_node
.resolve(http_context)
.await
.map_err(|e| error::RequestError::ExplainError(e.to_string()))?;
.map_err(|e| execute::RequestError::ExplainError(e.to_string()))?;
let mutation_request =
plan::ndc_request::make_ndc_mutation_request(resolved_execution_plan)
.map_err(|e| error::RequestError::ExplainError(e.to_string()))?;
.map_err(|e| execute::RequestError::ExplainError(e.to_string()))?;
let sequence_steps = get_execution_steps(
expose_internal_errors,
@ -243,21 +329,21 @@ pub(crate) async fn explain_mutation_plan(
let simplified_step = simplify_step(Box::new(types::Step::Sequence(root_steps)));
Ok(*simplified_step)
}
None => Err(error::RequestError::ExplainError(
None => Err(execute::RequestError::ExplainError(
"cannot explain mutation as there are no explainable root fields".to_string(),
)),
}
}
async fn get_execution_steps<'s>(
expose_internal_errors: crate::ExposeInternalErrors,
expose_internal_errors: execute::ExposeInternalErrors,
http_context: &HttpContext,
alias: gql::ast::common::Alias,
process_response_as: &ProcessResponseAs<'s>,
join_locations: JoinLocations<(RemoteJoin<'s, '_>, JoinId)>,
ndc_request: types::NDCRequest,
data_connector: &metadata_resolve::DataConnectorLink,
) -> Result<NonEmpty<Box<types::Step>>, error::RequestError> {
) -> Result<NonEmpty<Box<types::Step>>, execute::RequestError> {
let mut sequence_steps = match process_response_as {
ProcessResponseAs::CommandResponse { .. } => {
// A command execution node
@ -309,10 +395,10 @@ async fn get_execution_steps<'s>(
/// TODO: Currently the steps are sequential, we should make them parallel once the executor supports it.
#[async_recursion]
async fn get_join_steps(
expose_internal_errors: crate::ExposeInternalErrors,
expose_internal_errors: execute::ExposeInternalErrors,
join_locations: JoinLocations<(RemoteJoin<'async_recursion, 'async_recursion>, JoinId)>,
http_context: &HttpContext,
) -> Result<Option<NonEmpty<Box<types::Step>>>, error::RequestError> {
) -> Result<Option<NonEmpty<Box<types::Step>>>, execute::RequestError> {
let mut sequence_join_steps = vec![];
for (alias, location) in join_locations.locations {
let mut sequence_steps = vec![];
@ -321,12 +407,12 @@ async fn get_join_steps(
.target_ndc_execution
.resolve(http_context)
.await
.map_err(|e| error::RequestError::ExplainError(e.to_string()))?;
.map_err(|e| execute::RequestError::ExplainError(e.to_string()))?;
resolved_execution_plan.variables = Some(vec![]);
let target_data_connector = resolved_execution_plan.data_connector;
let query_request = plan::ndc_request::make_ndc_query_request(resolved_execution_plan)
.map_err(|e| error::RequestError::ExplainError(e.to_string()))?;
.map_err(|e| execute::RequestError::ExplainError(e.to_string()))?;
let ndc_request = types::NDCRequest::Query(query_request);
@ -401,7 +487,7 @@ fn simplify_step(step: Box<types::Step>) -> Box<types::Step> {
}
pub(crate) async fn fetch_explain_from_data_connector(
expose_internal_errors: crate::ExposeInternalErrors,
expose_internal_errors: execute::ExposeInternalErrors,
http_context: &HttpContext,
ndc_request: &types::NDCRequest,
data_connector: &metadata_resolve::DataConnectorLink,
@ -427,7 +513,7 @@ pub(crate) async fn fetch_explain_from_data_connector(
ndc_client::explain_query_post(ndc_config, query_request)
.await
.map(Some)
.map_err(error::FieldError::from)
.map_err(execute::FieldError::from)
} else {
Ok(None)
}
@ -437,7 +523,7 @@ pub(crate) async fn fetch_explain_from_data_connector(
ndc_client::explain_mutation_post(ndc_config, mutation_request)
.await
.map(Some)
.map_err(error::FieldError::from)
.map_err(execute::FieldError::from)
} else {
Ok(None)
}


@ -1,22 +1,21 @@
use super::fetch_explain_from_data_connector;
use super::types;
use super::HttpContext;
use crate::error;
use crate::explain::fetch_explain_from_data_connector;
use crate::plan;
use crate::plan::field::{UnresolvedField, UnresolvedNestedField};
use crate::plan::query::UnresolvedQueryNode;
use async_recursion::async_recursion;
use execute::plan;
use execute::plan::field::{UnresolvedField, UnresolvedNestedField};
use execute::plan::UnresolvedQueryNode;
use execute::HttpContext;
use indexmap::IndexMap;
use ir::NdcFieldAlias;
use std::collections::BTreeMap;
#[async_recursion]
pub(crate) async fn explain_query_predicate_node<'s>(
expose_internal_errors: &crate::ExposeInternalErrors,
expose_internal_errors: &execute::ExposeInternalErrors,
http_context: &HttpContext,
node: &UnresolvedQueryNode<'s>,
steps: &mut Vec<types::Step>,
) -> Result<(), error::RequestError> {
) -> Result<(), execute::RequestError> {
// Generate explain steps for involved remote relationships in the predicate
if let Some(filter_predicate) = &node.predicate {
explain_query_predicate(
@ -40,11 +39,11 @@ pub(crate) async fn explain_query_predicate_node<'s>(
}
async fn explain_query_predicate_fields<'s, 'a>(
expose_internal_errors: &crate::ExposeInternalErrors,
expose_internal_errors: &execute::ExposeInternalErrors,
http_context: &HttpContext,
fields: Option<&'a IndexMap<NdcFieldAlias, UnresolvedField<'s>>>,
steps: &mut Vec<types::Step>,
) -> Result<(), error::RequestError> {
) -> Result<(), execute::RequestError> {
if let Some(fields) = fields {
for field in fields.values() {
match field {
@ -74,11 +73,11 @@ async fn explain_query_predicate_fields<'s, 'a>(
#[async_recursion]
pub(crate) async fn explain_query_predicate_nested_field<'s, 'a>(
expose_internal_errors: &crate::ExposeInternalErrors,
expose_internal_errors: &execute::ExposeInternalErrors,
http_context: &HttpContext,
nested_field: Option<&'a UnresolvedNestedField<'s>>,
steps: &mut Vec<types::Step>,
) -> Result<(), error::RequestError> {
) -> Result<(), execute::RequestError> {
if let Some(nested_field) = nested_field {
match nested_field {
UnresolvedNestedField::Object(nested_object) => {
@ -106,11 +105,11 @@ pub(crate) async fn explain_query_predicate_nested_field<'s, 'a>(
#[async_recursion]
async fn explain_query_predicate<'s>(
expose_internal_errors: &crate::ExposeInternalErrors,
expose_internal_errors: &execute::ExposeInternalErrors,
http_context: &HttpContext,
predicate: &ir::Expression<'s>,
steps: &mut Vec<types::Step>,
) -> Result<(), error::RequestError> {
) -> Result<(), execute::RequestError> {
match predicate {
ir::Expression::And { expressions } => {
for expression in expressions {
@ -133,8 +132,6 @@ async fn explain_query_predicate<'s>(
| ir::Expression::RelationshipNdcPushdown { .. }
| ir::Expression::LocalNestedArray { .. } => Ok(()),
// Needs to be resolved in the Engine by making a request to the target data connector,
// for which we need to explain the corresponding NDC request.
ir::Expression::RelationshipEngineResolved {
relationship: _,
target_model_name,
@ -143,8 +140,8 @@ async fn explain_query_predicate<'s>(
predicate,
} => {
let (remote_query_node, collection_relationships) =
plan::filter::plan_remote_predicate(ndc_column_mapping, predicate)
.map_err(|e| error::RequestError::ExplainError(e.to_string()))?;
execute::plan_remote_predicate(ndc_column_mapping, predicate)
.map_err(|e| execute::RequestError::ExplainError(e.to_string()))?;
explain_query_predicate_node(
expose_internal_errors,
@ -157,9 +154,9 @@ async fn explain_query_predicate<'s>(
let resolved_query_node = remote_query_node
.resolve(http_context)
.await
.map_err(|e| error::RequestError::ExplainError(e.to_string()))?;
.map_err(|e| execute::RequestError::ExplainError(e.to_string()))?;
let query_execution_plan = plan::query::ResolvedQueryExecutionPlan {
let query_execution_plan = execute::ResolvedQueryExecutionPlan {
query_node: resolved_query_node,
collection: target_model_source.collection.clone(),
arguments: BTreeMap::new(),
@ -169,7 +166,7 @@ async fn explain_query_predicate<'s>(
};
let ndc_query_request = plan::ndc_request::make_ndc_query_request(query_execution_plan)
.map_err(|e| error::RequestError::ExplainError(e.to_string()))?;
.map_err(|e| execute::RequestError::ExplainError(e.to_string()))?;
let ndc_request = types::NDCRequest::Query(ndc_query_request);
let data_connector_explain = fetch_explain_from_data_connector(


@ -8,9 +8,8 @@ use ndc_models as ndc_models_v02;
use ndc_models_v01;
use tracing_util::Traceable;
use crate::error;
use crate::ndc;
use crate::GraphQLErrors;
use super::super::types::GraphQLErrors;
use execute::ndc;
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
@ -110,8 +109,8 @@ pub(crate) enum NDCRequest {
impl NDCExplainResponse {
pub(crate) fn error(
error: &error::FieldError,
expose_internal_errors: crate::ExposeInternalErrors,
error: &execute::FieldError,
expose_internal_errors: execute::ExposeInternalErrors,
) -> Self {
Self::Error(error.to_graphql_error(expose_internal_errors, None))
}


@ -0,0 +1,174 @@
mod explain;
mod query;
mod steps;
mod types;
pub use explain::execute_explain;
pub use explain::types::{redact_ndc_explain, ExplainResponse};
pub use query::{execute_query, execute_query_internal};
pub use steps::generate_ir;
pub use types::{GraphQLErrors, GraphQLResponse};
#[cfg(test)]
mod tests {
use goldenfile::{differs::text_diff, Mint};
use hasura_authn_core::{Identity, Role, Session, SessionVariableValue};
use lang_graphql::http::Request;
use lang_graphql::{parser::Parser, validation::normalize_request};
use open_dds::session_variables::{SessionVariable, SESSION_VARIABLE_ROLE};
use serde_json as json;
use std::{
collections::HashMap,
fs::{self, File},
io::Write,
path::PathBuf,
};
use crate::generate_ir;
use execute::analyze_query_usage;
use schema::GDS;
#[test]
fn test_generate_ir() -> Result<(), Box<dyn std::error::Error>> {
let test_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("tests");
let mut mint = Mint::new(&test_dir);
let schema = fs::read_to_string(test_dir.join("schema.json"))?;
let gds = GDS::new_with_default_flags(open_dds::Metadata::from_json_str(&schema)?)?;
let schema = GDS::build_schema(&gds)?;
for input_file in fs::read_dir(test_dir.join("generate_ir"))? {
let path = input_file?.path();
assert!(path.is_dir());
let request_headers = reqwest::header::HeaderMap::new();
let test_name = path
.file_name()
.ok_or_else(|| format!("{path:?} is not a normal file or directory"))?;
let raw_request = fs::read_to_string(path.join("request.gql"))?;
let expected_path = PathBuf::from("generate_ir")
.join(test_name)
.join("expected.json");
let session_vars_path = path.join("session_variables.json");
let session = resolve_session(session_vars_path);
let query = Parser::new(&raw_request).parse_executable_document()?;
let request = Request {
operation_name: None,
query,
variables: HashMap::new(),
};
let normalized_request = normalize_request(
&schema::GDSRoleNamespaceGetter {
scope: session.role.clone(),
},
&schema,
&request,
)?;
let ir = generate_ir(&schema, &session, &request_headers, &normalized_request)?;
let mut expected = mint.new_goldenfile_with_differ(
expected_path,
Box::new(|file1, file2| {
let json1: serde_json::Value =
serde_json::from_reader(File::open(file1).unwrap()).unwrap();
let json2: serde_json::Value =
serde_json::from_reader(File::open(file2).unwrap()).unwrap();
if json1 != json2 {
text_diff(file1, file2);
}
}),
)?;
write!(expected, "{}", serde_json::to_string_pretty(&ir)?)?;
}
Ok(())
}
#[test]
fn test_query_usage_analytics() -> Result<(), Box<dyn std::error::Error>> {
let test_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("tests")
.join("query_usage_analytics");
let mut mint = Mint::new(&test_dir);
let schema = fs::read_to_string(test_dir.join("schema.json"))?;
let gds = GDS::new_with_default_flags(open_dds::Metadata::from_json_str(&schema)?)?;
let schema = GDS::build_schema(&gds)?;
for test_dir in fs::read_dir(test_dir)? {
let path = test_dir?.path();
if !path.is_dir() {
continue;
}
let test_name = path
.file_name()
.ok_or_else(|| format!("{path:?} is not a normal file or directory"))?;
let raw_request = fs::read_to_string(path.join("request.gql"))?;
let expected_path = PathBuf::from(test_name).join("expected.json");
let session_vars_path = path.join("session_variables.json");
let session = resolve_session(session_vars_path);
let query = Parser::new(&raw_request).parse_executable_document()?;
let request = Request {
operation_name: None,
query,
variables: HashMap::new(),
};
let normalized_request = normalize_request(
&schema::GDSRoleNamespaceGetter {
scope: session.role.clone(),
},
&schema,
&request,
)?;
let query_usage = analyze_query_usage(&normalized_request);
let mut expected = mint.new_goldenfile_with_differ(
expected_path,
Box::new(|file1, file2| {
let json1: serde_json::Value =
serde_json::from_reader(File::open(file1).unwrap()).unwrap();
let json2: serde_json::Value =
serde_json::from_reader(File::open(file2).unwrap()).unwrap();
if json1 != json2 {
text_diff(file1, file2);
}
}),
)?;
write!(expected, "{}", serde_json::to_string_pretty(&query_usage)?)?;
}
Ok(())
}
// TODO: remove duplication between this function and 'add_session'
fn resolve_session(session_vars_path: PathBuf) -> Session {
let authorization = Identity::admin(Role::new("admin"));
let session_variables: HashMap<SessionVariable, SessionVariableValue> = {
if session_vars_path.exists() {
json::from_str(fs::read_to_string(session_vars_path).unwrap().as_ref()).unwrap()
} else {
HashMap::new()
}
};
let role = session_variables
.get(&SESSION_VARIABLE_ROLE)
.map(|v| Role::new(&v.0));
authorization
.get_role_authorization(role.as_ref())
.unwrap()
.build_session(session_variables)
}
}


@ -0,0 +1,165 @@
use super::steps;
use execute::plan;
use super::types::GraphQLResponse;
use execute::{HttpContext, ProjectId};
use hasura_authn_core::Session;
use lang_graphql as gql;
use lang_graphql::ast::common as ast;
use lang_graphql::{http::RawRequest, schema::Schema};
use schema::GDS;
use tracing_util::{set_attribute_on_active_span, AttributeVisibility, SpanVisibility};
pub async fn execute_query(
expose_internal_errors: execute::ExposeInternalErrors,
http_context: &HttpContext,
schema: &Schema<GDS>,
session: &Session,
request_headers: &reqwest::header::HeaderMap,
request: RawRequest,
project_id: Option<&ProjectId>,
) -> (Option<ast::OperationType>, GraphQLResponse) {
execute_query_internal(
expose_internal_errors,
http_context,
schema,
session,
request_headers,
request,
project_id,
)
.await
.map_or_else(
|e| {
(
None,
GraphQLResponse::from_error(&e, expose_internal_errors),
)
},
|(op_type, response)| (Some(op_type), response),
)
}
/// Executes a GraphQL query
pub async fn execute_query_internal(
expose_internal_errors: execute::ExposeInternalErrors,
http_context: &HttpContext,
schema: &gql::schema::Schema<GDS>,
session: &Session,
request_headers: &reqwest::header::HeaderMap,
raw_request: gql::http::RawRequest,
project_id: Option<&ProjectId>,
) -> Result<(ast::OperationType, GraphQLResponse), execute::RequestError> {
let tracer = tracing_util::global_tracer();
tracer
.in_span_async(
"execute_query",
"Execute query request",
SpanVisibility::User,
|| {
if let Some(name) = &raw_request.operation_name {
tracing_util::set_attribute_on_active_span(
AttributeVisibility::Default,
"operation_name",
name.to_string(),
);
}
tracing_util::set_attribute_on_active_span(
AttributeVisibility::Default,
"session.role",
session.role.to_string(),
);
tracing_util::set_attribute_on_active_span(
AttributeVisibility::Default,
"request.graphql_query",
raw_request.query.clone(),
);
Box::pin(async {
// parse the raw request into a GQL query
let query = steps::parse_query(&raw_request.query)?;
// normalize the parsed GQL query
let normalized_request =
steps::normalize_request(schema, session, query, raw_request)?;
// generate IR
let ir =
steps::build_ir(schema, session, request_headers, &normalized_request)?;
// construct a plan to execute the request
let request_plan = steps::build_request_plan(&ir)?;
let display_name = match normalized_request.name {
Some(ref name) => std::borrow::Cow::Owned(format!("Execute {name}")),
None => std::borrow::Cow::Borrowed("Execute request plan"),
};
// execute the query plan
let response = tracer
.in_span_async("execute", display_name, SpanVisibility::User, || {
let all_usage_counts = ir::get_all_usage_counts_in_query(&ir);
let serialized_data = serde_json::to_string(&all_usage_counts).unwrap();
set_attribute_on_active_span(
AttributeVisibility::Default,
"usage_counts",
serialized_data,
);
let execute_response = Box::pin(async {
let execute_query_result = match request_plan {
plan::RequestPlan::MutationPlan(mutation_plan) => {
plan::execute_mutation_plan(
http_context,
mutation_plan,
project_id,
)
.await
}
plan::RequestPlan::QueryPlan(query_plan) => {
plan::execute_query_plan(
http_context,
query_plan,
project_id,
)
.await
}
};
GraphQLResponse::from_result(
execute_query_result,
expose_internal_errors,
)
});
// Analyze the query usage
// It is attached to this span as an attribute
match steps::analyze_query_usage(&normalized_request) {
Err(analyze_error) => {
// Set query usage analytics error as a span attribute
set_attribute_on_active_span(
AttributeVisibility::Internal,
"query_usage_analytics_error",
analyze_error.to_string(),
);
}
Ok(query_usage_analytics) => {
// Set query usage analytics as a span attribute
set_attribute_on_active_span(
AttributeVisibility::Internal,
"query_usage_analytics",
query_usage_analytics,
);
}
}
execute_response
})
.await;
Ok((normalized_request.ty, response))
})
},
)
.await
}


@ -0,0 +1,149 @@
use super::types::{GraphQlParseError, GraphQlValidationError};
use gql::normalized_ast::Operation;
use hasura_authn_core::Session;
use lang_graphql as gql;
use lang_graphql::ast::common as ast;
use schema::{GDSRoleNamespaceGetter, GDS};
use tracing_util::{set_attribute_on_active_span, AttributeVisibility, SpanVisibility};
/// Parses a raw GraphQL request into a GQL query AST
pub(crate) fn parse_query(
query: &str,
) -> Result<
gql::ast::executable::ExecutableDocument,
gql::ast::spanning::Positioned<gql::parser::Error>,
> {
let tracer = tracing_util::global_tracer();
let query = tracer
.in_span(
"parse",
"Parse the raw request into a GraphQL query",
SpanVisibility::Internal,
|| {
gql::parser::Parser::new(query)
.parse_executable_document()
.map_err(GraphQlParseError)
},
)
.map_err(|e| e.0)?;
Ok(query)
}
/// Normalize the parsed GQL query
pub(crate) fn normalize_request<'s>(
schema: &'s gql::schema::Schema<GDS>,
session: &Session,
query: gql::ast::executable::ExecutableDocument,
raw_request: gql::http::RawRequest,
) -> Result<Operation<'s, GDS>, gql::validation::Error> {
let tracer = tracing_util::global_tracer();
let normalized_request = tracer
.in_span(
"validate",
"Normalize the parsed GraphQL query",
SpanVisibility::Internal,
|| {
// add the operation name even if validation fails
if let Some(name) = &raw_request.operation_name {
set_attribute_on_active_span(
AttributeVisibility::Default,
"operation_name",
name.to_string(),
);
}
let request = gql::http::Request {
operation_name: raw_request.operation_name,
query,
variables: raw_request.variables.unwrap_or_default(),
};
gql::validation::normalize_request(
&GDSRoleNamespaceGetter {
scope: session.role.clone(),
},
schema,
&request,
)
.map_err(GraphQlValidationError)
},
)
.map_err(|e| e.0)?;
Ok(normalized_request)
}
/// Generate IR for the request
pub(crate) fn build_ir<'n, 's>(
schema: &'s gql::schema::Schema<GDS>,
session: &Session,
request_headers: &reqwest::header::HeaderMap,
normalized_request: &'s Operation<'s, GDS>,
) -> Result<ir::IR<'n, 's>, ir::Error> {
let tracer = tracing_util::global_tracer();
let ir = tracer.in_span(
"generate_ir",
"Generate IR for the request",
SpanVisibility::Internal,
|| generate_ir(schema, session, request_headers, normalized_request),
)?;
Ok(ir)
}
/// Build a plan to execute the request
pub(crate) fn build_request_plan<'n, 's, 'ir>(
ir: &'ir ir::IR<'n, 's>,
) -> Result<execute::RequestPlan<'n, 's, 'ir>, execute::PlanError> {
let tracer = tracing_util::global_tracer();
let plan = tracer.in_span(
"plan",
"Construct a plan to execute the request",
SpanVisibility::Internal,
|| execute::generate_request_plan(ir),
)?;
Ok(plan)
}
pub fn generate_ir<'n, 's>(
schema: &'s gql::schema::Schema<GDS>,
session: &Session,
request_headers: &reqwest::header::HeaderMap,
normalized_request: &'s Operation<'s, GDS>,
) -> Result<ir::IR<'n, 's>, ir::Error> {
match &normalized_request.ty {
ast::OperationType::Query => {
let query_ir = ir::generate_query_ir(
schema,
session,
request_headers,
&normalized_request.selection_set,
)?;
Ok(ir::IR::Query(query_ir))
}
ast::OperationType::Mutation => {
let mutation_ir = ir::generate_mutation_ir(
&normalized_request.selection_set,
&session.variables,
request_headers,
)?;
Ok(ir::IR::Mutation(mutation_ir))
}
ast::OperationType::Subscription => {
Err(ir::InternalDeveloperError::SubscriptionsNotSupported)?
}
}
}
pub fn analyze_query_usage<'s>(
normalized_request: &'s Operation<'s, GDS>,
) -> Result<String, execute::QueryUsageAnalyzeError> {
let tracer = tracing_util::global_tracer();
tracer.in_span(
"analyze_query_usage",
"Analyze query usage",
SpanVisibility::Internal,
|| {
let query_usage_analytics = execute::analyze_query_usage(normalized_request);
Ok(serde_json::to_string(&query_usage_analytics)?)
},
)
}


@ -0,0 +1,88 @@
use lang_graphql as gql;
use lang_graphql::http::Response;
use tracing_util::{ErrorVisibility, Traceable, TraceableError};
#[derive(Debug)]
/// A simple wrapper around a reference of GraphQL errors
pub struct GraphQLErrors<'a>(pub &'a nonempty::NonEmpty<gql::http::GraphQLError>);
impl<'a> std::fmt::Display for GraphQLErrors<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let messages = self
.0
.iter()
.map(|e| e.message.as_str())
.collect::<Vec<_>>();
write!(f, "{}", messages.join(", "))
}
}
/// Implement traceable error for GraphQL Errors
impl<'a> TraceableError for GraphQLErrors<'a> {
fn visibility(&self) -> ErrorVisibility {
// Traces related to GraphQL errors are always visible to the user
ErrorVisibility::User
}
}
/// A simple wrapper around a GraphQL HTTP response
pub struct GraphQLResponse(gql::http::Response);
impl GraphQLResponse {
pub fn from_result(
result: execute::ExecuteQueryResult,
expose_internal_errors: execute::ExposeInternalErrors,
) -> Self {
Self(result.to_graphql_response(expose_internal_errors))
}
pub fn from_error(
err: &execute::RequestError,
expose_internal_errors: execute::ExposeInternalErrors,
) -> Self {
Self(Response::error(
err.to_graphql_error(expose_internal_errors),
axum::http::HeaderMap::default(),
))
}
pub fn from_response(response: gql::http::Response) -> Self {
Self(response)
}
pub fn does_contain_error(&self) -> bool {
self.0.does_contains_error()
}
pub fn inner(self) -> gql::http::Response {
self.0
}
}
/// Implement traceable for GraphQL Response
impl Traceable for GraphQLResponse {
type ErrorType<'a> = GraphQLErrors<'a>;
fn get_error(&self) -> Option<GraphQLErrors<'_>> {
self.0.errors.as_ref().map(GraphQLErrors)
}
}
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct GraphQlParseError(#[from] pub gql::ast::spanning::Positioned<gql::parser::Error>);
impl TraceableError for GraphQlParseError {
fn visibility(&self) -> ErrorVisibility {
ErrorVisibility::User
}
}
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct GraphQlValidationError(#[from] pub gql::validation::Error);
impl TraceableError for GraphQlValidationError {
fn visibility(&self) -> ErrorVisibility {
ErrorVisibility::User
}
}


@ -23,6 +23,8 @@ crates
│   ├── types
├── ir
├── execute
├── frontends
│   ├── graphql
├── engine
│   ├── bin
│   │   ├── engine
@ -126,8 +128,12 @@ intermediate representation ready to plan a request.
### `execute`
Responsible for the core operation of the engine in the context of a user
provided metadata, including the web server, requests processing, executing
requests, etc.
provided metadata, including requests processing, executing requests, etc.
### `frontends/graphql`
Entrypoints for GraphQL requests. Orchestrates parsing, validating, and
planning requests.
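
For completeness, the explain path moves in the same way: the engine and its tests now call `graphql_frontend::execute_explain` instead of `execute::execute_explain`. A minimal sketch of a caller, with the signature as it appears in the moved `explain.rs` above and an illustrative wrapper function:

```rust
use execute::{ExposeInternalErrors, HttpContext};
use hasura_authn_core::Session;
use lang_graphql::http::RawRequest;
use schema::GDS;

// Illustrative only: mirrors how the engine tests drive the explain endpoint.
async fn explain_request(
    http_context: &HttpContext,
    schema: &lang_graphql::schema::Schema<GDS>,
    session: &Session,
    request_headers: &reqwest::header::HeaderMap,
    raw_request: RawRequest,
) -> graphql_frontend::ExplainResponse {
    let (_operation_type, explain_response) = graphql_frontend::execute_explain(
        ExposeInternalErrors::Expose,
        http_context,
        schema,
        session,
        request_headers,
        raw_request,
    )
    .await;
    // The engine tests additionally pass this response through
    // `graphql_frontend::redact_ndc_explain` before golden-file comparison.
    explain_response
}
```
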
#### `engine/bin`