Separate the query request tree and mutation request tree (#349)

## Description

Following on from #347, this PR separates "query requests" and "mutation
requests" at the level of the query plan, rather than the individual
fields. The result of this is that we can separate the functions to plan
and execute the two things entirely, which will make it easier in the
forthcoming PR to add support for transactions.

There's almost certainly some further refactoring that could be done
here, but I'm choosing to wait until transactions are in place before
getting too excited.

V3_GIT_ORIGIN_REV_ID: 849f4682cc01d3b9e44d2748e4cc34382c8547a7
This commit is contained in:
Tom Harding 2024-03-19 12:51:58 +01:00 committed by hasura-bot
parent 7c7e50505f
commit 0123fa339a
12 changed files with 251 additions and 105 deletions

View File

@ -1,5 +1,5 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use engine::execute::query_plan::{execute_query_plan, generate_query_plan};
use engine::execute::plan::{execute_mutation_plan, execute_query_plan, generate_request_plan};
use engine::execute::{execute_query_internal, generate_ir};
use engine::schema::GDS;
use hasura_authn_core::Identity;
@ -120,7 +120,7 @@ pub fn bench_execute(
&(&runtime),
|b, runtime| {
b.to_async(*runtime)
.iter(|| async { generate_query_plan(&ir).unwrap() })
.iter(|| async { generate_request_plan(&ir).unwrap() })
},
);
@ -130,7 +130,14 @@ pub fn bench_execute(
&(&runtime),
|b, runtime| {
b.to_async(*runtime).iter(|| async {
execute_query_plan(&http_client, generate_query_plan(&ir).unwrap(), None).await
match generate_request_plan(&ir).unwrap() {
engine::execute::plan::RequestPlan::QueryPlan(query_plan) => {
execute_query_plan(&http_client, query_plan, None).await
}
engine::execute::plan::RequestPlan::MutationPlan(mutation_plan) => {
execute_mutation_plan(&http_client, mutation_plan, None).await
}
}
})
},
);

View File

@ -24,8 +24,8 @@ pub mod global_id;
pub mod ir;
pub mod model_tracking;
pub mod ndc;
pub mod plan;
pub mod process_response;
pub mod query_plan;
pub mod remote_joins;
#[derive(Debug)]
@ -203,9 +203,9 @@ pub async fn execute_request_internal(
generate_ir(schema, session, &normalized_request)
})?;
// construct a query plan
let query_plan = tracer.in_span("plan", SpanVisibility::Internal, || {
query_plan::generate_query_plan(&ir)
// construct a plan to execute the request
let request_plan = tracer.in_span("plan", SpanVisibility::Internal, || {
plan::generate_request_plan(&ir)
})?;
// execute/explain the query plan
@ -224,12 +224,25 @@ pub async fn execute_request_internal(
);
Box::pin(async {
let execute_query_result = query_plan::execute_query_plan(
http_client,
query_plan,
project_id,
)
.await;
let execute_query_result = match request_plan {
plan::RequestPlan::MutationPlan(mutation_plan) => {
plan::execute_mutation_plan(
http_client,
mutation_plan,
project_id,
)
.await
}
plan::RequestPlan::QueryPlan(query_plan) => {
plan::execute_query_plan(
http_client,
query_plan,
project_id,
)
.await
}
};
ExecuteOrExplainResponse::Execute(GraphQLResponse(
execute_query_result.to_graphql_response(),
))
@ -241,19 +254,30 @@ pub async fn execute_request_internal(
tracer
.in_span_async("explain", SpanVisibility::Internal, || {
Box::pin(async {
let request_result = match request_plan {
plan::RequestPlan::MutationPlan(mutation_plan) => {
crate::execute::explain::explain_mutation_plan(
http_client,
mutation_plan,
)
.await
}
plan::RequestPlan::QueryPlan(query_plan) => {
crate::execute::explain::explain_query_plan(
http_client,
query_plan,
)
.await
}
};
// convert the query plan to explain step
let explain_response =
match crate::execute::explain::explain_query_plan(
http_client,
query_plan,
)
.await
{
Ok(step) => step.make_explain_response(),
Err(e) => explain::types::ExplainResponse::error(
e.to_graphql_error(None),
),
};
let explain_response = match request_result {
Ok(step) => step.make_explain_response(),
Err(e) => explain::types::ExplainResponse::error(
e.to_graphql_error(None),
),
};
ExecuteOrExplainResponse::Explain(explain_response)
})

View File

@ -1,8 +1,8 @@
use super::remote_joins::types::{JoinNode, RemoteJoinType};
use super::ExecuteOrExplainResponse;
use crate::execute::query_plan::{NodeQueryPlan, ProcessResponseAs};
use crate::execute::plan::{NodeMutationPlan, NodeQueryPlan, ProcessResponseAs};
use crate::execute::remote_joins::types::{JoinId, JoinLocations, RemoteJoin};
use crate::execute::{error, query_plan};
use crate::execute::{error, plan};
use crate::metadata::resolved;
use crate::schema::GDS;
use crate::utils::async_map::AsyncMap;
@ -53,9 +53,10 @@ pub async fn execute_explain_internal(
}
}
/// Produce an /explain plan for a given GraphQL query.
pub(crate) async fn explain_query_plan(
http_client: &reqwest::Client,
query_plan: query_plan::QueryPlan<'_, '_, '_>,
query_plan: plan::QueryPlan<'_, '_, '_>,
) -> Result<types::Step, error::Error> {
let mut parallel_root_steps = vec![];
// Here, we are assuming that all root fields are executed in parallel.
@ -105,18 +106,6 @@ pub(crate) async fn explain_query_plan(
"cannot explain relay queries with no execution plan".to_string(),
));
}
NodeQueryPlan::NDCMutationExecution(ndc_mutation_execution) => {
let sequence_steps = get_execution_steps(
http_client,
alias,
&ndc_mutation_execution.process_response_as,
ndc_mutation_execution.join_locations,
types::NDCRequest::Mutation(ndc_mutation_execution.query),
ndc_mutation_execution.data_connector,
)
.await;
parallel_root_steps.push(Box::new(types::Step::Sequence(sequence_steps)));
}
}
}
// simplify the steps
@ -132,6 +121,47 @@ pub(crate) async fn explain_query_plan(
}
}
/// Produce an /explain plan for a given GraphQL mutation.
pub(crate) async fn explain_mutation_plan(
http_client: &reqwest::Client,
mutation_plan: plan::MutationPlan<'_, '_, '_>,
) -> Result<types::Step, error::Error> {
let mut root_steps = vec![];
for (alias, node) in mutation_plan {
match node {
NodeMutationPlan::TypeName { .. } => {
return Err(error::Error::ExplainError(
"cannot explain introspection queries".to_string(),
));
}
NodeMutationPlan::NDCMutationExecution(ndc_mutation_execution) => {
let sequence_steps = get_execution_steps(
http_client,
alias,
&ndc_mutation_execution.process_response_as,
ndc_mutation_execution.join_locations,
types::NDCRequest::Mutation(ndc_mutation_execution.query),
ndc_mutation_execution.data_connector,
)
.await;
root_steps.push(Box::new(types::Step::Sequence(sequence_steps)));
}
}
}
// simplify the steps
match NonEmpty::from_vec(root_steps) {
Some(root_steps) => {
let simplified_step = simplify_step(Box::new(types::Step::Sequence(root_steps)));
Ok(*simplified_step)
}
None => Err(error::Error::ExplainError(
"cannot explain mutation as there are no explainable root fields".to_string(),
)),
}
}
async fn get_execution_steps<'s>(
http_client: &reqwest::Client,
alias: gql::ast::common::Alias,

View File

@ -7,8 +7,8 @@ use lang_graphql::ast::common as ast;
use ndc_client as ndc;
use tracing_util::{set_attribute_on_active_span, AttributeVisibility, SpanVisibility};
use super::plan::ProcessResponseAs;
use super::process_response::process_command_mutation_response;
use super::query_plan::ProcessResponseAs;
use super::{error, ProjectId};
use crate::metadata::resolved;
use crate::schema::GDS;

View File

@ -26,7 +26,14 @@ use crate::metadata::resolved::{self, subgraph};
use crate::schema::GDS;
pub type QueryPlan<'n, 's, 'ir> = IndexMap<ast::Alias, NodeQueryPlan<'n, 's, 'ir>>;
pub type MutationPlan<'n, 's, 'ir> = IndexMap<ast::Alias, NDCMutationExecution<'n, 's, 'ir>>;
pub type MutationPlan<'n, 's, 'ir> = IndexMap<ast::Alias, NodeMutationPlan<'n, 's, 'ir>>;
// At least for now, requests are _either_ queries or mutations, and a mix of the two can be
// treated as an invalid request. We may want to change this in the future.
pub enum RequestPlan<'n, 's, 'ir> {
QueryPlan(QueryPlan<'n, 's, 'ir>),
MutationPlan(MutationPlan<'n, 's, 'ir>),
}
/// Query plan of individual root field or node
#[derive(Debug)]
@ -50,6 +57,13 @@ pub enum NodeQueryPlan<'n, 's, 'ir> {
NDCQueryExecution(NDCQueryExecution<'s, 'ir>),
/// NDC query for Relay 'node' to be executed
RelayNodeSelect(Option<NDCQueryExecution<'s, 'ir>>),
}
/// Mutation plan of individual root field or node.
/// Note that __typename exists for mutations, but __schema and __type do not.
pub enum NodeMutationPlan<'n, 's, 'ir> {
/// __typename field on mutation root
TypeName { type_name: ast::TypeName },
/// NDC mutation to be executed
NDCMutationExecution(NDCMutationExecution<'n, 's, 'ir>),
}
@ -113,25 +127,65 @@ impl<'ir> ProcessResponseAs<'ir> {
}
}
pub fn generate_query_plan<'n, 's, 'ir>(
/// Build a plan to handle a given request. This plan will either be a mutation plan or a query
/// plan, but currently can't be both. This may change when we support protocols other than
/// GraphQL.
pub fn generate_request_plan<'n, 's, 'ir>(
ir: &'ir IndexMap<ast::Alias, root_field::RootField<'n, 's>>,
) -> Result<QueryPlan<'n, 's, 'ir>, error::Error> {
let mut query_plan = IndexMap::new();
) -> Result<RequestPlan<'n, 's, 'ir>, error::Error> {
let mut request_plan = None;
for (alias, field) in ir.into_iter() {
let field_plan = match field {
root_field::RootField::QueryRootField(field_ir) => plan_query(field_ir),
root_field::RootField::MutationRootField(field_ir) => plan_mutation(field_ir),
}?;
query_plan.insert(alias.clone(), field_plan);
match field {
root_field::RootField::QueryRootField(field_ir) => {
let mut query_plan = match request_plan {
Some(RequestPlan::MutationPlan(_)) => Err(error::Error::InternalError(
error::InternalError::Engine(error::InternalEngineError::InternalGeneric {
description:
"Parsed engine request contains mixed mutation/query operations"
.to_string(),
}),
))?,
Some(RequestPlan::QueryPlan(query_plan)) => query_plan,
None => IndexMap::new(),
};
query_plan.insert(alias.clone(), plan_query(field_ir)?);
request_plan = Some(RequestPlan::QueryPlan(query_plan));
}
root_field::RootField::MutationRootField(field_ir) => {
let mut mutation_plan = match request_plan {
Some(RequestPlan::QueryPlan(_)) => Err(error::Error::InternalError(
error::InternalError::Engine(error::InternalEngineError::InternalGeneric {
description:
"Parsed engine request contains mixed mutation/query operations"
.to_string(),
}),
))?,
Some(RequestPlan::MutationPlan(mutation_plan)) => mutation_plan,
None => IndexMap::new(),
};
mutation_plan.insert(alias.clone(), plan_mutation(field_ir)?);
request_plan = Some(RequestPlan::MutationPlan(mutation_plan));
}
}
}
Ok(query_plan)
request_plan.ok_or(error::Error::InternalError(error::InternalError::Engine(
error::InternalEngineError::InternalGeneric {
description: "Parsed an empty request".to_string(),
},
)))
}
// Given a singular root field of a mutation, plan the execution of that root field.
fn plan_mutation<'n, 's, 'ir>(
ir: &'ir root_field::MutationRootField<'n, 's>,
) -> Result<NodeQueryPlan<'n, 's, 'ir>, error::Error> {
) -> Result<NodeMutationPlan<'n, 's, 'ir>, error::Error> {
let plan = match ir {
root_field::MutationRootField::TypeName { type_name } => NodeQueryPlan::TypeName {
root_field::MutationRootField::TypeName { type_name } => NodeMutationPlan::TypeName {
type_name: type_name.clone(),
},
root_field::MutationRootField::ProcedureBasedCommand { ir, selection_set } => {
@ -139,7 +193,7 @@ fn plan_mutation<'n, 's, 'ir>(
let (ndc_ir, join_locations) =
commands::ndc_mutation_ir(ir.procedure_name, ir, &mut join_id_counter)?;
let join_locations_ids = assign_with_join_ids(join_locations)?;
NodeQueryPlan::NDCMutationExecution(NDCMutationExecution {
NodeMutationPlan::NDCMutationExecution(NDCMutationExecution {
query: ndc_ir,
join_locations: join_locations_ids,
data_connector: ir.command_info.data_connector,
@ -156,6 +210,7 @@ fn plan_mutation<'n, 's, 'ir>(
Ok(plan)
}
// Given a singular root field of a query, plan the execution of that root field.
fn plan_query<'n, 's, 'ir>(
ir: &'ir root_field::QueryRootField<'n, 's>,
) -> Result<NodeQueryPlan<'n, 's, 'ir>, error::Error> {
@ -433,19 +488,20 @@ impl ExecuteQueryResult {
}
}
async fn execute_field_plan<'n, 's, 'ir>(
/// Execute a single root field's query plan to produce a result.
async fn execute_query_field_plan<'n, 's, 'ir>(
http_client: &reqwest::Client,
field_plan: NodeQueryPlan<'n, 's, 'ir>,
query_plan: NodeQueryPlan<'n, 's, 'ir>,
project_id: Option<ProjectId>,
) -> RootFieldResult {
let tracer = tracing_util::global_tracer();
tracer
.in_span_async(
"execute_field_plan",
"execute_query_field_plan",
tracing_util::SpanVisibility::User,
|| {
Box::pin(async {
match field_plan {
match query_plan {
NodeQueryPlan::TypeName { type_name } => {
set_attribute_on_active_span(
AttributeVisibility::Default,
@ -492,11 +548,6 @@ async fn execute_field_plan<'n, 's, 'ir>(
&ndc_query.process_response_as.is_nullable(),
resolve_ndc_query_execution(http_client, ndc_query, project_id).await,
),
NodeQueryPlan::NDCMutationExecution(ndc_query) => RootFieldResult::new(
&ndc_query.process_response_as.is_nullable(),
resolve_ndc_mutation_execution(http_client, ndc_query, project_id)
.await,
),
NodeQueryPlan::RelayNodeSelect(optional_query) => RootFieldResult::new(
&optional_query.as_ref().map_or(true, |ndc_query| {
ndc_query.process_response_as.is_nullable()
@ -511,27 +562,77 @@ async fn execute_field_plan<'n, 's, 'ir>(
.await
}
/// Extract the mutations from a query plan, so that we can run them sequentially.
pub fn separate_mutations<'n, 's, 'ir>(
query_plan: QueryPlan<'n, 's, 'ir>,
) -> (QueryPlan<'n, 's, 'ir>, MutationPlan<'n, 's, 'ir>) {
let mut mutations = IndexMap::new();
let mut queries = IndexMap::new();
for (alias, field_plan) in query_plan.into_iter() {
match field_plan {
NodeQueryPlan::NDCMutationExecution(ndc_mutation_execution) => {
mutations.insert(alias, ndc_mutation_execution);
}
otherwise => {
queries.insert(alias, otherwise);
}
};
}
(queries, mutations)
/// Execute a single root field's mutation plan to produce a result.
async fn execute_mutation_field_plan<'n, 's, 'ir>(
http_client: &reqwest::Client,
mutation_plan: NodeMutationPlan<'n, 's, 'ir>,
project_id: Option<ProjectId>,
) -> RootFieldResult {
let tracer = tracing_util::global_tracer();
tracer
.in_span_async(
"execute_mutation_field_plan",
tracing_util::SpanVisibility::User,
|| {
Box::pin(async {
match mutation_plan {
NodeMutationPlan::TypeName { type_name } => {
set_attribute_on_active_span(
AttributeVisibility::Default,
"field",
"__typename",
);
RootFieldResult::new(
&false, // __typename: String! ; the __typename field is not nullable
resolve_type_name(type_name),
)
}
NodeMutationPlan::NDCMutationExecution(ndc_mutation) => {
RootFieldResult::new(
&ndc_mutation.process_response_as.is_nullable(),
resolve_ndc_mutation_execution(
http_client,
ndc_mutation,
project_id,
)
.await,
)
}
}
})
},
)
.await
}
/// Given an entire plan for a mutation, produce a result. We do this by executing the singular
/// root fields of the mutation sequentially rather than concurrently, in the order defined by the
/// `IndexMap`'s keys.
pub async fn execute_mutation_plan<'n, 's, 'ir>(
http_client: &reqwest::Client,
mutation_plan: MutationPlan<'n, 's, 'ir>,
project_id: Option<ProjectId>,
) -> ExecuteQueryResult {
let mut root_fields = IndexMap::new();
let mut executed_root_fields = Vec::new();
for (alias, field_plan) in mutation_plan.into_iter() {
executed_root_fields.push((
alias,
execute_mutation_field_plan(http_client, field_plan, project_id.clone()).await,
));
}
for executed_root_field in executed_root_fields.into_iter() {
let (alias, root_field) = executed_root_field;
root_fields.insert(alias, root_field);
}
ExecuteQueryResult { root_fields }
}
/// Given an entire plan for a query, produce a result. We do this by executing all the singular
/// root fields of the query in parallel, and joining the results back together.
pub async fn execute_query_plan<'n, 's, 'ir>(
http_client: &reqwest::Client,
query_plan: QueryPlan<'n, 's, 'ir>,
@ -541,36 +642,20 @@ pub async fn execute_query_plan<'n, 's, 'ir>(
let mut tasks: Vec<_> = Vec::with_capacity(query_plan.capacity());
let (queries, mutations) = separate_mutations(query_plan);
let mut executed_root_fields = Vec::new();
for (alias, field_plan) in mutations.into_iter() {
executed_root_fields.push((
alias,
execute_field_plan(
http_client,
NodeQueryPlan::NDCMutationExecution(field_plan),
project_id.clone(),
)
.await,
));
}
for (alias, field_plan) in queries.into_iter() {
for (alias, field_plan) in query_plan.into_iter() {
// We are not running the field plans parallely here, we are just running them concurrently on a single thread.
// To run the field plans parallely, we will need to use tokio::spawn for each field plan.
let task = async {
(
alias,
execute_field_plan(http_client, field_plan, project_id.clone()).await,
execute_query_field_plan(http_client, field_plan, project_id.clone()).await,
)
};
tasks.push(task);
}
executed_root_fields.extend(futures::future::join_all(tasks).await);
let executed_root_fields = futures::future::join_all(tasks).await;
for executed_root_field in executed_root_fields.into_iter() {
let (alias, root_field) = executed_root_field;

View File

@ -16,7 +16,7 @@ use open_dds::types::FieldName;
use super::error;
use super::global_id::{global_id_col_format, GLOBAL_ID_VERSION};
use super::ndc::FUNCTION_IR_VALUE_COLUMN_NAME;
use super::query_plan::ProcessResponseAs;
use super::plan::ProcessResponseAs;
use crate::metadata::resolved::subgraph::Qualified;
use crate::schema::{
types::{Annotation, GlobalID, OutputAnnotation},

View File

@ -71,7 +71,7 @@
//!
//! 5. Perform join on LHS response and RHS response
//!
//! [process_selection_set_ir]: crate::execute::query_plan::selection_set::process_selection_set_ir
//! [process_selection_set_ir]: crate::execute::plan::selection_set::process_selection_set_ir
//! [generate_selection_set_ir]: crate::execute::ir::selection_set::generate_selection_set_ir
//! [build_remote_relationship]: crate::execute::ir::relationship::build_remote_relationship
//! [build_remote_command_relationship]: crate::execute::ir::relationship::build_remote_command_relationship
@ -89,7 +89,7 @@ use ndc_client as ndc;
use self::types::{JoinNode, LocationKind, TargetField};
use super::ndc::{execute_ndc_query, FUNCTION_IR_VALUE_COLUMN_NAME};
use super::query_plan::ProcessResponseAs;
use super::plan::ProcessResponseAs;
use super::{error, ProjectId};
use types::{

View File

@ -7,7 +7,7 @@ use open_dds::arguments::ArgumentName;
use open_dds::types::FieldName;
use std::collections::{BTreeMap, HashMap};
use crate::execute::query_plan::ProcessResponseAs;
use crate::execute::plan::ProcessResponseAs;
use crate::metadata::resolved;
use crate::metadata::resolved::types::NdcColumnForComparison;
use crate::utils::json_ext::ValueExt;