diff --git a/v3/engine/benches/execute.rs b/v3/engine/benches/execute.rs index 3bc0f6c156f..dcc8214c684 100644 --- a/v3/engine/benches/execute.rs +++ b/v3/engine/benches/execute.rs @@ -1,5 +1,5 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; -use engine::execute::query_plan::{execute_query_plan, generate_query_plan}; +use engine::execute::plan::{execute_mutation_plan, execute_query_plan, generate_request_plan}; use engine::execute::{execute_query_internal, generate_ir}; use engine::schema::GDS; use hasura_authn_core::Identity; @@ -120,7 +120,7 @@ pub fn bench_execute( &(&runtime), |b, runtime| { b.to_async(*runtime) - .iter(|| async { generate_query_plan(&ir).unwrap() }) + .iter(|| async { generate_request_plan(&ir).unwrap() }) }, ); @@ -130,7 +130,14 @@ pub fn bench_execute( &(&runtime), |b, runtime| { b.to_async(*runtime).iter(|| async { - execute_query_plan(&http_client, generate_query_plan(&ir).unwrap(), None).await + match generate_request_plan(&ir).unwrap() { + engine::execute::plan::RequestPlan::QueryPlan(query_plan) => { + execute_query_plan(&http_client, query_plan, None).await + } + engine::execute::plan::RequestPlan::MutationPlan(mutation_plan) => { + execute_mutation_plan(&http_client, mutation_plan, None).await + } + } }) }, ); diff --git a/v3/engine/src/execute.rs b/v3/engine/src/execute.rs index 1a782172096..7fb02ce3127 100644 --- a/v3/engine/src/execute.rs +++ b/v3/engine/src/execute.rs @@ -24,8 +24,8 @@ pub mod global_id; pub mod ir; pub mod model_tracking; pub mod ndc; +pub mod plan; pub mod process_response; -pub mod query_plan; pub mod remote_joins; #[derive(Debug)] @@ -203,9 +203,9 @@ pub async fn execute_request_internal( generate_ir(schema, session, &normalized_request) })?; - // construct a query plan - let query_plan = tracer.in_span("plan", SpanVisibility::Internal, || { - query_plan::generate_query_plan(&ir) + // construct a plan to execute the request + let request_plan = tracer.in_span("plan", SpanVisibility::Internal, || { + plan::generate_request_plan(&ir) })?; // execute/explain the query plan @@ -224,12 +224,25 @@ pub async fn execute_request_internal( ); Box::pin(async { - let execute_query_result = query_plan::execute_query_plan( - http_client, - query_plan, - project_id, - ) - .await; + let execute_query_result = match request_plan { + plan::RequestPlan::MutationPlan(mutation_plan) => { + plan::execute_mutation_plan( + http_client, + mutation_plan, + project_id, + ) + .await + } + plan::RequestPlan::QueryPlan(query_plan) => { + plan::execute_query_plan( + http_client, + query_plan, + project_id, + ) + .await + } + }; + ExecuteOrExplainResponse::Execute(GraphQLResponse( execute_query_result.to_graphql_response(), )) @@ -241,19 +254,30 @@ pub async fn execute_request_internal( tracer .in_span_async("explain", SpanVisibility::Internal, || { Box::pin(async { + let request_result = match request_plan { + plan::RequestPlan::MutationPlan(mutation_plan) => { + crate::execute::explain::explain_mutation_plan( + http_client, + mutation_plan, + ) + .await + } + plan::RequestPlan::QueryPlan(query_plan) => { + crate::execute::explain::explain_query_plan( + http_client, + query_plan, + ) + .await + } + }; + // convert the query plan to explain step - let explain_response = - match crate::execute::explain::explain_query_plan( - http_client, - query_plan, - ) - .await - { - Ok(step) => step.make_explain_response(), - Err(e) => explain::types::ExplainResponse::error( - e.to_graphql_error(None), - ), - }; + let explain_response = match request_result { + Ok(step) => step.make_explain_response(), + Err(e) => explain::types::ExplainResponse::error( + e.to_graphql_error(None), + ), + }; ExecuteOrExplainResponse::Explain(explain_response) }) diff --git a/v3/engine/src/execute/explain.rs b/v3/engine/src/execute/explain.rs index 812f7363753..282d13e6491 100644 --- a/v3/engine/src/execute/explain.rs +++ b/v3/engine/src/execute/explain.rs @@ -1,8 +1,8 @@ use super::remote_joins::types::{JoinNode, RemoteJoinType}; use super::ExecuteOrExplainResponse; -use crate::execute::query_plan::{NodeQueryPlan, ProcessResponseAs}; +use crate::execute::plan::{NodeMutationPlan, NodeQueryPlan, ProcessResponseAs}; use crate::execute::remote_joins::types::{JoinId, JoinLocations, RemoteJoin}; -use crate::execute::{error, query_plan}; +use crate::execute::{error, plan}; use crate::metadata::resolved; use crate::schema::GDS; use crate::utils::async_map::AsyncMap; @@ -53,9 +53,10 @@ pub async fn execute_explain_internal( } } +/// Produce an /explain plan for a given GraphQL query. pub(crate) async fn explain_query_plan( http_client: &reqwest::Client, - query_plan: query_plan::QueryPlan<'_, '_, '_>, + query_plan: plan::QueryPlan<'_, '_, '_>, ) -> Result { let mut parallel_root_steps = vec![]; // Here, we are assuming that all root fields are executed in parallel. @@ -105,18 +106,6 @@ pub(crate) async fn explain_query_plan( "cannot explain relay queries with no execution plan".to_string(), )); } - NodeQueryPlan::NDCMutationExecution(ndc_mutation_execution) => { - let sequence_steps = get_execution_steps( - http_client, - alias, - &ndc_mutation_execution.process_response_as, - ndc_mutation_execution.join_locations, - types::NDCRequest::Mutation(ndc_mutation_execution.query), - ndc_mutation_execution.data_connector, - ) - .await; - parallel_root_steps.push(Box::new(types::Step::Sequence(sequence_steps))); - } } } // simplify the steps @@ -132,6 +121,47 @@ pub(crate) async fn explain_query_plan( } } +/// Produce an /explain plan for a given GraphQL mutation. +pub(crate) async fn explain_mutation_plan( + http_client: &reqwest::Client, + mutation_plan: plan::MutationPlan<'_, '_, '_>, +) -> Result { + let mut root_steps = vec![]; + + for (alias, node) in mutation_plan { + match node { + NodeMutationPlan::TypeName { .. } => { + return Err(error::Error::ExplainError( + "cannot explain introspection queries".to_string(), + )); + } + NodeMutationPlan::NDCMutationExecution(ndc_mutation_execution) => { + let sequence_steps = get_execution_steps( + http_client, + alias, + &ndc_mutation_execution.process_response_as, + ndc_mutation_execution.join_locations, + types::NDCRequest::Mutation(ndc_mutation_execution.query), + ndc_mutation_execution.data_connector, + ) + .await; + root_steps.push(Box::new(types::Step::Sequence(sequence_steps))); + } + } + } + + // simplify the steps + match NonEmpty::from_vec(root_steps) { + Some(root_steps) => { + let simplified_step = simplify_step(Box::new(types::Step::Sequence(root_steps))); + Ok(*simplified_step) + } + None => Err(error::Error::ExplainError( + "cannot explain mutation as there are no explainable root fields".to_string(), + )), + } +} + async fn get_execution_steps<'s>( http_client: &reqwest::Client, alias: gql::ast::common::Alias, diff --git a/v3/engine/src/execute/ndc.rs b/v3/engine/src/execute/ndc.rs index b853e52acd4..3de132cedc6 100644 --- a/v3/engine/src/execute/ndc.rs +++ b/v3/engine/src/execute/ndc.rs @@ -7,8 +7,8 @@ use lang_graphql::ast::common as ast; use ndc_client as ndc; use tracing_util::{set_attribute_on_active_span, AttributeVisibility, SpanVisibility}; +use super::plan::ProcessResponseAs; use super::process_response::process_command_mutation_response; -use super::query_plan::ProcessResponseAs; use super::{error, ProjectId}; use crate::metadata::resolved; use crate::schema::GDS; diff --git a/v3/engine/src/execute/query_plan.rs b/v3/engine/src/execute/plan.rs similarity index 78% rename from v3/engine/src/execute/query_plan.rs rename to v3/engine/src/execute/plan.rs index e41549d6270..facff7dce93 100644 --- a/v3/engine/src/execute/query_plan.rs +++ b/v3/engine/src/execute/plan.rs @@ -26,7 +26,14 @@ use crate::metadata::resolved::{self, subgraph}; use crate::schema::GDS; pub type QueryPlan<'n, 's, 'ir> = IndexMap>; -pub type MutationPlan<'n, 's, 'ir> = IndexMap>; +pub type MutationPlan<'n, 's, 'ir> = IndexMap>; + +// At least for now, requests are _either_ queries or mutations, and a mix of the two can be +// treated as an invalid request. We may want to change this in the future. +pub enum RequestPlan<'n, 's, 'ir> { + QueryPlan(QueryPlan<'n, 's, 'ir>), + MutationPlan(MutationPlan<'n, 's, 'ir>), +} /// Query plan of individual root field or node #[derive(Debug)] @@ -50,6 +57,13 @@ pub enum NodeQueryPlan<'n, 's, 'ir> { NDCQueryExecution(NDCQueryExecution<'s, 'ir>), /// NDC query for Relay 'node' to be executed RelayNodeSelect(Option>), +} + +/// Mutation plan of individual root field or node. +/// Note that __typename exists for mutations, but __schema and __type do not. +pub enum NodeMutationPlan<'n, 's, 'ir> { + /// __typename field on mutation root + TypeName { type_name: ast::TypeName }, /// NDC mutation to be executed NDCMutationExecution(NDCMutationExecution<'n, 's, 'ir>), } @@ -113,25 +127,65 @@ impl<'ir> ProcessResponseAs<'ir> { } } -pub fn generate_query_plan<'n, 's, 'ir>( +/// Build a plan to handle a given request. This plan will either be a mutation plan or a query +/// plan, but currently can't be both. This may change when we support protocols other than +/// GraphQL. +pub fn generate_request_plan<'n, 's, 'ir>( ir: &'ir IndexMap>, -) -> Result, error::Error> { - let mut query_plan = IndexMap::new(); +) -> Result, error::Error> { + let mut request_plan = None; + for (alias, field) in ir.into_iter() { - let field_plan = match field { - root_field::RootField::QueryRootField(field_ir) => plan_query(field_ir), - root_field::RootField::MutationRootField(field_ir) => plan_mutation(field_ir), - }?; - query_plan.insert(alias.clone(), field_plan); + match field { + root_field::RootField::QueryRootField(field_ir) => { + let mut query_plan = match request_plan { + Some(RequestPlan::MutationPlan(_)) => Err(error::Error::InternalError( + error::InternalError::Engine(error::InternalEngineError::InternalGeneric { + description: + "Parsed engine request contains mixed mutation/query operations" + .to_string(), + }), + ))?, + Some(RequestPlan::QueryPlan(query_plan)) => query_plan, + None => IndexMap::new(), + }; + + query_plan.insert(alias.clone(), plan_query(field_ir)?); + request_plan = Some(RequestPlan::QueryPlan(query_plan)); + } + + root_field::RootField::MutationRootField(field_ir) => { + let mut mutation_plan = match request_plan { + Some(RequestPlan::QueryPlan(_)) => Err(error::Error::InternalError( + error::InternalError::Engine(error::InternalEngineError::InternalGeneric { + description: + "Parsed engine request contains mixed mutation/query operations" + .to_string(), + }), + ))?, + Some(RequestPlan::MutationPlan(mutation_plan)) => mutation_plan, + None => IndexMap::new(), + }; + + mutation_plan.insert(alias.clone(), plan_mutation(field_ir)?); + request_plan = Some(RequestPlan::MutationPlan(mutation_plan)); + } + } } - Ok(query_plan) + + request_plan.ok_or(error::Error::InternalError(error::InternalError::Engine( + error::InternalEngineError::InternalGeneric { + description: "Parsed an empty request".to_string(), + }, + ))) } +// Given a singular root field of a mutation, plan the execution of that root field. fn plan_mutation<'n, 's, 'ir>( ir: &'ir root_field::MutationRootField<'n, 's>, -) -> Result, error::Error> { +) -> Result, error::Error> { let plan = match ir { - root_field::MutationRootField::TypeName { type_name } => NodeQueryPlan::TypeName { + root_field::MutationRootField::TypeName { type_name } => NodeMutationPlan::TypeName { type_name: type_name.clone(), }, root_field::MutationRootField::ProcedureBasedCommand { ir, selection_set } => { @@ -139,7 +193,7 @@ fn plan_mutation<'n, 's, 'ir>( let (ndc_ir, join_locations) = commands::ndc_mutation_ir(ir.procedure_name, ir, &mut join_id_counter)?; let join_locations_ids = assign_with_join_ids(join_locations)?; - NodeQueryPlan::NDCMutationExecution(NDCMutationExecution { + NodeMutationPlan::NDCMutationExecution(NDCMutationExecution { query: ndc_ir, join_locations: join_locations_ids, data_connector: ir.command_info.data_connector, @@ -156,6 +210,7 @@ fn plan_mutation<'n, 's, 'ir>( Ok(plan) } +// Given a singular root field of a query, plan the execution of that root field. fn plan_query<'n, 's, 'ir>( ir: &'ir root_field::QueryRootField<'n, 's>, ) -> Result, error::Error> { @@ -433,19 +488,20 @@ impl ExecuteQueryResult { } } -async fn execute_field_plan<'n, 's, 'ir>( +/// Execute a single root field's query plan to produce a result. +async fn execute_query_field_plan<'n, 's, 'ir>( http_client: &reqwest::Client, - field_plan: NodeQueryPlan<'n, 's, 'ir>, + query_plan: NodeQueryPlan<'n, 's, 'ir>, project_id: Option, ) -> RootFieldResult { let tracer = tracing_util::global_tracer(); tracer .in_span_async( - "execute_field_plan", + "execute_query_field_plan", tracing_util::SpanVisibility::User, || { Box::pin(async { - match field_plan { + match query_plan { NodeQueryPlan::TypeName { type_name } => { set_attribute_on_active_span( AttributeVisibility::Default, @@ -492,11 +548,6 @@ async fn execute_field_plan<'n, 's, 'ir>( &ndc_query.process_response_as.is_nullable(), resolve_ndc_query_execution(http_client, ndc_query, project_id).await, ), - NodeQueryPlan::NDCMutationExecution(ndc_query) => RootFieldResult::new( - &ndc_query.process_response_as.is_nullable(), - resolve_ndc_mutation_execution(http_client, ndc_query, project_id) - .await, - ), NodeQueryPlan::RelayNodeSelect(optional_query) => RootFieldResult::new( &optional_query.as_ref().map_or(true, |ndc_query| { ndc_query.process_response_as.is_nullable() @@ -511,27 +562,77 @@ async fn execute_field_plan<'n, 's, 'ir>( .await } -/// Extract the mutations from a query plan, so that we can run them sequentially. -pub fn separate_mutations<'n, 's, 'ir>( - query_plan: QueryPlan<'n, 's, 'ir>, -) -> (QueryPlan<'n, 's, 'ir>, MutationPlan<'n, 's, 'ir>) { - let mut mutations = IndexMap::new(); - let mut queries = IndexMap::new(); - - for (alias, field_plan) in query_plan.into_iter() { - match field_plan { - NodeQueryPlan::NDCMutationExecution(ndc_mutation_execution) => { - mutations.insert(alias, ndc_mutation_execution); - } - otherwise => { - queries.insert(alias, otherwise); - } - }; - } - - (queries, mutations) +/// Execute a single root field's mutation plan to produce a result. +async fn execute_mutation_field_plan<'n, 's, 'ir>( + http_client: &reqwest::Client, + mutation_plan: NodeMutationPlan<'n, 's, 'ir>, + project_id: Option, +) -> RootFieldResult { + let tracer = tracing_util::global_tracer(); + tracer + .in_span_async( + "execute_mutation_field_plan", + tracing_util::SpanVisibility::User, + || { + Box::pin(async { + match mutation_plan { + NodeMutationPlan::TypeName { type_name } => { + set_attribute_on_active_span( + AttributeVisibility::Default, + "field", + "__typename", + ); + RootFieldResult::new( + &false, // __typename: String! ; the __typename field is not nullable + resolve_type_name(type_name), + ) + } + NodeMutationPlan::NDCMutationExecution(ndc_mutation) => { + RootFieldResult::new( + &ndc_mutation.process_response_as.is_nullable(), + resolve_ndc_mutation_execution( + http_client, + ndc_mutation, + project_id, + ) + .await, + ) + } + } + }) + }, + ) + .await } +/// Given an entire plan for a mutation, produce a result. We do this by executing the singular +/// root fields of the mutation sequentially rather than concurrently, in the order defined by the +/// `IndexMap`'s keys. +pub async fn execute_mutation_plan<'n, 's, 'ir>( + http_client: &reqwest::Client, + mutation_plan: MutationPlan<'n, 's, 'ir>, + project_id: Option, +) -> ExecuteQueryResult { + let mut root_fields = IndexMap::new(); + let mut executed_root_fields = Vec::new(); + + for (alias, field_plan) in mutation_plan.into_iter() { + executed_root_fields.push(( + alias, + execute_mutation_field_plan(http_client, field_plan, project_id.clone()).await, + )); + } + + for executed_root_field in executed_root_fields.into_iter() { + let (alias, root_field) = executed_root_field; + root_fields.insert(alias, root_field); + } + + ExecuteQueryResult { root_fields } +} + +/// Given an entire plan for a query, produce a result. We do this by executing all the singular +/// root fields of the query in parallel, and joining the results back together. pub async fn execute_query_plan<'n, 's, 'ir>( http_client: &reqwest::Client, query_plan: QueryPlan<'n, 's, 'ir>, @@ -541,36 +642,20 @@ pub async fn execute_query_plan<'n, 's, 'ir>( let mut tasks: Vec<_> = Vec::with_capacity(query_plan.capacity()); - let (queries, mutations) = separate_mutations(query_plan); - - let mut executed_root_fields = Vec::new(); - - for (alias, field_plan) in mutations.into_iter() { - executed_root_fields.push(( - alias, - execute_field_plan( - http_client, - NodeQueryPlan::NDCMutationExecution(field_plan), - project_id.clone(), - ) - .await, - )); - } - - for (alias, field_plan) in queries.into_iter() { + for (alias, field_plan) in query_plan.into_iter() { // We are not running the field plans parallely here, we are just running them concurrently on a single thread. // To run the field plans parallely, we will need to use tokio::spawn for each field plan. let task = async { ( alias, - execute_field_plan(http_client, field_plan, project_id.clone()).await, + execute_query_field_plan(http_client, field_plan, project_id.clone()).await, ) }; tasks.push(task); } - executed_root_fields.extend(futures::future::join_all(tasks).await); + let executed_root_fields = futures::future::join_all(tasks).await; for executed_root_field in executed_root_fields.into_iter() { let (alias, root_field) = executed_root_field; diff --git a/v3/engine/src/execute/query_plan/commands.rs b/v3/engine/src/execute/plan/commands.rs similarity index 100% rename from v3/engine/src/execute/query_plan/commands.rs rename to v3/engine/src/execute/plan/commands.rs diff --git a/v3/engine/src/execute/query_plan/model_selection.rs b/v3/engine/src/execute/plan/model_selection.rs similarity index 100% rename from v3/engine/src/execute/query_plan/model_selection.rs rename to v3/engine/src/execute/plan/model_selection.rs diff --git a/v3/engine/src/execute/query_plan/relationships.rs b/v3/engine/src/execute/plan/relationships.rs similarity index 100% rename from v3/engine/src/execute/query_plan/relationships.rs rename to v3/engine/src/execute/plan/relationships.rs diff --git a/v3/engine/src/execute/query_plan/selection_set.rs b/v3/engine/src/execute/plan/selection_set.rs similarity index 100% rename from v3/engine/src/execute/query_plan/selection_set.rs rename to v3/engine/src/execute/plan/selection_set.rs diff --git a/v3/engine/src/execute/process_response.rs b/v3/engine/src/execute/process_response.rs index 1fbf923d68c..d99042591ae 100644 --- a/v3/engine/src/execute/process_response.rs +++ b/v3/engine/src/execute/process_response.rs @@ -16,7 +16,7 @@ use open_dds::types::FieldName; use super::error; use super::global_id::{global_id_col_format, GLOBAL_ID_VERSION}; use super::ndc::FUNCTION_IR_VALUE_COLUMN_NAME; -use super::query_plan::ProcessResponseAs; +use super::plan::ProcessResponseAs; use crate::metadata::resolved::subgraph::Qualified; use crate::schema::{ types::{Annotation, GlobalID, OutputAnnotation}, diff --git a/v3/engine/src/execute/remote_joins.rs b/v3/engine/src/execute/remote_joins.rs index b0f175d2f03..970ace9ca8d 100644 --- a/v3/engine/src/execute/remote_joins.rs +++ b/v3/engine/src/execute/remote_joins.rs @@ -71,7 +71,7 @@ //! //! 5. Perform join on LHS response and RHS response //! -//! [process_selection_set_ir]: crate::execute::query_plan::selection_set::process_selection_set_ir +//! [process_selection_set_ir]: crate::execute::plan::selection_set::process_selection_set_ir //! [generate_selection_set_ir]: crate::execute::ir::selection_set::generate_selection_set_ir //! [build_remote_relationship]: crate::execute::ir::relationship::build_remote_relationship //! [build_remote_command_relationship]: crate::execute::ir::relationship::build_remote_command_relationship @@ -89,7 +89,7 @@ use ndc_client as ndc; use self::types::{JoinNode, LocationKind, TargetField}; use super::ndc::{execute_ndc_query, FUNCTION_IR_VALUE_COLUMN_NAME}; -use super::query_plan::ProcessResponseAs; +use super::plan::ProcessResponseAs; use super::{error, ProjectId}; use types::{ diff --git a/v3/engine/src/execute/remote_joins/types.rs b/v3/engine/src/execute/remote_joins/types.rs index 804be5ed769..fa689ae0fd7 100644 --- a/v3/engine/src/execute/remote_joins/types.rs +++ b/v3/engine/src/execute/remote_joins/types.rs @@ -7,7 +7,7 @@ use open_dds::arguments::ArgumentName; use open_dds::types::FieldName; use std::collections::{BTreeMap, HashMap}; -use crate::execute::query_plan::ProcessResponseAs; +use crate::execute::plan::ProcessResponseAs; use crate::metadata::resolved; use crate::metadata::resolved::types::NdcColumnForComparison; use crate::utils::json_ext::ValueExt;