mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-15 09:22:43 +03:00
Clone type_container
instead of maintaining reference (#1150)
<!-- The PR description should answer 2 important questions: --> ### What We need less lifetimes in our plan, particular those that are connected to graphql-specific IR. This makes this reference a copy and fixes call sites. Functional no-op. V3_GIT_ORIGIN_REV_ID: beb05a09a30cd33240e916255265c41db17c0c78
This commit is contained in:
parent
2839170ddf
commit
0e4a3afbca
@ -1,3 +1,5 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
mod arguments;
|
||||
mod commands;
|
||||
pub mod error;
|
||||
@ -47,10 +49,10 @@ pub type QueryPlan<'n, 's, 'ir> = IndexMap<ast::Alias, NodeQueryPlan<'n, 's, 'ir
|
||||
/// transactional requests. In a mutation plan, we group nodes by connector, allowing us to issue
|
||||
/// transactional commands to connectors whose capabilities allow for transactional mutations.
|
||||
/// Otherwise, we can just send them one-by-one (though still sequentially).
|
||||
pub struct MutationPlan<'n, 's, 'ir> {
|
||||
pub struct MutationPlan<'n, 's> {
|
||||
pub nodes: IndexMap<
|
||||
metadata_resolve::DataConnectorLink,
|
||||
IndexMap<ast::Alias, NDCMutationExecution<'n, 's, 'ir>>,
|
||||
IndexMap<ast::Alias, NDCMutationExecution<'n, 's>>,
|
||||
>,
|
||||
pub type_names: IndexMap<ast::Alias, ast::TypeName>,
|
||||
}
|
||||
@ -59,7 +61,7 @@ pub struct MutationPlan<'n, 's, 'ir> {
|
||||
// treated as an invalid request. We may want to change this in the future.
|
||||
pub enum RequestPlan<'n, 's, 'ir> {
|
||||
QueryPlan(QueryPlan<'n, 's, 'ir>),
|
||||
MutationPlan(MutationPlan<'n, 's, 'ir>),
|
||||
MutationPlan(MutationPlan<'n, 's>),
|
||||
SubscriptionPlan(ast::Alias, NDCSubscriptionExecution<'s, 'ir>),
|
||||
}
|
||||
|
||||
@ -83,13 +85,13 @@ pub enum NodeQueryPlan<'n, 's, 'ir> {
|
||||
},
|
||||
/// NDC query to be executed
|
||||
NDCQueryExecution {
|
||||
query_execution: NDCQueryExecution<'s, 'ir>,
|
||||
query_execution: NDCQueryExecution<'s>,
|
||||
selection_set: &'ir normalized_ast::SelectionSet<'s, GDS>,
|
||||
},
|
||||
/// NDC query for Relay 'node' to be executed
|
||||
RelayNodeSelect(
|
||||
Option<(
|
||||
NDCQueryExecution<'s, 'ir>,
|
||||
NDCQueryExecution<'s>,
|
||||
&'ir normalized_ast::SelectionSet<'s, GDS>,
|
||||
)>,
|
||||
),
|
||||
@ -98,11 +100,11 @@ pub enum NodeQueryPlan<'n, 's, 'ir> {
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct NDCQueryExecution<'s, 'ir> {
|
||||
pub execution_tree: ExecutionTree<'s, 'ir>,
|
||||
pub struct NDCQueryExecution<'s> {
|
||||
pub execution_tree: ExecutionTree<'s>,
|
||||
pub execution_span_attribute: &'static str,
|
||||
pub field_span_attribute: String,
|
||||
pub process_response_as: ProcessResponseAs<'ir>,
|
||||
pub process_response_as: ProcessResponseAs,
|
||||
}
|
||||
|
||||
pub struct NDCSubscriptionExecution<'s, 'ir> {
|
||||
@ -110,7 +112,7 @@ pub struct NDCSubscriptionExecution<'s, 'ir> {
|
||||
pub polling_interval_ms: u64,
|
||||
pub execution_span_attribute: &'static str,
|
||||
pub field_span_attribute: String,
|
||||
pub process_response_as: ProcessResponseAs<'ir>,
|
||||
pub process_response_as: ProcessResponseAs,
|
||||
pub selection_set: &'ir normalized_ast::SelectionSet<'s, GDS>,
|
||||
}
|
||||
|
||||
@ -119,7 +121,7 @@ pub enum ApolloFederationSelect<'n, 's, 'ir> {
|
||||
/// NDC queries for Apollo Federation '_entities' to be executed
|
||||
EntitiesSelect(
|
||||
Vec<(
|
||||
NDCQueryExecution<'s, 'ir>,
|
||||
NDCQueryExecution<'s>,
|
||||
&'ir normalized_ast::SelectionSet<'s, GDS>,
|
||||
)>,
|
||||
),
|
||||
@ -130,24 +132,24 @@ pub enum ApolloFederationSelect<'n, 's, 'ir> {
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct NDCMutationExecution<'n, 's, 'ir> {
|
||||
pub struct NDCMutationExecution<'n, 's> {
|
||||
pub execution_node: mutation::UnresolvedMutationExecutionPlan<'s>,
|
||||
pub join_locations: JoinLocations<'s, 'ir>,
|
||||
pub join_locations: JoinLocations<'s>,
|
||||
pub data_connector: &'s metadata_resolve::DataConnectorLink,
|
||||
pub execution_span_attribute: String,
|
||||
pub field_span_attribute: String,
|
||||
pub process_response_as: ProcessResponseAs<'ir>,
|
||||
pub process_response_as: ProcessResponseAs,
|
||||
pub selection_set: &'n normalized_ast::SelectionSet<'s, GDS>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ExecutionTree<'s, 'ir> {
|
||||
pub struct ExecutionTree<'s> {
|
||||
pub query_execution_plan: query::UnresolvedQueryExecutionPlan<'s>,
|
||||
pub remote_join_executions: JoinLocations<'s, 'ir>,
|
||||
pub remote_join_executions: JoinLocations<'s>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||
pub enum ProcessResponseAs<'ir> {
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum ProcessResponseAs {
|
||||
Object {
|
||||
is_nullable: bool,
|
||||
},
|
||||
@ -155,15 +157,15 @@ pub enum ProcessResponseAs<'ir> {
|
||||
is_nullable: bool,
|
||||
},
|
||||
CommandResponse {
|
||||
command_name: &'ir metadata_resolve::Qualified<open_dds::commands::CommandName>,
|
||||
type_container: &'ir ast::TypeContainer<ast::TypeName>,
|
||||
command_name: Arc<metadata_resolve::Qualified<open_dds::commands::CommandName>>,
|
||||
type_container: ast::TypeContainer<ast::TypeName>,
|
||||
// how to process a command response
|
||||
response_config: &'ir Option<metadata_resolve::data_connectors::CommandsResponseConfig>,
|
||||
response_config: Option<Arc<metadata_resolve::data_connectors::CommandsResponseConfig>>,
|
||||
},
|
||||
Aggregates,
|
||||
}
|
||||
|
||||
impl<'ir> ProcessResponseAs<'ir> {
|
||||
impl ProcessResponseAs {
|
||||
pub fn is_nullable(&self) -> bool {
|
||||
match self {
|
||||
ProcessResponseAs::Object { is_nullable }
|
||||
@ -220,10 +222,10 @@ pub fn generate_request_plan<'n, 's, 'ir>(
|
||||
}
|
||||
|
||||
// Given a singular root field of a mutation, plan the execution of that root field.
|
||||
fn plan_mutation<'n, 's, 'ir>(
|
||||
fn plan_mutation<'n, 's>(
|
||||
selection_set: &'n gql::normalized_ast::SelectionSet<'s, GDS>,
|
||||
ir: &'ir graphql_ir::ProcedureBasedCommand<'s>,
|
||||
) -> Result<NDCMutationExecution<'n, 's, 'ir>, error::Error> {
|
||||
ir: &graphql_ir::ProcedureBasedCommand<'s>,
|
||||
) -> Result<NDCMutationExecution<'n, 's>, error::Error> {
|
||||
let (ndc_ir, join_locations) = commands::plan_mutation_execution(ir.procedure_name, ir)?;
|
||||
Ok(NDCMutationExecution {
|
||||
execution_node: ndc_ir,
|
||||
@ -233,9 +235,9 @@ fn plan_mutation<'n, 's, 'ir>(
|
||||
execution_span_attribute: "execute_command".into(),
|
||||
field_span_attribute: ir.command_info.field_name.to_string(),
|
||||
process_response_as: ProcessResponseAs::CommandResponse {
|
||||
command_name: &ir.command_info.command_name,
|
||||
type_container: &ir.command_info.type_container,
|
||||
response_config: &ir.command_info.data_connector.response_config,
|
||||
command_name: ir.command_info.command_name.clone(),
|
||||
type_container: ir.command_info.type_container.clone(),
|
||||
response_config: ir.command_info.data_connector.response_config.clone(),
|
||||
},
|
||||
})
|
||||
}
|
||||
@ -301,9 +303,9 @@ fn plan_subscription<'s, 'ir>(
|
||||
}
|
||||
}
|
||||
|
||||
fn reject_remote_joins<'s>(
|
||||
tree: ExecutionTree<'s, '_>,
|
||||
) -> Result<query::UnresolvedQueryExecutionPlan<'s>, error::Error> {
|
||||
fn reject_remote_joins(
|
||||
tree: ExecutionTree,
|
||||
) -> Result<query::UnresolvedQueryExecutionPlan, error::Error> {
|
||||
if !tree.remote_join_executions.is_empty() {
|
||||
return Err(error::Error::RemoteJoinsAreNotSupportedSubscriptions);
|
||||
}
|
||||
@ -407,9 +409,9 @@ fn plan_query<'n, 's, 'ir>(
|
||||
execution_span_attribute: "execute_command",
|
||||
field_span_attribute: ir.command_info.field_name.to_string(),
|
||||
process_response_as: ProcessResponseAs::CommandResponse {
|
||||
command_name: &ir.command_info.command_name,
|
||||
type_container: &ir.command_info.type_container,
|
||||
response_config: &ir.command_info.data_connector.response_config,
|
||||
command_name: ir.command_info.command_name.clone(),
|
||||
type_container: ir.command_info.type_container.clone(),
|
||||
response_config: ir.command_info.data_connector.response_config.clone(),
|
||||
},
|
||||
},
|
||||
}
|
||||
@ -453,9 +455,7 @@ fn plan_query<'n, 's, 'ir>(
|
||||
Ok(query_plan)
|
||||
}
|
||||
|
||||
fn generate_execution_tree<'s, 'ir>(
|
||||
ir: &'ir ModelSelection<'s>,
|
||||
) -> Result<ExecutionTree<'s, 'ir>, error::Error> {
|
||||
fn generate_execution_tree<'s>(ir: &ModelSelection<'s>) -> Result<ExecutionTree<'s>, error::Error> {
|
||||
let (query_execution_plan, join_locations) = model_selection::plan_query_execution(ir)?;
|
||||
Ok(ExecutionTree {
|
||||
query_execution_plan,
|
||||
@ -734,9 +734,9 @@ async fn execute_query_field_plan<'n, 's, 'ir>(
|
||||
}
|
||||
|
||||
/// Execute a single root field's mutation plan to produce a result.
|
||||
async fn execute_mutation_field_plan<'n, 's, 'ir>(
|
||||
async fn execute_mutation_field_plan<'n, 's>(
|
||||
http_context: &HttpContext,
|
||||
mutation_plan: NDCMutationExecution<'n, 's, 'ir>,
|
||||
mutation_plan: NDCMutationExecution<'n, 's>,
|
||||
project_id: Option<&ProjectId>,
|
||||
) -> RootFieldResult {
|
||||
let tracer = tracing_util::global_tracer();
|
||||
@ -761,9 +761,9 @@ async fn execute_mutation_field_plan<'n, 's, 'ir>(
|
||||
/// Given an entire plan for a mutation, produce a result. We do this by executing the singular
|
||||
/// root fields of the mutation sequentially rather than concurrently, in the order defined by the
|
||||
/// `IndexMap`'s keys.
|
||||
pub async fn execute_mutation_plan<'n, 's, 'ir>(
|
||||
pub async fn execute_mutation_plan<'n, 's>(
|
||||
http_context: &HttpContext,
|
||||
mutation_plan: MutationPlan<'n, 's, 'ir>,
|
||||
mutation_plan: MutationPlan<'n, 's>,
|
||||
project_id: Option<&ProjectId>,
|
||||
) -> ExecuteQueryResult {
|
||||
let mut root_fields = IndexMap::new();
|
||||
@ -861,7 +861,7 @@ fn resolve_schema_field<NSGet: NamespacedGetter<GDS>>(
|
||||
pub struct NDCSubscriptionQuery<'s, 'ir> {
|
||||
pub query_request: ndc::NdcQueryRequest,
|
||||
pub data_connector: &'s metadata_resolve::DataConnectorLink,
|
||||
pub process_response_as: ProcessResponseAs<'ir>,
|
||||
pub process_response_as: ProcessResponseAs,
|
||||
pub selection_set: &'ir normalized_ast::SelectionSet<'s, GDS>,
|
||||
pub polling_interval_ms: u64,
|
||||
}
|
||||
@ -896,7 +896,7 @@ pub async fn resolve_ndc_subscription_execution<'s, 'ir>(
|
||||
// run ndc query, do any joins, and process result
|
||||
async fn resolve_ndc_query_execution<'s, 'ir>(
|
||||
http_context: &HttpContext,
|
||||
ndc_query: NDCQueryExecution<'s, 'ir>,
|
||||
ndc_query: NDCQueryExecution<'s>,
|
||||
selection_set: &normalized_ast::SelectionSet<'ir, GDS>,
|
||||
project_id: Option<&ProjectId>,
|
||||
) -> Result<ProcessedResponse, FieldError> {
|
||||
@ -960,10 +960,10 @@ pub async fn execute_ndc_query<'s, 'ir>(
|
||||
// given results of ndc query, do any joins, and process result
|
||||
async fn process_ndc_query_response<'s, 'ir>(
|
||||
http_context: &HttpContext,
|
||||
remote_join_executions: JoinLocations<'s, 'ir>,
|
||||
remote_join_executions: JoinLocations<'s>,
|
||||
execution_span_attribute: &'static str,
|
||||
selection_set: &'ir normalized_ast::SelectionSet<'s, GDS>,
|
||||
process_response_as: ProcessResponseAs<'ir>,
|
||||
process_response_as: ProcessResponseAs,
|
||||
project_id: Option<&ProjectId>,
|
||||
mut response_rowsets: Vec<ndc_models::RowSet>,
|
||||
) -> Result<ProcessedResponse, FieldError> {
|
||||
@ -984,7 +984,7 @@ async fn process_ndc_query_response<'s, 'ir>(
|
||||
|
||||
async fn resolve_ndc_mutation_execution(
|
||||
http_context: &HttpContext,
|
||||
ndc_mutation_execution: NDCMutationExecution<'_, '_, '_>,
|
||||
ndc_mutation_execution: NDCMutationExecution<'_, '_>,
|
||||
project_id: Option<&ProjectId>,
|
||||
) -> Result<ProcessedResponse, FieldError> {
|
||||
let NDCMutationExecution {
|
||||
@ -1021,7 +1021,7 @@ async fn resolve_ndc_mutation_execution(
|
||||
async fn resolve_optional_ndc_select(
|
||||
http_context: &HttpContext,
|
||||
optional_query: Option<(
|
||||
NDCQueryExecution<'_, '_>,
|
||||
NDCQueryExecution<'_>,
|
||||
&normalized_ast::SelectionSet<'_, GDS>,
|
||||
)>,
|
||||
project_id: Option<&ProjectId>,
|
||||
|
@ -18,10 +18,10 @@ use graphql_ir::{
|
||||
};
|
||||
use open_dds::commands::ProcedureName;
|
||||
|
||||
pub(crate) fn plan_query_node<'s, 'ir>(
|
||||
ir: &'ir CommandInfo<'s>,
|
||||
pub(crate) fn plan_query_node<'s>(
|
||||
ir: &CommandInfo<'s>,
|
||||
relationships: &mut BTreeMap<NdcRelationshipName, relationships::Relationship>,
|
||||
) -> Result<(query::UnresolvedQueryNode<'s>, JoinLocations<'s, 'ir>), error::Error> {
|
||||
) -> Result<(query::UnresolvedQueryNode<'s>, JoinLocations<'s>), error::Error> {
|
||||
let mut ndc_nested_field = None;
|
||||
let mut jl = JoinLocations::new();
|
||||
if let Some(nested_selection) = &ir.selection {
|
||||
@ -51,15 +51,9 @@ pub(crate) fn plan_query_node<'s, 'ir>(
|
||||
Ok((query, jl))
|
||||
}
|
||||
|
||||
pub(crate) fn plan_query_execution<'s, 'ir>(
|
||||
ir: &'ir FunctionBasedCommand<'s>,
|
||||
) -> Result<
|
||||
(
|
||||
query::UnresolvedQueryExecutionPlan<'s>,
|
||||
JoinLocations<'s, 'ir>,
|
||||
),
|
||||
error::Error,
|
||||
> {
|
||||
pub(crate) fn plan_query_execution<'s>(
|
||||
ir: &FunctionBasedCommand<'s>,
|
||||
) -> Result<(query::UnresolvedQueryExecutionPlan<'s>, JoinLocations<'s>), error::Error> {
|
||||
let mut collection_relationships = BTreeMap::new();
|
||||
let mut arguments =
|
||||
arguments::plan_arguments(&ir.command_info.arguments, &mut collection_relationships)?;
|
||||
@ -93,7 +87,7 @@ pub(crate) fn plan_mutation_execution<'s, 'ir>(
|
||||
) -> Result<
|
||||
(
|
||||
mutation::UnresolvedMutationExecutionPlan<'s>,
|
||||
JoinLocations<'s, 'ir>,
|
||||
JoinLocations<'s>,
|
||||
),
|
||||
error::Error,
|
||||
> {
|
||||
|
@ -14,10 +14,10 @@ use graphql_ir::NdcRelationshipName;
|
||||
|
||||
/// Create an NDC `Query` based on the internal IR `ModelSelection` settings
|
||||
// #[async_recursion]
|
||||
pub(crate) fn plan_query_node<'s, 'ir>(
|
||||
ir: &'ir ModelSelection<'s>,
|
||||
pub(crate) fn plan_query_node<'s>(
|
||||
ir: &ModelSelection<'s>,
|
||||
relationships: &mut BTreeMap<NdcRelationshipName, relationships::Relationship>,
|
||||
) -> Result<(query::UnresolvedQueryNode<'s>, JoinLocations<'s, 'ir>), error::Error> {
|
||||
) -> Result<(query::UnresolvedQueryNode<'s>, JoinLocations<'s>), error::Error> {
|
||||
let mut query_fields = None;
|
||||
let mut join_locations = JoinLocations::new();
|
||||
if let Some(selection) = &ir.selection {
|
||||
@ -44,15 +44,9 @@ pub(crate) fn plan_query_node<'s, 'ir>(
|
||||
}
|
||||
|
||||
/// Generate query execution plan from internal IR (`ModelSelection`)
|
||||
pub(crate) fn plan_query_execution<'s, 'ir>(
|
||||
ir: &'ir ModelSelection<'s>,
|
||||
) -> Result<
|
||||
(
|
||||
query::UnresolvedQueryExecutionPlan<'s>,
|
||||
JoinLocations<'s, 'ir>,
|
||||
),
|
||||
error::Error,
|
||||
> {
|
||||
pub(crate) fn plan_query_execution<'s>(
|
||||
ir: &ModelSelection<'s>,
|
||||
) -> Result<(query::UnresolvedQueryExecutionPlan<'s>, JoinLocations<'s>), error::Error> {
|
||||
let mut collection_relationships = BTreeMap::new();
|
||||
let (query, join_locations) = plan_query_node(ir, &mut collection_relationships)?;
|
||||
// collection relationships from order_by clause
|
||||
|
@ -19,11 +19,11 @@ use metadata_resolve::FieldMapping;
|
||||
use open_dds::data_connector::DataConnectorColumnName;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
|
||||
pub(crate) fn plan_nested_selection<'s, 'ir>(
|
||||
nested_selection: &'ir NestedSelection<'s>,
|
||||
pub(crate) fn plan_nested_selection<'s>(
|
||||
nested_selection: &NestedSelection<'s>,
|
||||
ndc_version: NdcVersion,
|
||||
relationships: &mut BTreeMap<NdcRelationshipName, relationships::Relationship>,
|
||||
) -> Result<(field::UnresolvedNestedField<'s>, JoinLocations<'s, 'ir>), error::Error> {
|
||||
) -> Result<(field::UnresolvedNestedField<'s>, JoinLocations<'s>), error::Error> {
|
||||
match nested_selection {
|
||||
NestedSelection::Object(model_selection) => {
|
||||
let (fields, join_locations) =
|
||||
@ -58,7 +58,7 @@ pub(crate) fn plan_selection_set<'s, 'ir>(
|
||||
) -> Result<
|
||||
(
|
||||
IndexMap<NdcFieldAlias, field::UnresolvedField<'s>>,
|
||||
JoinLocations<'s, 'ir>,
|
||||
JoinLocations<'s>,
|
||||
),
|
||||
error::Error,
|
||||
> {
|
||||
@ -224,9 +224,9 @@ pub(crate) fn plan_selection_set<'s, 'ir>(
|
||||
target_data_connector: ir.command_info.data_connector,
|
||||
join_mapping,
|
||||
process_response_as: ProcessResponseAs::CommandResponse {
|
||||
command_name: &ir.command_info.command_name,
|
||||
type_container: &ir.command_info.type_container,
|
||||
response_config: &ir.command_info.data_connector.response_config,
|
||||
command_name: ir.command_info.command_name.clone(),
|
||||
type_container: ir.command_info.type_container.clone(),
|
||||
response_config: ir.command_info.data_connector.response_config.clone(),
|
||||
},
|
||||
remote_join_type: RemoteJoinType::ToCommand,
|
||||
};
|
||||
|
@ -1,3 +1,5 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use metadata_resolve::http::SerializableHeaderMap;
|
||||
use serde_json as json;
|
||||
use tracing_util::SpanVisibility;
|
||||
@ -78,7 +80,7 @@ where
|
||||
fn process_single_query_response_row<T>(
|
||||
mut row: T,
|
||||
selection_set: &normalized_ast::SelectionSet<'_, GDS>,
|
||||
response_config: &Option<data_connectors::CommandsResponseConfig>,
|
||||
response_config: &Option<Arc<data_connectors::CommandsResponseConfig>>,
|
||||
) -> Result<IndexMap<ast::Alias, json::Value>, error::FieldError>
|
||||
where
|
||||
T: KeyValueResponse,
|
||||
@ -236,7 +238,7 @@ where
|
||||
pub fn process_selection_set_as_list(
|
||||
row_set: ndc_models::RowSet,
|
||||
selection_set: &normalized_ast::SelectionSet<'_, GDS>,
|
||||
response_config: &Option<data_connectors::CommandsResponseConfig>,
|
||||
response_config: &Option<Arc<data_connectors::CommandsResponseConfig>>,
|
||||
) -> Result<Option<Vec<IndexMap<ast::Alias, json::Value>>>, error::FieldError> {
|
||||
let processed_response = row_set
|
||||
.rows
|
||||
@ -252,7 +254,7 @@ pub fn process_selection_set_as_list(
|
||||
pub fn process_selection_set_as_object(
|
||||
row_set: ndc_models::RowSet,
|
||||
selection_set: &normalized_ast::SelectionSet<'_, GDS>,
|
||||
response_config: &Option<data_connectors::CommandsResponseConfig>,
|
||||
response_config: &Option<Arc<data_connectors::CommandsResponseConfig>>,
|
||||
) -> Result<Option<IndexMap<ast::Alias, json::Value>>, error::FieldError> {
|
||||
let processed_response = row_set
|
||||
.rows
|
||||
@ -265,7 +267,7 @@ pub fn process_selection_set_as_object(
|
||||
pub fn process_field_selection_as_list(
|
||||
value: json::Value,
|
||||
selection_set: &normalized_ast::SelectionSet<'_, GDS>,
|
||||
response_config: &Option<data_connectors::CommandsResponseConfig>,
|
||||
response_config: &Option<Arc<data_connectors::CommandsResponseConfig>>,
|
||||
) -> Result<json::Value, error::FieldError> {
|
||||
if selection_set.fields.is_empty() || value.is_null() {
|
||||
// If selection set is empty we return the whole value without further processing.
|
||||
@ -285,7 +287,7 @@ pub fn process_field_selection_as_list(
|
||||
pub fn process_field_selection_as_object(
|
||||
value: json::Value,
|
||||
selection_set: &normalized_ast::SelectionSet<'_, GDS>,
|
||||
response_config: &Option<data_connectors::CommandsResponseConfig>,
|
||||
response_config: &Option<Arc<data_connectors::CommandsResponseConfig>>,
|
||||
) -> Result<json::Value, error::FieldError> {
|
||||
if selection_set.fields.is_empty() || value.is_null() {
|
||||
// If selection set is empty we return the whole value without further processing.
|
||||
@ -304,7 +306,7 @@ pub fn process_command_rows(
|
||||
rows: Option<Vec<IndexMap<ndc_models::FieldName, ndc_models::RowFieldValue, RandomState>>>,
|
||||
selection_set: &normalized_ast::SelectionSet<'_, GDS>,
|
||||
type_container: &TypeContainer<TypeName>,
|
||||
response_config: &Option<data_connectors::CommandsResponseConfig>,
|
||||
response_config: &Option<Arc<data_connectors::CommandsResponseConfig>>,
|
||||
) -> Result<Option<ProcessedResponse>, error::FieldError> {
|
||||
match rows {
|
||||
None => Err(error::NDCUnexpectedError::BadNDCResponse {
|
||||
@ -342,7 +344,7 @@ fn process_command_response_row(
|
||||
mut row: IndexMap<ndc_models::FieldName, ndc_models::RowFieldValue>,
|
||||
selection_set: &normalized_ast::SelectionSet<'_, GDS>,
|
||||
type_container: &TypeContainer<TypeName>,
|
||||
response_config: &Option<data_connectors::CommandsResponseConfig>,
|
||||
response_config: &Option<Arc<data_connectors::CommandsResponseConfig>>,
|
||||
) -> Result<ProcessedResponse, error::FieldError> {
|
||||
let field_value_result = row
|
||||
.swap_remove(FUNCTION_IR_VALUE_COLUMN_NAME)
|
||||
@ -367,7 +369,7 @@ fn process_command_field_value(
|
||||
field_value_result: serde_json::Value,
|
||||
selection_set: &normalized_ast::SelectionSet<'_, GDS>,
|
||||
type_container: &TypeContainer<TypeName>,
|
||||
response_config: &Option<data_connectors::CommandsResponseConfig>,
|
||||
response_config: &Option<Arc<data_connectors::CommandsResponseConfig>>,
|
||||
) -> Result<json::Value, error::FieldError> {
|
||||
// When no selection set for commands, return back the value from the
|
||||
// connector without any processing.
|
||||
@ -564,7 +566,7 @@ pub fn process_command_mutation_response(
|
||||
mutation_result: ndc_models::MutationOperationResults,
|
||||
selection_set: &normalized_ast::SelectionSet<'_, GDS>,
|
||||
type_container: &TypeContainer<TypeName>,
|
||||
response_config: &Option<data_connectors::CommandsResponseConfig>,
|
||||
response_config: &Option<Arc<data_connectors::CommandsResponseConfig>>,
|
||||
) -> Result<ProcessedResponse, error::FieldError> {
|
||||
match mutation_result {
|
||||
ndc_models::MutationOperationResults::Procedure { result } => {
|
||||
@ -641,7 +643,7 @@ pub(crate) fn get_single_rowset(
|
||||
/// to `DataConnectorLink.responseHeaders` config
|
||||
fn extract_response_headers_and_result(
|
||||
result: serde_json::Value,
|
||||
response_config: &Option<data_connectors::CommandsResponseConfig>,
|
||||
response_config: &Option<Arc<data_connectors::CommandsResponseConfig>>,
|
||||
) -> Result<ProcessedResponse, error::FieldError> {
|
||||
if let Some(response_config) = response_config {
|
||||
match result {
|
||||
|
@ -99,17 +99,14 @@ pub(crate) mod types;
|
||||
/// Execute remote joins. As an entry-point it assumes the response is available
|
||||
/// for the top-level query, and executes further remote joins recursively.
|
||||
#[async_recursion]
|
||||
pub(crate) async fn execute_join_locations<'ir>(
|
||||
pub(crate) async fn execute_join_locations(
|
||||
http_context: &HttpContext,
|
||||
execution_span_attribute: &'static str,
|
||||
lhs_response: &mut Vec<ndc_models::RowSet>,
|
||||
lhs_response_type: &ProcessResponseAs,
|
||||
join_locations: &JoinLocations<'async_recursion, 'ir>,
|
||||
join_locations: &JoinLocations<'async_recursion>,
|
||||
project_id: Option<&ProjectId>,
|
||||
) -> Result<(), error::FieldError>
|
||||
where
|
||||
'ir: 'async_recursion,
|
||||
{
|
||||
) -> Result<(), error::FieldError> {
|
||||
let tracer = tracing_util::global_tracer();
|
||||
|
||||
// collect the join column arguments from the LHS response
|
||||
|
@ -23,12 +23,12 @@ use graphql_ir::VariableName;
|
||||
/// An executable join node is a remote join node, it's collected join values
|
||||
/// from a LHS response, and the rest of the join sub-tree
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ExecutableJoinNode<'s, 'ir> {
|
||||
pub(crate) join_node: RemoteJoin<'s, 'ir>,
|
||||
pub(crate) struct ExecutableJoinNode<'s> {
|
||||
pub(crate) join_node: RemoteJoin<'s>,
|
||||
pub(crate) remote_alias: String,
|
||||
pub(crate) location_path: Vec<LocationInfo>,
|
||||
pub(crate) arguments: HashSet<Argument>,
|
||||
pub(crate) sub_tree: JoinLocations<'s, 'ir>,
|
||||
pub(crate) sub_tree: JoinLocations<'s>,
|
||||
}
|
||||
|
||||
/// Indicates a field alias which might have more nesting inside
|
||||
@ -40,12 +40,12 @@ pub(crate) struct LocationInfo {
|
||||
|
||||
/// Given a LHS response and `JoinLocations` tree, get the next executable join
|
||||
/// nodes down the tree. Also, extract the join values from the response.
|
||||
pub(crate) fn collect_next_join_nodes<'s, 'ir>(
|
||||
pub(crate) fn collect_next_join_nodes<'s>(
|
||||
lhs_response: &Vec<ndc_models::RowSet>,
|
||||
lhs_response_type: &ProcessResponseAs,
|
||||
join_locations: &JoinLocations<'s, 'ir>,
|
||||
join_locations: &JoinLocations<'s>,
|
||||
path: &mut [LocationInfo],
|
||||
) -> Result<Vec<ExecutableJoinNode<'s, 'ir>>, error::FieldError> {
|
||||
) -> Result<Vec<ExecutableJoinNode<'s>>, error::FieldError> {
|
||||
let mut arguments_results = Vec::new();
|
||||
|
||||
// if lhs_response is empty, there are no rows to collect arguments from
|
||||
|
@ -14,11 +14,11 @@ use crate::plan::{self, ProcessResponseAs};
|
||||
///
|
||||
/// It also includes other info, like field mapping etc., for the join
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct JoinLocations<'s, 'ir> {
|
||||
pub locations: IndexMap<String, Location<'s, 'ir>>,
|
||||
pub struct JoinLocations<'s> {
|
||||
pub locations: IndexMap<String, Location<'s>>,
|
||||
}
|
||||
|
||||
impl<'s, 'ir> JoinLocations<'s, 'ir> {
|
||||
impl<'s> JoinLocations<'s> {
|
||||
pub fn new() -> Self {
|
||||
JoinLocations::default()
|
||||
}
|
||||
@ -28,7 +28,7 @@ impl<'s, 'ir> JoinLocations<'s, 'ir> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'s, 'ir> Default for JoinLocations<'s, 'ir> {
|
||||
impl<'s> Default for JoinLocations<'s> {
|
||||
fn default() -> Self {
|
||||
JoinLocations {
|
||||
locations: IndexMap::new(),
|
||||
@ -43,9 +43,9 @@ pub enum LocationKind {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum JoinNode<'s, 'ir> {
|
||||
pub enum JoinNode<'s> {
|
||||
Local(LocationKind),
|
||||
Remote(RemoteJoin<'s, 'ir>),
|
||||
Remote(RemoteJoin<'s>),
|
||||
}
|
||||
|
||||
/// Location indicates if the current node/field is a join node.
|
||||
@ -101,14 +101,14 @@ pub enum JoinNode<'s, 'ir> {
|
||||
/// Note: `join_node` and `rest` both cannot be empty; it is an invalid/illegal
|
||||
/// state.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Location<'s, 'ir> {
|
||||
pub join_node: JoinNode<'s, 'ir>,
|
||||
pub rest: JoinLocations<'s, 'ir>,
|
||||
pub struct Location<'s> {
|
||||
pub join_node: JoinNode<'s>,
|
||||
pub rest: JoinLocations<'s>,
|
||||
}
|
||||
|
||||
/// Contains information to be captured for a join node
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct RemoteJoin<'s, 'ir> {
|
||||
pub struct RemoteJoin<'s> {
|
||||
/// target data connector to execute query on
|
||||
pub target_data_connector: &'s metadata_resolve::DataConnectorLink,
|
||||
/// NDC node to execute on a data connector
|
||||
@ -124,7 +124,7 @@ pub struct RemoteJoin<'s, 'ir> {
|
||||
/// field or an argument name.
|
||||
pub join_mapping: HashMap<SourceFieldName, (SourceFieldAlias, TargetField)>,
|
||||
/// Represents how to process the join response.
|
||||
pub process_response_as: ProcessResponseAs<'ir>,
|
||||
pub process_response_as: ProcessResponseAs,
|
||||
/// Represents the type of the remote join
|
||||
pub remote_join_type: RemoteJoinType,
|
||||
}
|
||||
|
@ -290,7 +290,7 @@ pub(crate) async fn explain_query_plan(
|
||||
pub(crate) async fn explain_mutation_plan(
|
||||
expose_internal_errors: execute::ExposeInternalErrors,
|
||||
http_context: &HttpContext,
|
||||
mutation_plan: plan::MutationPlan<'_, '_, '_>,
|
||||
mutation_plan: plan::MutationPlan<'_, '_>,
|
||||
) -> Result<types::Step, execute::RequestError> {
|
||||
let mut root_steps = vec![];
|
||||
|
||||
@ -363,8 +363,8 @@ async fn get_execution_steps<'s>(
|
||||
http_context: &HttpContext,
|
||||
resolve_context: &ResolveFilterExpressionContext,
|
||||
alias: gql::ast::common::Alias,
|
||||
process_response_as: &ProcessResponseAs<'s>,
|
||||
join_locations: JoinLocations<'s, '_>,
|
||||
process_response_as: &ProcessResponseAs,
|
||||
join_locations: JoinLocations<'s>,
|
||||
ndc_request: types::NDCRequest,
|
||||
data_connector: &metadata_resolve::DataConnectorLink,
|
||||
) -> Result<NonEmpty<Box<types::Step>>, execute::RequestError> {
|
||||
@ -425,7 +425,7 @@ async fn get_execution_steps<'s>(
|
||||
#[async_recursion]
|
||||
async fn get_join_steps(
|
||||
expose_internal_errors: execute::ExposeInternalErrors,
|
||||
join_locations: JoinLocations<'async_recursion, 'async_recursion>,
|
||||
join_locations: JoinLocations<'async_recursion>,
|
||||
http_context: &HttpContext,
|
||||
resolve_context: &ResolveFilterExpressionContext,
|
||||
) -> Result<Option<NonEmpty<Box<types::Step>>>, execute::RequestError> {
|
||||
|
@ -14,6 +14,7 @@ use open_dds::commands::ProcedureName;
|
||||
use open_dds::types::DataConnectorArgumentName;
|
||||
use serde::Serialize;
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::arguments;
|
||||
use super::selection_set;
|
||||
@ -33,7 +34,7 @@ use metadata_resolve::{Qualified, QualifiedTypeReference};
|
||||
#[derive(Serialize, Debug)]
|
||||
pub struct CommandInfo<'s> {
|
||||
/// The name of the command
|
||||
pub command_name: Qualified<commands::CommandName>,
|
||||
pub command_name: Arc<Qualified<commands::CommandName>>,
|
||||
|
||||
/// The name of the field as published in the schema
|
||||
pub field_name: ast::Name,
|
||||
@ -137,7 +138,7 @@ pub fn generate_command_info<'n, 's>(
|
||||
let selection = wrap_selection_in_response_config(command_source, selection);
|
||||
|
||||
Ok(CommandInfo {
|
||||
command_name: command_name.clone(),
|
||||
command_name: Arc::new(command_name.clone()),
|
||||
field_name: field_call.name.clone(),
|
||||
data_connector: &command_source.data_connector,
|
||||
arguments: command_arguments,
|
||||
|
@ -22,6 +22,7 @@ use open_dds::{
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use strum_macros::EnumIter;
|
||||
|
||||
pub struct DataConnectorsOutput<'a> {
|
||||
@ -245,7 +246,7 @@ pub struct DataConnectorLink {
|
||||
/// HTTP response headers configuration that is forwarded from a NDC
|
||||
/// function/procedure to the client.
|
||||
#[serde(skip_serializing_if = "Option::is_none", default)]
|
||||
pub response_config: Option<CommandsResponseConfig>,
|
||||
pub response_config: Option<Arc<CommandsResponseConfig>>,
|
||||
pub capabilities: DataConnectorCapabilities,
|
||||
}
|
||||
|
||||
@ -320,7 +321,7 @@ impl DataConnectorLink {
|
||||
url,
|
||||
headers,
|
||||
capabilities,
|
||||
response_config: context.response_headers.clone(),
|
||||
response_config: context.response_headers.clone().map(Arc::new),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user