diff --git a/v3/engine/src/execute/query_plan.rs b/v3/engine/src/execute/query_plan.rs index b2302997673..e41549d6270 100644 --- a/v3/engine/src/execute/query_plan.rs +++ b/v3/engine/src/execute/query_plan.rs @@ -26,6 +26,7 @@ use crate::metadata::resolved::{self, subgraph}; use crate::schema::GDS; pub type QueryPlan<'n, 's, 'ir> = IndexMap>; +pub type MutationPlan<'n, 's, 'ir> = IndexMap>; /// Query plan of individual root field or node #[derive(Debug)] @@ -510,6 +511,27 @@ async fn execute_field_plan<'n, 's, 'ir>( .await } +/// Extract the mutations from a query plan, so that we can run them sequentially. +pub fn separate_mutations<'n, 's, 'ir>( + query_plan: QueryPlan<'n, 's, 'ir>, +) -> (QueryPlan<'n, 's, 'ir>, MutationPlan<'n, 's, 'ir>) { + let mut mutations = IndexMap::new(); + let mut queries = IndexMap::new(); + + for (alias, field_plan) in query_plan.into_iter() { + match field_plan { + NodeQueryPlan::NDCMutationExecution(ndc_mutation_execution) => { + mutations.insert(alias, ndc_mutation_execution); + } + otherwise => { + queries.insert(alias, otherwise); + } + }; + } + + (queries, mutations) +} + pub async fn execute_query_plan<'n, 's, 'ir>( http_client: &reqwest::Client, query_plan: QueryPlan<'n, 's, 'ir>, @@ -519,7 +541,23 @@ pub async fn execute_query_plan<'n, 's, 'ir>( let mut tasks: Vec<_> = Vec::with_capacity(query_plan.capacity()); - for (alias, field_plan) in query_plan.into_iter() { + let (queries, mutations) = separate_mutations(query_plan); + + let mut executed_root_fields = Vec::new(); + + for (alias, field_plan) in mutations.into_iter() { + executed_root_fields.push(( + alias, + execute_field_plan( + http_client, + NodeQueryPlan::NDCMutationExecution(field_plan), + project_id.clone(), + ) + .await, + )); + } + + for (alias, field_plan) in queries.into_iter() { // We are not running the field plans parallely here, we are just running them concurrently on a single thread. // To run the field plans parallely, we will need to use tokio::spawn for each field plan. let task = async { @@ -532,7 +570,7 @@ pub async fn execute_query_plan<'n, 's, 'ir>( tasks.push(task); } - let executed_root_fields = futures::future::join_all(tasks).await; + executed_root_fields.extend(futures::future::join_all(tasks).await); for executed_root_field in executed_root_fields.into_iter() { let (alias, root_field) = executed_root_field;