From fe6e99762ef65394306f3391e4ce6942e5c25cc0 Mon Sep 17 00:00:00 2001
From: Tom Harding
Date: Thu, 28 Mar 2024 15:53:41 +0100
Subject: [PATCH] Group mutation nodes by connector (#406)

## Description

In order to implement transactions for the connectors that allow it, we want
to group mutation commands by connector. Then, we can execute a group in a
single transaction if possible, or else just loop through it as we always
have.

Grouping by connector also means we can access things like capabilities for
whole batches at a time.

## Changelog

- Add a changelog entry (in the "Changelog entry" section below) if the
  changes in this PR have any user-facing impact. See [changelog
  guide](https://github.com/hasura/graphql-engine-mono/wiki/Changelog-Guide).
- If no changelog is required ignore/remove this section and add a
  `no-changelog-required` label to the PR.

### Product
_(Select all products this will be available in)_
- [ ] community-edition
- [ ] cloud

### Type
_(Select only one. In case of multiple, choose the most appropriate)_
- [ ] highlight
- [ ] enhancement
- [ ] bugfix
- [ ] behaviour-change
- [ ] performance-enhancement
- [ ] security-fix

### Changelog entry
_Replace with changelog entry_

V3_GIT_ORIGIN_REV_ID: 928715b78b214a3ac2452e8d1e5f33c87b20de33
---
 v3/crates/engine/src/execute/explain.rs           | 24 ++++++++-------
 v3/crates/engine/src/execute/plan.rs              | 29 ++++++++++++++-----
 .../src/metadata/resolved/data_connector.rs       |  6 ++++
 3 files changed, 41 insertions(+), 18 deletions(-)

diff --git a/v3/crates/engine/src/execute/explain.rs b/v3/crates/engine/src/execute/explain.rs
index 80564b2ae03..2f578555944 100644
--- a/v3/crates/engine/src/execute/explain.rs
+++ b/v3/crates/engine/src/execute/explain.rs
@@ -153,17 +153,19 @@ pub(crate) async fn explain_mutation_plan(
         ));
     }
 
-    for (alias, ndc_mutation_execution) in mutation_plan.nodes {
-        let sequence_steps = get_execution_steps(
-            http_client,
-            alias,
-            &ndc_mutation_execution.process_response_as,
-            ndc_mutation_execution.join_locations,
-            types::NDCRequest::Mutation(ndc_mutation_execution.query),
-            ndc_mutation_execution.data_connector,
-        )
-        .await;
-        root_steps.push(Box::new(types::Step::Sequence(sequence_steps)));
+    for (_, mutation_group) in mutation_plan.nodes {
+        for (alias, ndc_mutation_execution) in mutation_group {
+            let sequence_steps = get_execution_steps(
+                http_client,
+                alias,
+                &ndc_mutation_execution.process_response_as,
+                ndc_mutation_execution.join_locations,
+                types::NDCRequest::Mutation(ndc_mutation_execution.query),
+                ndc_mutation_execution.data_connector,
+            )
+            .await;
+            root_steps.push(Box::new(types::Step::Sequence(sequence_steps)));
+        }
     }
 
     // simplify the steps
diff --git a/v3/crates/engine/src/execute/plan.rs b/v3/crates/engine/src/execute/plan.rs
index 872d48bc72f..51563868b60 100644
--- a/v3/crates/engine/src/execute/plan.rs
+++ b/v3/crates/engine/src/execute/plan.rs
@@ -27,8 +27,17 @@ use crate::schema::GDS;
 
 pub type QueryPlan<'n, 's, 'ir> = IndexMap<ast::Alias, NodeQueryPlan<'n, 's, 'ir>>;
 
+/// Unlike a query, the root nodes of a mutation aren't necessarily independent. Specifically, the
+/// GraphQL specification says that each root mutation must be executed sequentially. Moreover, if
+/// we want to, say, insert a parent _and_ children in one query, we want the ability to make
+/// transactional requests. In a mutation plan, we group nodes by connector, allowing us to issue
+/// transactional commands to connectors whose capabilities allow for transactional mutations.
+/// Otherwise, we can just send them one-by-one (though still sequentially).
 pub struct MutationPlan<'n, 's, 'ir> {
-    pub nodes: IndexMap<ast::Alias, NDCMutationExecution<'n, 's, 'ir>>,
+    pub nodes: IndexMap<
+        resolved::data_connector::DataConnectorLink,
+        IndexMap<ast::Alias, NDCMutationExecution<'n, 's, 'ir>>,
+    >,
     pub type_names: IndexMap<ast::Alias, ast::TypeName>,
 }
 
@@ -184,9 +193,13 @@ pub fn generate_request_plan<'n, 's, 'ir>(
                         .insert(alias.clone(), type_name.clone());
                 }
                 root_field::MutationRootField::ProcedureBasedCommand { selection_set, ir } => {
+                    let plan = plan_mutation(selection_set, ir)?;
+
                     mutation_plan
                         .nodes
-                        .insert(alias.clone(), plan_mutation(selection_set, ir)?);
+                        .entry(plan.data_connector.clone())
+                        .or_default()
+                        .insert(alias.clone(), plan);
                 }
             };
 
@@ -728,11 +741,13 @@ pub async fn execute_mutation_plan<'n, 's, 'ir>(
         ));
     }
 
-    for (alias, field_plan) in mutation_plan.nodes.into_iter() {
-        executed_root_fields.push((
-            alias,
-            execute_mutation_field_plan(http_client, field_plan, project_id.clone()).await,
-        ));
+    for (_, mutation_group) in mutation_plan.nodes {
+        for (alias, field_plan) in mutation_group {
+            executed_root_fields.push((
+                alias,
+                execute_mutation_field_plan(http_client, field_plan, project_id.clone()).await,
+            ));
+        }
     }
 
     for executed_root_field in executed_root_fields.into_iter() {
diff --git a/v3/crates/engine/src/metadata/resolved/data_connector.rs b/v3/crates/engine/src/metadata/resolved/data_connector.rs
index 827865fcf8a..92bbd79c667 100644
--- a/v3/crates/engine/src/metadata/resolved/data_connector.rs
+++ b/v3/crates/engine/src/metadata/resolved/data_connector.rs
@@ -26,6 +26,12 @@ pub struct DataConnectorLink {
     pub headers: SerializableHeaderMap,
 }
 
+impl std::hash::Hash for DataConnectorLink {
+    fn hash<H>(&self, h: &mut H) where H: std::hash::Hasher {
+        self.name.hash(h)
+    }
+}
+
 impl DataConnectorLink {
     pub(crate) fn new(
         name: Qualified<DataConnectorName>,
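Outside the patch itself, here is a minimal standalone sketch of the grouping pattern the `plan.rs` hunks rely on: per-alias executions are collected under their connector key with `IndexMap`'s entry API, then each group is walked in insertion order. The `Connector` and `Execution` types below are simplified stand-ins invented for illustration, not the engine's real `DataConnectorLink` or `NDCMutationExecution` types.

```rust
// Sketch only; assumes the `indexmap` crate as a dependency.
use indexmap::IndexMap;

// Hypothetical stand-in for a connector identity (the real key is DataConnectorLink,
// hashed by name via the Hash impl added in this patch).
#[derive(PartialEq, Eq, Hash)]
struct Connector(String);

// Hypothetical stand-in for a planned mutation execution.
struct Execution {
    alias: String,
    payload: String,
}

fn main() {
    let executions = vec![
        (Connector("postgres".into()), Execution { alias: "insert_author".into(), payload: "author row".into() }),
        (Connector("postgres".into()), Execution { alias: "insert_article".into(), payload: "article row".into() }),
        (Connector("mongo".into()), Execution { alias: "insert_log".into(), payload: "log document".into() }),
    ];

    // Group per-alias executions under their connector, mirroring
    // `.entry(plan.data_connector.clone()).or_default().insert(alias, plan)` from the patch.
    let mut nodes: IndexMap<Connector, IndexMap<String, Execution>> = IndexMap::new();
    for (connector, execution) in executions {
        nodes
            .entry(connector)
            .or_default()
            .insert(execution.alias.clone(), execution);
    }

    // Execute group by group: a group could become one transactional request if the
    // connector's capabilities allow it; otherwise each field is sent one-by-one,
    // still sequentially.
    for (connector, group) in nodes {
        for (alias, execution) in group {
            println!("{}: {} -> {}", connector.0, alias, execution.payload);
        }
    }
}
```

Because `IndexMap` preserves insertion order, the per-connector groups and the fields within each group are still visited in the order the mutation's root fields were planned, which keeps the sequential execution the description and doc comment call for.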