Group mutation nodes by connector (#406)

## Description

In order to implement transactions for the connectors that allow it, we
want to group mutation commands by connector. Then, we can execute a
group in a single transaction if possible, else just loop through it as
we always have. Grouping by connector also means we can access things
like capabilities for whole batches at a time.

## Changelog

- Add a changelog entry (in the "Changelog entry" section below) if the
changes in this PR have any user-facing impact. See [changelog
guide](https://github.com/hasura/graphql-engine-mono/wiki/Changelog-Guide).
- If no changelog is required ignore/remove this section and add a
`no-changelog-required` label to the PR.

### Product
_(Select all products this will be available in)_
- [ ] community-edition
- [ ] cloud
<!-- product : end : DO NOT REMOVE -->

### Type
<!-- See changelog structure:
https://github.com/hasura/graphql-engine-mono/wiki/Changelog-Guide#structure-of-our-changelog
-->
_(Select only one. In case of multiple, choose the most appropriate)_
- [ ] highlight
- [ ] enhancement
- [ ] bugfix
- [ ] behaviour-change
- [ ] performance-enhancement
- [ ] security-fix
<!-- type : end : DO NOT REMOVE -->

### Changelog entry
<!--
  - Add a user understandable changelog entry
- Include all details needed to understand the change. Try including
links to docs or issues if relevant
  - For Highlights start with a H4 heading (#### <entry title>)
  - Get the changelog entry reviewed by your team
-->

_Replace with changelog entry_

<!-- changelog-entry : end : DO NOT REMOVE -->

<!-- changelog : end : DO NOT REMOVE -->

V3_GIT_ORIGIN_REV_ID: 928715b78b214a3ac2452e8d1e5f33c87b20de33
This commit is contained in:
Tom Harding 2024-03-28 15:53:41 +01:00 committed by hasura-bot
parent 2ea7dbde76
commit fe6e99762e
3 changed files with 41 additions and 18 deletions

View File

@ -153,7 +153,8 @@ pub(crate) async fn explain_mutation_plan(
)); ));
} }
for (alias, ndc_mutation_execution) in mutation_plan.nodes { for (_, mutation_group) in mutation_plan.nodes {
for (alias, ndc_mutation_execution) in mutation_group {
let sequence_steps = get_execution_steps( let sequence_steps = get_execution_steps(
http_client, http_client,
alias, alias,
@ -165,6 +166,7 @@ pub(crate) async fn explain_mutation_plan(
.await; .await;
root_steps.push(Box::new(types::Step::Sequence(sequence_steps))); root_steps.push(Box::new(types::Step::Sequence(sequence_steps)));
} }
}
// simplify the steps // simplify the steps
match NonEmpty::from_vec(root_steps) { match NonEmpty::from_vec(root_steps) {

View File

@ -27,8 +27,17 @@ use crate::schema::GDS;
pub type QueryPlan<'n, 's, 'ir> = IndexMap<ast::Alias, NodeQueryPlan<'n, 's, 'ir>>; pub type QueryPlan<'n, 's, 'ir> = IndexMap<ast::Alias, NodeQueryPlan<'n, 's, 'ir>>;
/// Unlike a query, the root nodes of a mutation aren't necessarily independent. Specifically, the
/// GraphQL specification says that each root mutation must be executed sequentially. Moreover, if
/// we want to, say, insert a parent _and_ children in one query, we want the ability to make
/// transactional requests. In a mutation plan, we group nodes by connector, allowing us to issue
/// transactional commands to connectors whose capabilities allow for transactional mutations.
/// Otherwise, we can just send them one-by-one (though still sequentially).
pub struct MutationPlan<'n, 's, 'ir> { pub struct MutationPlan<'n, 's, 'ir> {
pub nodes: IndexMap<ast::Alias, NDCMutationExecution<'n, 's, 'ir>>, pub nodes: IndexMap<
resolved::data_connector::DataConnectorLink,
IndexMap<ast::Alias, NDCMutationExecution<'n, 's, 'ir>>,
>,
pub type_names: IndexMap<ast::Alias, ast::TypeName>, pub type_names: IndexMap<ast::Alias, ast::TypeName>,
} }
@ -184,9 +193,13 @@ pub fn generate_request_plan<'n, 's, 'ir>(
.insert(alias.clone(), type_name.clone()); .insert(alias.clone(), type_name.clone());
} }
root_field::MutationRootField::ProcedureBasedCommand { selection_set, ir } => { root_field::MutationRootField::ProcedureBasedCommand { selection_set, ir } => {
let plan = plan_mutation(selection_set, ir)?;
mutation_plan mutation_plan
.nodes .nodes
.insert(alias.clone(), plan_mutation(selection_set, ir)?); .entry(plan.data_connector.clone())
.or_default()
.insert(alias.clone(), plan);
} }
}; };
@ -728,12 +741,14 @@ pub async fn execute_mutation_plan<'n, 's, 'ir>(
)); ));
} }
for (alias, field_plan) in mutation_plan.nodes.into_iter() { for (_, mutation_group) in mutation_plan.nodes {
for (alias, field_plan) in mutation_group {
executed_root_fields.push(( executed_root_fields.push((
alias, alias,
execute_mutation_field_plan(http_client, field_plan, project_id.clone()).await, execute_mutation_field_plan(http_client, field_plan, project_id.clone()).await,
)); ));
} }
}
for executed_root_field in executed_root_fields.into_iter() { for executed_root_field in executed_root_fields.into_iter() {
let (alias, root_field) = executed_root_field; let (alias, root_field) = executed_root_field;

View File

@ -26,6 +26,12 @@ pub struct DataConnectorLink {
pub headers: SerializableHeaderMap, pub headers: SerializableHeaderMap,
} }
impl std::hash::Hash for DataConnectorLink {
fn hash<H>(&self, h: &mut H) where H: std::hash::Hasher {
self.name.hash(h)
}
}
impl DataConnectorLink { impl DataConnectorLink {
pub(crate) fn new( pub(crate) fn new(
name: Qualified<DataConnectorName>, name: Qualified<DataConnectorName>,