Run mutations sequentially (#347)

V3_GIT_ORIGIN_REV_ID: 805e0d008954617e85ad6e1688f8016a85748bb5
This commit is contained in:
Tom Harding 2024-03-13 13:26:27 +01:00 committed by hasura-bot
parent 8924812bc1
commit ee225734f1

View File

@ -26,6 +26,7 @@ use crate::metadata::resolved::{self, subgraph};
use crate::schema::GDS;
pub type QueryPlan<'n, 's, 'ir> = IndexMap<ast::Alias, NodeQueryPlan<'n, 's, 'ir>>;
pub type MutationPlan<'n, 's, 'ir> = IndexMap<ast::Alias, NDCMutationExecution<'n, 's, 'ir>>;
/// Query plan of individual root field or node
#[derive(Debug)]
@ -510,6 +511,27 @@ async fn execute_field_plan<'n, 's, 'ir>(
.await
}
/// Extract the mutations from a query plan, so that we can run them sequentially.
pub fn separate_mutations<'n, 's, 'ir>(
query_plan: QueryPlan<'n, 's, 'ir>,
) -> (QueryPlan<'n, 's, 'ir>, MutationPlan<'n, 's, 'ir>) {
let mut mutations = IndexMap::new();
let mut queries = IndexMap::new();
for (alias, field_plan) in query_plan.into_iter() {
match field_plan {
NodeQueryPlan::NDCMutationExecution(ndc_mutation_execution) => {
mutations.insert(alias, ndc_mutation_execution);
}
otherwise => {
queries.insert(alias, otherwise);
}
};
}
(queries, mutations)
}
pub async fn execute_query_plan<'n, 's, 'ir>(
http_client: &reqwest::Client,
query_plan: QueryPlan<'n, 's, 'ir>,
@ -519,7 +541,23 @@ pub async fn execute_query_plan<'n, 's, 'ir>(
let mut tasks: Vec<_> = Vec::with_capacity(query_plan.capacity());
for (alias, field_plan) in query_plan.into_iter() {
let (queries, mutations) = separate_mutations(query_plan);
let mut executed_root_fields = Vec::new();
for (alias, field_plan) in mutations.into_iter() {
executed_root_fields.push((
alias,
execute_field_plan(
http_client,
NodeQueryPlan::NDCMutationExecution(field_plan),
project_id.clone(),
)
.await,
));
}
for (alias, field_plan) in queries.into_iter() {
// We are not running the field plans parallely here, we are just running them concurrently on a single thread.
// To run the field plans parallely, we will need to use tokio::spawn for each field plan.
let task = async {
@ -532,7 +570,7 @@ pub async fn execute_query_plan<'n, 's, 'ir>(
tasks.push(task);
}
let executed_root_fields = futures::future::join_all(tasks).await;
executed_root_fields.extend(futures::future::join_all(tasks).await);
for executed_root_field in executed_root_fields.into_iter() {
let (alias, root_field) = executed_root_field;