From 0123fa339aae2f22dc62b76d72c0e08420a68e9a Mon Sep 17 00:00:00 2001 From: Tom Harding Date: Tue, 19 Mar 2024 12:51:58 +0100 Subject: [PATCH] Separate the query request tree and mutation request tree (#349) ## Description Following on from #347, this PR separates "query requests" and "mutation requests" at the level of the query plan, rather than the individual fields. The result of this is that we can separate the functions to plan and execute the two things entirely, which will make it easier in the forthcoming PR to add support for transactions. There's almost certainly some further refactoring that could be done here, but I'm choosing to wait until transactions are in place before getting too excited. V3_GIT_ORIGIN_REV_ID: 849f4682cc01d3b9e44d2748e4cc34382c8547a7 --- v3/engine/benches/execute.rs | 13 +- v3/engine/src/execute.rs | 68 ++++-- v3/engine/src/execute/explain.rs | 60 +++-- v3/engine/src/execute/ndc.rs | 2 +- .../src/execute/{query_plan.rs => plan.rs} | 205 +++++++++++++----- .../execute/{query_plan => plan}/commands.rs | 0 .../{query_plan => plan}/model_selection.rs | 0 .../{query_plan => plan}/relationships.rs | 0 .../{query_plan => plan}/selection_set.rs | 0 v3/engine/src/execute/process_response.rs | 2 +- v3/engine/src/execute/remote_joins.rs | 4 +- v3/engine/src/execute/remote_joins/types.rs | 2 +- 12 files changed, 251 insertions(+), 105 deletions(-) rename v3/engine/src/execute/{query_plan.rs => plan.rs} (78%) rename v3/engine/src/execute/{query_plan => plan}/commands.rs (100%) rename v3/engine/src/execute/{query_plan => plan}/model_selection.rs (100%) rename v3/engine/src/execute/{query_plan => plan}/relationships.rs (100%) rename v3/engine/src/execute/{query_plan => plan}/selection_set.rs (100%) diff --git a/v3/engine/benches/execute.rs b/v3/engine/benches/execute.rs index 3bc0f6c156f..dcc8214c684 100644 --- a/v3/engine/benches/execute.rs +++ b/v3/engine/benches/execute.rs @@ -1,5 +1,5 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; -use engine::execute::query_plan::{execute_query_plan, generate_query_plan}; +use engine::execute::plan::{execute_mutation_plan, execute_query_plan, generate_request_plan}; use engine::execute::{execute_query_internal, generate_ir}; use engine::schema::GDS; use hasura_authn_core::Identity; @@ -120,7 +120,7 @@ pub fn bench_execute( &(&runtime), |b, runtime| { b.to_async(*runtime) - .iter(|| async { generate_query_plan(&ir).unwrap() }) + .iter(|| async { generate_request_plan(&ir).unwrap() }) }, ); @@ -130,7 +130,14 @@ pub fn bench_execute( &(&runtime), |b, runtime| { b.to_async(*runtime).iter(|| async { - execute_query_plan(&http_client, generate_query_plan(&ir).unwrap(), None).await + match generate_request_plan(&ir).unwrap() { + engine::execute::plan::RequestPlan::QueryPlan(query_plan) => { + execute_query_plan(&http_client, query_plan, None).await + } + engine::execute::plan::RequestPlan::MutationPlan(mutation_plan) => { + execute_mutation_plan(&http_client, mutation_plan, None).await + } + } }) }, ); diff --git a/v3/engine/src/execute.rs b/v3/engine/src/execute.rs index 1a782172096..7fb02ce3127 100644 --- a/v3/engine/src/execute.rs +++ b/v3/engine/src/execute.rs @@ -24,8 +24,8 @@ pub mod global_id; pub mod ir; pub mod model_tracking; pub mod ndc; +pub mod plan; pub mod process_response; -pub mod query_plan; pub mod remote_joins; #[derive(Debug)] @@ -203,9 +203,9 @@ pub async fn execute_request_internal( generate_ir(schema, session, &normalized_request) })?; - // construct a query plan - let query_plan = tracer.in_span("plan", SpanVisibility::Internal, || { - query_plan::generate_query_plan(&ir) + // construct a plan to execute the request + let request_plan = tracer.in_span("plan", SpanVisibility::Internal, || { + plan::generate_request_plan(&ir) })?; // execute/explain the query plan @@ -224,12 +224,25 @@ pub async fn execute_request_internal( ); Box::pin(async { - let execute_query_result = query_plan::execute_query_plan( - http_client, - query_plan, - project_id, - ) - .await; + let execute_query_result = match request_plan { + plan::RequestPlan::MutationPlan(mutation_plan) => { + plan::execute_mutation_plan( + http_client, + mutation_plan, + project_id, + ) + .await + } + plan::RequestPlan::QueryPlan(query_plan) => { + plan::execute_query_plan( + http_client, + query_plan, + project_id, + ) + .await + } + }; + ExecuteOrExplainResponse::Execute(GraphQLResponse( execute_query_result.to_graphql_response(), )) @@ -241,19 +254,30 @@ pub async fn execute_request_internal( tracer .in_span_async("explain", SpanVisibility::Internal, || { Box::pin(async { + let request_result = match request_plan { + plan::RequestPlan::MutationPlan(mutation_plan) => { + crate::execute::explain::explain_mutation_plan( + http_client, + mutation_plan, + ) + .await + } + plan::RequestPlan::QueryPlan(query_plan) => { + crate::execute::explain::explain_query_plan( + http_client, + query_plan, + ) + .await + } + }; + // convert the query plan to explain step - let explain_response = - match crate::execute::explain::explain_query_plan( - http_client, - query_plan, - ) - .await - { - Ok(step) => step.make_explain_response(), - Err(e) => explain::types::ExplainResponse::error( - e.to_graphql_error(None), - ), - }; + let explain_response = match request_result { + Ok(step) => step.make_explain_response(), + Err(e) => explain::types::ExplainResponse::error( + e.to_graphql_error(None), + ), + }; ExecuteOrExplainResponse::Explain(explain_response) }) diff --git a/v3/engine/src/execute/explain.rs b/v3/engine/src/execute/explain.rs index 812f7363753..282d13e6491 100644 --- a/v3/engine/src/execute/explain.rs +++ b/v3/engine/src/execute/explain.rs @@ -1,8 +1,8 @@ use super::remote_joins::types::{JoinNode, RemoteJoinType}; use super::ExecuteOrExplainResponse; -use crate::execute::query_plan::{NodeQueryPlan, ProcessResponseAs}; +use crate::execute::plan::{NodeMutationPlan, NodeQueryPlan, ProcessResponseAs}; use crate::execute::remote_joins::types::{JoinId, JoinLocations, RemoteJoin}; -use crate::execute::{error, query_plan}; +use crate::execute::{error, plan}; use crate::metadata::resolved; use crate::schema::GDS; use crate::utils::async_map::AsyncMap; @@ -53,9 +53,10 @@ pub async fn execute_explain_internal( } } +/// Produce an /explain plan for a given GraphQL query. pub(crate) async fn explain_query_plan( http_client: &reqwest::Client, - query_plan: query_plan::QueryPlan<'_, '_, '_>, + query_plan: plan::QueryPlan<'_, '_, '_>, ) -> Result { let mut parallel_root_steps = vec![]; // Here, we are assuming that all root fields are executed in parallel. @@ -105,18 +106,6 @@ pub(crate) async fn explain_query_plan( "cannot explain relay queries with no execution plan".to_string(), )); } - NodeQueryPlan::NDCMutationExecution(ndc_mutation_execution) => { - let sequence_steps = get_execution_steps( - http_client, - alias, - &ndc_mutation_execution.process_response_as, - ndc_mutation_execution.join_locations, - types::NDCRequest::Mutation(ndc_mutation_execution.query), - ndc_mutation_execution.data_connector, - ) - .await; - parallel_root_steps.push(Box::new(types::Step::Sequence(sequence_steps))); - } } } // simplify the steps @@ -132,6 +121,47 @@ pub(crate) async fn explain_query_plan( } } +/// Produce an /explain plan for a given GraphQL mutation. +pub(crate) async fn explain_mutation_plan( + http_client: &reqwest::Client, + mutation_plan: plan::MutationPlan<'_, '_, '_>, +) -> Result { + let mut root_steps = vec![]; + + for (alias, node) in mutation_plan { + match node { + NodeMutationPlan::TypeName { .. } => { + return Err(error::Error::ExplainError( + "cannot explain introspection queries".to_string(), + )); + } + NodeMutationPlan::NDCMutationExecution(ndc_mutation_execution) => { + let sequence_steps = get_execution_steps( + http_client, + alias, + &ndc_mutation_execution.process_response_as, + ndc_mutation_execution.join_locations, + types::NDCRequest::Mutation(ndc_mutation_execution.query), + ndc_mutation_execution.data_connector, + ) + .await; + root_steps.push(Box::new(types::Step::Sequence(sequence_steps))); + } + } + } + + // simplify the steps + match NonEmpty::from_vec(root_steps) { + Some(root_steps) => { + let simplified_step = simplify_step(Box::new(types::Step::Sequence(root_steps))); + Ok(*simplified_step) + } + None => Err(error::Error::ExplainError( + "cannot explain mutation as there are no explainable root fields".to_string(), + )), + } +} + async fn get_execution_steps<'s>( http_client: &reqwest::Client, alias: gql::ast::common::Alias, diff --git a/v3/engine/src/execute/ndc.rs b/v3/engine/src/execute/ndc.rs index b853e52acd4..3de132cedc6 100644 --- a/v3/engine/src/execute/ndc.rs +++ b/v3/engine/src/execute/ndc.rs @@ -7,8 +7,8 @@ use lang_graphql::ast::common as ast; use ndc_client as ndc; use tracing_util::{set_attribute_on_active_span, AttributeVisibility, SpanVisibility}; +use super::plan::ProcessResponseAs; use super::process_response::process_command_mutation_response; -use super::query_plan::ProcessResponseAs; use super::{error, ProjectId}; use crate::metadata::resolved; use crate::schema::GDS; diff --git a/v3/engine/src/execute/query_plan.rs b/v3/engine/src/execute/plan.rs similarity index 78% rename from v3/engine/src/execute/query_plan.rs rename to v3/engine/src/execute/plan.rs index e41549d6270..facff7dce93 100644 --- a/v3/engine/src/execute/query_plan.rs +++ b/v3/engine/src/execute/plan.rs @@ -26,7 +26,14 @@ use crate::metadata::resolved::{self, subgraph}; use crate::schema::GDS; pub type QueryPlan<'n, 's, 'ir> = IndexMap>; -pub type MutationPlan<'n, 's, 'ir> = IndexMap>; +pub type MutationPlan<'n, 's, 'ir> = IndexMap>; + +// At least for now, requests are _either_ queries or mutations, and a mix of the two can be +// treated as an invalid request. We may want to change this in the future. +pub enum RequestPlan<'n, 's, 'ir> { + QueryPlan(QueryPlan<'n, 's, 'ir>), + MutationPlan(MutationPlan<'n, 's, 'ir>), +} /// Query plan of individual root field or node #[derive(Debug)] @@ -50,6 +57,13 @@ pub enum NodeQueryPlan<'n, 's, 'ir> { NDCQueryExecution(NDCQueryExecution<'s, 'ir>), /// NDC query for Relay 'node' to be executed RelayNodeSelect(Option>), +} + +/// Mutation plan of individual root field or node. +/// Note that __typename exists for mutations, but __schema and __type do not. +pub enum NodeMutationPlan<'n, 's, 'ir> { + /// __typename field on mutation root + TypeName { type_name: ast::TypeName }, /// NDC mutation to be executed NDCMutationExecution(NDCMutationExecution<'n, 's, 'ir>), } @@ -113,25 +127,65 @@ impl<'ir> ProcessResponseAs<'ir> { } } -pub fn generate_query_plan<'n, 's, 'ir>( +/// Build a plan to handle a given request. This plan will either be a mutation plan or a query +/// plan, but currently can't be both. This may change when we support protocols other than +/// GraphQL. +pub fn generate_request_plan<'n, 's, 'ir>( ir: &'ir IndexMap>, -) -> Result, error::Error> { - let mut query_plan = IndexMap::new(); +) -> Result, error::Error> { + let mut request_plan = None; + for (alias, field) in ir.into_iter() { - let field_plan = match field { - root_field::RootField::QueryRootField(field_ir) => plan_query(field_ir), - root_field::RootField::MutationRootField(field_ir) => plan_mutation(field_ir), - }?; - query_plan.insert(alias.clone(), field_plan); + match field { + root_field::RootField::QueryRootField(field_ir) => { + let mut query_plan = match request_plan { + Some(RequestPlan::MutationPlan(_)) => Err(error::Error::InternalError( + error::InternalError::Engine(error::InternalEngineError::InternalGeneric { + description: + "Parsed engine request contains mixed mutation/query operations" + .to_string(), + }), + ))?, + Some(RequestPlan::QueryPlan(query_plan)) => query_plan, + None => IndexMap::new(), + }; + + query_plan.insert(alias.clone(), plan_query(field_ir)?); + request_plan = Some(RequestPlan::QueryPlan(query_plan)); + } + + root_field::RootField::MutationRootField(field_ir) => { + let mut mutation_plan = match request_plan { + Some(RequestPlan::QueryPlan(_)) => Err(error::Error::InternalError( + error::InternalError::Engine(error::InternalEngineError::InternalGeneric { + description: + "Parsed engine request contains mixed mutation/query operations" + .to_string(), + }), + ))?, + Some(RequestPlan::MutationPlan(mutation_plan)) => mutation_plan, + None => IndexMap::new(), + }; + + mutation_plan.insert(alias.clone(), plan_mutation(field_ir)?); + request_plan = Some(RequestPlan::MutationPlan(mutation_plan)); + } + } } - Ok(query_plan) + + request_plan.ok_or(error::Error::InternalError(error::InternalError::Engine( + error::InternalEngineError::InternalGeneric { + description: "Parsed an empty request".to_string(), + }, + ))) } +// Given a singular root field of a mutation, plan the execution of that root field. fn plan_mutation<'n, 's, 'ir>( ir: &'ir root_field::MutationRootField<'n, 's>, -) -> Result, error::Error> { +) -> Result, error::Error> { let plan = match ir { - root_field::MutationRootField::TypeName { type_name } => NodeQueryPlan::TypeName { + root_field::MutationRootField::TypeName { type_name } => NodeMutationPlan::TypeName { type_name: type_name.clone(), }, root_field::MutationRootField::ProcedureBasedCommand { ir, selection_set } => { @@ -139,7 +193,7 @@ fn plan_mutation<'n, 's, 'ir>( let (ndc_ir, join_locations) = commands::ndc_mutation_ir(ir.procedure_name, ir, &mut join_id_counter)?; let join_locations_ids = assign_with_join_ids(join_locations)?; - NodeQueryPlan::NDCMutationExecution(NDCMutationExecution { + NodeMutationPlan::NDCMutationExecution(NDCMutationExecution { query: ndc_ir, join_locations: join_locations_ids, data_connector: ir.command_info.data_connector, @@ -156,6 +210,7 @@ fn plan_mutation<'n, 's, 'ir>( Ok(plan) } +// Given a singular root field of a query, plan the execution of that root field. fn plan_query<'n, 's, 'ir>( ir: &'ir root_field::QueryRootField<'n, 's>, ) -> Result, error::Error> { @@ -433,19 +488,20 @@ impl ExecuteQueryResult { } } -async fn execute_field_plan<'n, 's, 'ir>( +/// Execute a single root field's query plan to produce a result. +async fn execute_query_field_plan<'n, 's, 'ir>( http_client: &reqwest::Client, - field_plan: NodeQueryPlan<'n, 's, 'ir>, + query_plan: NodeQueryPlan<'n, 's, 'ir>, project_id: Option, ) -> RootFieldResult { let tracer = tracing_util::global_tracer(); tracer .in_span_async( - "execute_field_plan", + "execute_query_field_plan", tracing_util::SpanVisibility::User, || { Box::pin(async { - match field_plan { + match query_plan { NodeQueryPlan::TypeName { type_name } => { set_attribute_on_active_span( AttributeVisibility::Default, @@ -492,11 +548,6 @@ async fn execute_field_plan<'n, 's, 'ir>( &ndc_query.process_response_as.is_nullable(), resolve_ndc_query_execution(http_client, ndc_query, project_id).await, ), - NodeQueryPlan::NDCMutationExecution(ndc_query) => RootFieldResult::new( - &ndc_query.process_response_as.is_nullable(), - resolve_ndc_mutation_execution(http_client, ndc_query, project_id) - .await, - ), NodeQueryPlan::RelayNodeSelect(optional_query) => RootFieldResult::new( &optional_query.as_ref().map_or(true, |ndc_query| { ndc_query.process_response_as.is_nullable() @@ -511,27 +562,77 @@ async fn execute_field_plan<'n, 's, 'ir>( .await } -/// Extract the mutations from a query plan, so that we can run them sequentially. -pub fn separate_mutations<'n, 's, 'ir>( - query_plan: QueryPlan<'n, 's, 'ir>, -) -> (QueryPlan<'n, 's, 'ir>, MutationPlan<'n, 's, 'ir>) { - let mut mutations = IndexMap::new(); - let mut queries = IndexMap::new(); - - for (alias, field_plan) in query_plan.into_iter() { - match field_plan { - NodeQueryPlan::NDCMutationExecution(ndc_mutation_execution) => { - mutations.insert(alias, ndc_mutation_execution); - } - otherwise => { - queries.insert(alias, otherwise); - } - }; - } - - (queries, mutations) +/// Execute a single root field's mutation plan to produce a result. +async fn execute_mutation_field_plan<'n, 's, 'ir>( + http_client: &reqwest::Client, + mutation_plan: NodeMutationPlan<'n, 's, 'ir>, + project_id: Option, +) -> RootFieldResult { + let tracer = tracing_util::global_tracer(); + tracer + .in_span_async( + "execute_mutation_field_plan", + tracing_util::SpanVisibility::User, + || { + Box::pin(async { + match mutation_plan { + NodeMutationPlan::TypeName { type_name } => { + set_attribute_on_active_span( + AttributeVisibility::Default, + "field", + "__typename", + ); + RootFieldResult::new( + &false, // __typename: String! ; the __typename field is not nullable + resolve_type_name(type_name), + ) + } + NodeMutationPlan::NDCMutationExecution(ndc_mutation) => { + RootFieldResult::new( + &ndc_mutation.process_response_as.is_nullable(), + resolve_ndc_mutation_execution( + http_client, + ndc_mutation, + project_id, + ) + .await, + ) + } + } + }) + }, + ) + .await } +/// Given an entire plan for a mutation, produce a result. We do this by executing the singular +/// root fields of the mutation sequentially rather than concurrently, in the order defined by the +/// `IndexMap`'s keys. +pub async fn execute_mutation_plan<'n, 's, 'ir>( + http_client: &reqwest::Client, + mutation_plan: MutationPlan<'n, 's, 'ir>, + project_id: Option, +) -> ExecuteQueryResult { + let mut root_fields = IndexMap::new(); + let mut executed_root_fields = Vec::new(); + + for (alias, field_plan) in mutation_plan.into_iter() { + executed_root_fields.push(( + alias, + execute_mutation_field_plan(http_client, field_plan, project_id.clone()).await, + )); + } + + for executed_root_field in executed_root_fields.into_iter() { + let (alias, root_field) = executed_root_field; + root_fields.insert(alias, root_field); + } + + ExecuteQueryResult { root_fields } +} + +/// Given an entire plan for a query, produce a result. We do this by executing all the singular +/// root fields of the query in parallel, and joining the results back together. pub async fn execute_query_plan<'n, 's, 'ir>( http_client: &reqwest::Client, query_plan: QueryPlan<'n, 's, 'ir>, @@ -541,36 +642,20 @@ pub async fn execute_query_plan<'n, 's, 'ir>( let mut tasks: Vec<_> = Vec::with_capacity(query_plan.capacity()); - let (queries, mutations) = separate_mutations(query_plan); - - let mut executed_root_fields = Vec::new(); - - for (alias, field_plan) in mutations.into_iter() { - executed_root_fields.push(( - alias, - execute_field_plan( - http_client, - NodeQueryPlan::NDCMutationExecution(field_plan), - project_id.clone(), - ) - .await, - )); - } - - for (alias, field_plan) in queries.into_iter() { + for (alias, field_plan) in query_plan.into_iter() { // We are not running the field plans parallely here, we are just running them concurrently on a single thread. // To run the field plans parallely, we will need to use tokio::spawn for each field plan. let task = async { ( alias, - execute_field_plan(http_client, field_plan, project_id.clone()).await, + execute_query_field_plan(http_client, field_plan, project_id.clone()).await, ) }; tasks.push(task); } - executed_root_fields.extend(futures::future::join_all(tasks).await); + let executed_root_fields = futures::future::join_all(tasks).await; for executed_root_field in executed_root_fields.into_iter() { let (alias, root_field) = executed_root_field; diff --git a/v3/engine/src/execute/query_plan/commands.rs b/v3/engine/src/execute/plan/commands.rs similarity index 100% rename from v3/engine/src/execute/query_plan/commands.rs rename to v3/engine/src/execute/plan/commands.rs diff --git a/v3/engine/src/execute/query_plan/model_selection.rs b/v3/engine/src/execute/plan/model_selection.rs similarity index 100% rename from v3/engine/src/execute/query_plan/model_selection.rs rename to v3/engine/src/execute/plan/model_selection.rs diff --git a/v3/engine/src/execute/query_plan/relationships.rs b/v3/engine/src/execute/plan/relationships.rs similarity index 100% rename from v3/engine/src/execute/query_plan/relationships.rs rename to v3/engine/src/execute/plan/relationships.rs diff --git a/v3/engine/src/execute/query_plan/selection_set.rs b/v3/engine/src/execute/plan/selection_set.rs similarity index 100% rename from v3/engine/src/execute/query_plan/selection_set.rs rename to v3/engine/src/execute/plan/selection_set.rs diff --git a/v3/engine/src/execute/process_response.rs b/v3/engine/src/execute/process_response.rs index 1fbf923d68c..d99042591ae 100644 --- a/v3/engine/src/execute/process_response.rs +++ b/v3/engine/src/execute/process_response.rs @@ -16,7 +16,7 @@ use open_dds::types::FieldName; use super::error; use super::global_id::{global_id_col_format, GLOBAL_ID_VERSION}; use super::ndc::FUNCTION_IR_VALUE_COLUMN_NAME; -use super::query_plan::ProcessResponseAs; +use super::plan::ProcessResponseAs; use crate::metadata::resolved::subgraph::Qualified; use crate::schema::{ types::{Annotation, GlobalID, OutputAnnotation}, diff --git a/v3/engine/src/execute/remote_joins.rs b/v3/engine/src/execute/remote_joins.rs index b0f175d2f03..970ace9ca8d 100644 --- a/v3/engine/src/execute/remote_joins.rs +++ b/v3/engine/src/execute/remote_joins.rs @@ -71,7 +71,7 @@ //! //! 5. Perform join on LHS response and RHS response //! -//! [process_selection_set_ir]: crate::execute::query_plan::selection_set::process_selection_set_ir +//! [process_selection_set_ir]: crate::execute::plan::selection_set::process_selection_set_ir //! [generate_selection_set_ir]: crate::execute::ir::selection_set::generate_selection_set_ir //! [build_remote_relationship]: crate::execute::ir::relationship::build_remote_relationship //! [build_remote_command_relationship]: crate::execute::ir::relationship::build_remote_command_relationship @@ -89,7 +89,7 @@ use ndc_client as ndc; use self::types::{JoinNode, LocationKind, TargetField}; use super::ndc::{execute_ndc_query, FUNCTION_IR_VALUE_COLUMN_NAME}; -use super::query_plan::ProcessResponseAs; +use super::plan::ProcessResponseAs; use super::{error, ProjectId}; use types::{ diff --git a/v3/engine/src/execute/remote_joins/types.rs b/v3/engine/src/execute/remote_joins/types.rs index 804be5ed769..fa689ae0fd7 100644 --- a/v3/engine/src/execute/remote_joins/types.rs +++ b/v3/engine/src/execute/remote_joins/types.rs @@ -7,7 +7,7 @@ use open_dds::arguments::ArgumentName; use open_dds::types::FieldName; use std::collections::{BTreeMap, HashMap}; -use crate::execute::query_plan::ProcessResponseAs; +use crate::execute::plan::ProcessResponseAs; use crate::metadata::resolved; use crate::metadata::resolved::types::NdcColumnForComparison; use crate::utils::json_ext::ValueExt;