mirror of
https://github.com/hasura/graphql-engine.git
synced 2024-12-12 14:05:16 +03:00
New copy of execution plan in execute
(#1365)
<!-- The PR description should answer 2 important questions: --> ### What Changing all of this code in situ is going to be extremely painful, so instead we're taking the approach of copying small sections, making them typecheck with the new set of types, and integrating them bit by bit. This adds a version of the execution steps in `execute`, using the new types. The next PR to follow will change the OpenDD pipeline to use them, as remote predicates aren't implemented there so we can defer implementing them in the new system. V3_GIT_ORIGIN_REV_ID: 98d1bfa4753edf7d9a9f0b9d1e617a0a760d5862
This commit is contained in:
parent
c8a1678b7a
commit
b547e66ecb
2
v3/Cargo.lock
generated
2
v3/Cargo.lock
generated
@ -4148,6 +4148,8 @@ version = "3.0.0"
|
||||
dependencies = [
|
||||
"derive_more",
|
||||
"indexmap 2.6.0",
|
||||
"json-ext",
|
||||
"lang-graphql",
|
||||
"metadata-resolve",
|
||||
"nonempty",
|
||||
"open-dds",
|
||||
|
98
v3/crates/execute/src/execute.rs
Normal file
98
v3/crates/execute/src/execute.rs
Normal file
@ -0,0 +1,98 @@
|
||||
//! everything from `plan` slowly moves here, binning as much as possible along the way
|
||||
// ideally we'll bin off everything around GraphQL-specific nodes and leave those to the GraphQL
|
||||
// frontend
|
||||
mod ndc_request;
|
||||
mod remote_joins;
|
||||
use super::ndc;
|
||||
use super::process_response::process_response;
|
||||
use super::{HttpContext, ProjectId};
|
||||
use crate::error::FieldError;
|
||||
use crate::process_response::ProcessedResponse;
|
||||
use gql::normalized_ast;
|
||||
use graphql_schema::GDS;
|
||||
use lang_graphql as gql;
|
||||
use plan_types::{JoinLocations, NDCQueryExecution, ProcessResponseAs, QueryExecutionPlan};
|
||||
|
||||
// run ndc query, do any joins, and process result
|
||||
pub async fn resolve_ndc_query_execution<'ir>(
|
||||
http_context: &HttpContext,
|
||||
ndc_query: NDCQueryExecution,
|
||||
selection_set: &normalized_ast::SelectionSet<'ir, GDS>,
|
||||
project_id: Option<&ProjectId>,
|
||||
) -> Result<ProcessedResponse, FieldError> {
|
||||
let NDCQueryExecution {
|
||||
execution_tree,
|
||||
execution_span_attribute,
|
||||
ref field_span_attribute,
|
||||
process_response_as,
|
||||
} = ndc_query;
|
||||
|
||||
let response_rowsets = execute_ndc_query(
|
||||
http_context,
|
||||
execution_tree.query_execution_plan,
|
||||
field_span_attribute,
|
||||
execution_span_attribute,
|
||||
project_id,
|
||||
)
|
||||
.await?;
|
||||
|
||||
process_ndc_query_response(
|
||||
http_context,
|
||||
execution_tree.remote_join_executions,
|
||||
execution_span_attribute,
|
||||
selection_set,
|
||||
process_response_as,
|
||||
project_id,
|
||||
response_rowsets,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn execute_ndc_query<'s, 'ir>(
|
||||
http_context: &HttpContext,
|
||||
query_execution_plan: QueryExecutionPlan,
|
||||
field_span_attribute: &str,
|
||||
execution_span_attribute: &'static str,
|
||||
project_id: Option<&ProjectId>,
|
||||
) -> Result<Vec<ndc_models::RowSet>, FieldError> {
|
||||
let data_connector = query_execution_plan.data_connector.clone();
|
||||
let query_request = ndc_request::make_ndc_query_request(query_execution_plan)?;
|
||||
|
||||
let response = ndc::execute_ndc_query(
|
||||
http_context,
|
||||
&query_request,
|
||||
&data_connector,
|
||||
execution_span_attribute,
|
||||
field_span_attribute.to_owned(),
|
||||
project_id,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(response.as_latest_rowsets())
|
||||
}
|
||||
|
||||
// given results of ndc query, do any joins, and process result
|
||||
async fn process_ndc_query_response<'s, 'ir>(
|
||||
http_context: &HttpContext,
|
||||
remote_join_executions: JoinLocations,
|
||||
execution_span_attribute: &'static str,
|
||||
selection_set: &'ir normalized_ast::SelectionSet<'s, GDS>,
|
||||
process_response_as: ProcessResponseAs,
|
||||
project_id: Option<&ProjectId>,
|
||||
mut response_rowsets: Vec<ndc_models::RowSet>,
|
||||
) -> Result<ProcessedResponse, FieldError> {
|
||||
// TODO: Failures in remote joins should result in partial response
|
||||
// https://github.com/hasura/v3-engine/issues/229
|
||||
remote_joins::execute_join_locations(
|
||||
http_context,
|
||||
execution_span_attribute,
|
||||
&mut response_rowsets,
|
||||
&process_response_as,
|
||||
&remote_join_executions,
|
||||
project_id,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// this ties all of this to GraphQL, let's not do this
|
||||
process_response(selection_set, response_rowsets, &process_response_as)
|
||||
}
|
23
v3/crates/execute/src/execute/ndc_request.rs
Normal file
23
v3/crates/execute/src/execute/ndc_request.rs
Normal file
@ -0,0 +1,23 @@
|
||||
pub mod v01;
|
||||
pub mod v02;
|
||||
|
||||
use crate::{error, ndc};
|
||||
use metadata_resolve::data_connectors::NdcVersion;
|
||||
use plan_types::QueryExecutionPlan;
|
||||
|
||||
pub fn make_ndc_query_request(
|
||||
query_execution_plan: QueryExecutionPlan,
|
||||
) -> Result<ndc::NdcQueryRequest, error::FieldError> {
|
||||
match query_execution_plan
|
||||
.data_connector
|
||||
.capabilities
|
||||
.supported_ndc_version
|
||||
{
|
||||
NdcVersion::V01 => Ok(ndc::NdcQueryRequest::V01(v01::make_query_request(
|
||||
query_execution_plan,
|
||||
)?)),
|
||||
NdcVersion::V02 => Ok(ndc::NdcQueryRequest::V02(v02::make_query_request(
|
||||
query_execution_plan,
|
||||
)?)),
|
||||
}
|
||||
}
|
546
v3/crates/execute/src/execute/ndc_request/v01.rs
Normal file
546
v3/crates/execute/src/execute/ndc_request/v01.rs
Normal file
@ -0,0 +1,546 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use indexmap::IndexMap;
|
||||
use open_dds::data_connector::DataConnectorColumnName;
|
||||
use open_dds::types::DataConnectorArgumentName;
|
||||
|
||||
use crate::error::{FieldError, FieldInternalError};
|
||||
use plan_types::{
|
||||
AggregateFieldSelection, AggregateSelectionSet, Argument, Field, NestedArray, NestedField,
|
||||
NestedObject, OrderByDirection, OrderByElement, OrderByTarget, QueryExecutionPlan,
|
||||
QueryNodeNew, Relationship, RelationshipArgument, ResolvedFilterExpression, VariableName,
|
||||
};
|
||||
|
||||
pub fn make_query_request(
|
||||
query_execution_plan: QueryExecutionPlan,
|
||||
) -> Result<ndc_models_v01::QueryRequest, FieldError> {
|
||||
let query_request = ndc_models_v01::QueryRequest {
|
||||
collection: ndc_models_v01::CollectionName::from(query_execution_plan.collection.as_str()),
|
||||
query: make_query(query_execution_plan.query_node)?,
|
||||
arguments: make_arguments(query_execution_plan.arguments)?,
|
||||
collection_relationships: make_collection_relationships(
|
||||
query_execution_plan.collection_relationships,
|
||||
),
|
||||
variables: make_variables(query_execution_plan.variables),
|
||||
};
|
||||
Ok(query_request)
|
||||
}
|
||||
|
||||
fn make_variables(
|
||||
variables: Option<Vec<BTreeMap<VariableName, serde_json::Value>>>,
|
||||
) -> Option<Vec<BTreeMap<ndc_models_v01::VariableName, serde_json::Value>>> {
|
||||
variables.map(|variables| {
|
||||
variables
|
||||
.into_iter()
|
||||
.map(|variables_map| {
|
||||
variables_map
|
||||
.into_iter()
|
||||
.map(|(name, value)| {
|
||||
(ndc_models_v01::VariableName::from(name.0.as_str()), value)
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
}
|
||||
|
||||
fn make_query(query_node: QueryNodeNew) -> Result<ndc_models_v01::Query, FieldError> {
|
||||
let ndc_predicate = query_node.predicate.map(make_expression).transpose()?;
|
||||
|
||||
let ndc_fields = query_node
|
||||
.fields
|
||||
.map(|fields| {
|
||||
fields
|
||||
.fields
|
||||
.into_iter()
|
||||
.map(|(name, field)| {
|
||||
Ok((
|
||||
ndc_models_v01::FieldName::from(name.as_str()),
|
||||
make_field(field)?,
|
||||
))
|
||||
})
|
||||
.collect::<Result<IndexMap<_, _>, FieldError>>()
|
||||
})
|
||||
.transpose()?;
|
||||
|
||||
Ok(ndc_models_v01::Query {
|
||||
limit: query_node.limit,
|
||||
offset: query_node.offset,
|
||||
order_by: query_node.order_by.map(make_order_by),
|
||||
predicate: ndc_predicate,
|
||||
aggregates: query_node.aggregates.map(make_aggregates),
|
||||
fields: ndc_fields,
|
||||
})
|
||||
}
|
||||
|
||||
fn make_arguments(
|
||||
arguments: BTreeMap<DataConnectorArgumentName, Argument>,
|
||||
) -> Result<BTreeMap<ndc_models_v01::ArgumentName, ndc_models_v01::Argument>, FieldError> {
|
||||
arguments
|
||||
.into_iter()
|
||||
.map(|(name, argument)| {
|
||||
Ok((
|
||||
ndc_models_v01::ArgumentName::new(name.into_inner()),
|
||||
make_argument(argument)?,
|
||||
))
|
||||
})
|
||||
.collect::<Result<BTreeMap<_, _>, _>>()
|
||||
}
|
||||
|
||||
fn make_argument(argument: Argument) -> Result<ndc_models_v01::Argument, FieldError> {
|
||||
match argument {
|
||||
Argument::Literal { value } => Ok(ndc_models_v01::Argument::Literal { value }),
|
||||
Argument::Variable { name } => Ok(ndc_models_v01::Argument::Variable {
|
||||
name: ndc_models_v01::VariableName::from(name.0.as_str()),
|
||||
}),
|
||||
Argument::BooleanExpression { predicate } => {
|
||||
let ndc_expression = make_expression(predicate)?;
|
||||
Ok(ndc_models_v01::Argument::Literal {
|
||||
value: serde_json::to_value(ndc_expression).map_err(|e| {
|
||||
FieldError::InternalError(FieldInternalError::ExpressionSerializationError(e))
|
||||
})?,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn make_relationship_arguments_from_arguments(
|
||||
arguments: BTreeMap<DataConnectorArgumentName, Argument>,
|
||||
) -> Result<BTreeMap<ndc_models_v01::ArgumentName, ndc_models_v01::RelationshipArgument>, FieldError>
|
||||
{
|
||||
arguments
|
||||
.into_iter()
|
||||
.map(|(name, argument)| {
|
||||
Ok((
|
||||
ndc_models_v01::ArgumentName::new(name.into_inner()),
|
||||
make_relationship_argument_from_argument(argument)?,
|
||||
))
|
||||
})
|
||||
.collect::<Result<BTreeMap<_, _>, _>>()
|
||||
}
|
||||
|
||||
fn make_relationship_argument_from_argument(
|
||||
argument: Argument,
|
||||
) -> Result<ndc_models_v01::RelationshipArgument, FieldError> {
|
||||
match argument {
|
||||
Argument::Literal { value } => Ok(ndc_models_v01::RelationshipArgument::Literal { value }),
|
||||
Argument::Variable { name } => Ok(ndc_models_v01::RelationshipArgument::Variable {
|
||||
name: ndc_models_v01::VariableName::from(name.0.as_str()),
|
||||
}),
|
||||
Argument::BooleanExpression { predicate } => {
|
||||
let ndc_expression = make_expression(predicate)?;
|
||||
Ok(ndc_models_v01::RelationshipArgument::Literal {
|
||||
value: serde_json::to_value(ndc_expression).map_err(|e| {
|
||||
FieldError::InternalError(FieldInternalError::ExpressionSerializationError(e))
|
||||
})?,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn make_relationship_arguments(
|
||||
arguments: BTreeMap<DataConnectorArgumentName, RelationshipArgument>,
|
||||
) -> BTreeMap<ndc_models_v01::ArgumentName, ndc_models_v01::RelationshipArgument> {
|
||||
arguments
|
||||
.into_iter()
|
||||
.map(|(name, argument)| {
|
||||
(
|
||||
ndc_models_v01::ArgumentName::new(name.into_inner()),
|
||||
make_relationship_argument(argument),
|
||||
)
|
||||
})
|
||||
.collect::<BTreeMap<_, _>>()
|
||||
}
|
||||
|
||||
fn make_relationship_argument(
|
||||
argument: RelationshipArgument,
|
||||
) -> ndc_models_v01::RelationshipArgument {
|
||||
match argument {
|
||||
RelationshipArgument::Column { name } => ndc_models_v01::RelationshipArgument::Column {
|
||||
name: ndc_models_v01::FieldName::new(name.into_inner()),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn make_expression(
|
||||
predicate: ResolvedFilterExpression,
|
||||
) -> Result<ndc_models_v01::Expression, FieldError> {
|
||||
match predicate {
|
||||
ResolvedFilterExpression::And { expressions } => {
|
||||
let mut ndc_expressions = Vec::new();
|
||||
for expression in expressions {
|
||||
let ndc_expression = make_expression(expression)?;
|
||||
ndc_expressions.push(ndc_expression);
|
||||
}
|
||||
Ok(ndc_models_v01::Expression::And {
|
||||
expressions: ndc_expressions,
|
||||
})
|
||||
}
|
||||
ResolvedFilterExpression::Or { expressions } => {
|
||||
let mut ndc_expressions = Vec::new();
|
||||
for expression in expressions {
|
||||
let ndc_expression = make_expression(expression)?;
|
||||
ndc_expressions.push(ndc_expression);
|
||||
}
|
||||
Ok(ndc_models_v01::Expression::Or {
|
||||
expressions: ndc_expressions,
|
||||
})
|
||||
}
|
||||
ResolvedFilterExpression::Not { expression } => {
|
||||
let ndc_expression = make_expression(*expression)?;
|
||||
Ok(ndc_models_v01::Expression::Not {
|
||||
expression: Box::new(ndc_expression),
|
||||
})
|
||||
}
|
||||
ResolvedFilterExpression::LocalFieldComparison(
|
||||
plan_types::LocalFieldComparison::BinaryComparison {
|
||||
column,
|
||||
operator,
|
||||
value,
|
||||
},
|
||||
) => Ok(ndc_models_v01::Expression::BinaryComparisonOperator {
|
||||
column: make_comparison_target(column),
|
||||
operator: ndc_models_v01::ComparisonOperatorName::new(operator.into_inner()),
|
||||
value: make_comparison_value(value),
|
||||
}),
|
||||
ResolvedFilterExpression::LocalNestedArray {
|
||||
column,
|
||||
field_path,
|
||||
predicate,
|
||||
} => {
|
||||
let ndc_expression = make_expression(*predicate)?;
|
||||
let field_name = ndc_models_v01::FieldName::new(column.into_inner());
|
||||
|
||||
Ok(ndc_models_v01::Expression::Exists {
|
||||
in_collection: ndc_models_v01::ExistsInCollection::NestedCollection {
|
||||
column_name: field_name,
|
||||
field_path: field_path
|
||||
.into_iter()
|
||||
.map(|f| ndc_models_v01::FieldName::new(f.into_inner()))
|
||||
.collect(),
|
||||
arguments: BTreeMap::new(),
|
||||
},
|
||||
predicate: Some(Box::new(ndc_expression)),
|
||||
})
|
||||
}
|
||||
ResolvedFilterExpression::LocalFieldComparison(
|
||||
plan_types::LocalFieldComparison::UnaryComparison { column, operator },
|
||||
) => Ok(ndc_models_v01::Expression::UnaryComparisonOperator {
|
||||
column: make_comparison_target(column),
|
||||
operator: match operator {
|
||||
metadata_resolve::UnaryComparisonOperator::IsNull => {
|
||||
ndc_models_v01::UnaryComparisonOperator::IsNull
|
||||
}
|
||||
},
|
||||
}),
|
||||
ResolvedFilterExpression::LocalRelationshipComparison {
|
||||
relationship,
|
||||
predicate,
|
||||
} => {
|
||||
let ndc_expression = make_expression(*predicate)?;
|
||||
Ok(ndc_models_v01::Expression::Exists {
|
||||
in_collection: ndc_models_v01::ExistsInCollection::Related {
|
||||
relationship: ndc_models_v01::RelationshipName::from(relationship.as_str()),
|
||||
arguments: BTreeMap::new(),
|
||||
},
|
||||
predicate: Some(Box::new(ndc_expression)),
|
||||
})
|
||||
}
|
||||
// we are generating NDC request for one connector, we can ignore anything remote
|
||||
ResolvedFilterExpression::RemoteRelationshipComparison {
|
||||
remote_predicate_id: _,
|
||||
} => Ok(ndc_models_v01::Expression::And {
|
||||
expressions: vec![],
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn make_comparison_target(
|
||||
comparison_target: plan_types::ComparisonTarget,
|
||||
) -> ndc_models_v01::ComparisonTarget {
|
||||
match comparison_target {
|
||||
plan_types::ComparisonTarget::Column { name, field_path } => {
|
||||
ndc_models_v01::ComparisonTarget::Column {
|
||||
name: ndc_models_v01::FieldName::new(name.into_inner()),
|
||||
field_path: if field_path.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(
|
||||
field_path
|
||||
.into_iter()
|
||||
.map(|f| ndc_models_v01::FieldName::new(f.into_inner()))
|
||||
.collect(),
|
||||
)
|
||||
},
|
||||
path: vec![],
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn make_comparison_value(
|
||||
comparison_value: plan_types::ComparisonValue,
|
||||
) -> ndc_models_v01::ComparisonValue {
|
||||
match comparison_value {
|
||||
plan_types::ComparisonValue::Scalar { value } => {
|
||||
ndc_models_v01::ComparisonValue::Scalar { value }
|
||||
}
|
||||
plan_types::ComparisonValue::Variable { name } => {
|
||||
ndc_models_v01::ComparisonValue::Variable {
|
||||
name: ndc_models_v01::VariableName::from(name.0.as_str()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn make_field(field: Field) -> Result<ndc_models_v01::Field, FieldError> {
|
||||
match field {
|
||||
Field::Column {
|
||||
column,
|
||||
fields,
|
||||
arguments,
|
||||
} => {
|
||||
let nested_fields = fields.map(make_nested_field).transpose()?;
|
||||
|
||||
Ok(ndc_models_v01::Field::Column {
|
||||
column: ndc_models_v01::FieldName::new(column.into_inner()),
|
||||
fields: nested_fields,
|
||||
arguments: make_arguments(arguments)?,
|
||||
})
|
||||
}
|
||||
Field::Relationship {
|
||||
query_node,
|
||||
relationship,
|
||||
arguments,
|
||||
} => {
|
||||
let query = make_query(*query_node)?;
|
||||
Ok(ndc_models_v01::Field::Relationship {
|
||||
query: Box::new(query),
|
||||
relationship: ndc_models_v01::RelationshipName::from(relationship.as_str()),
|
||||
arguments: make_relationship_arguments_from_arguments(arguments)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn make_nested_field(nested_field: NestedField) -> Result<ndc_models_v01::NestedField, FieldError> {
|
||||
match nested_field {
|
||||
NestedField::Object(nested_object) => Ok(ndc_models_v01::NestedField::Object(
|
||||
make_nested_object(nested_object)?,
|
||||
)),
|
||||
NestedField::Array(nested_array) => Ok(ndc_models_v01::NestedField::Array(
|
||||
make_nested_array(nested_array)?,
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn make_nested_object(
|
||||
nested_field: NestedObject,
|
||||
) -> Result<ndc_models_v01::NestedObject, FieldError> {
|
||||
let fields = nested_field
|
||||
.fields
|
||||
.into_iter()
|
||||
.map(|(name, field)| {
|
||||
Ok((
|
||||
ndc_models_v01::FieldName::from(name.as_str()),
|
||||
make_field(field)?,
|
||||
))
|
||||
})
|
||||
.collect::<Result<IndexMap<_, _>, FieldError>>()?;
|
||||
Ok(ndc_models_v01::NestedObject { fields })
|
||||
}
|
||||
|
||||
fn make_nested_array(nested_field: NestedArray) -> Result<ndc_models_v01::NestedArray, FieldError> {
|
||||
let fields = make_nested_field(*nested_field.fields)?;
|
||||
Ok(ndc_models_v01::NestedArray {
|
||||
fields: Box::new(fields),
|
||||
})
|
||||
}
|
||||
|
||||
fn make_collection_relationships(
|
||||
collection_relationships: BTreeMap<plan_types::NdcRelationshipName, Relationship>,
|
||||
) -> BTreeMap<ndc_models_v01::RelationshipName, ndc_models_v01::Relationship> {
|
||||
collection_relationships
|
||||
.into_iter()
|
||||
.map(|(name, relationship)| {
|
||||
(
|
||||
ndc_models_v01::RelationshipName::from(name.as_str()),
|
||||
make_relationship(relationship),
|
||||
)
|
||||
})
|
||||
.collect::<BTreeMap<_, _>>()
|
||||
}
|
||||
|
||||
fn make_relationship(relationship: Relationship) -> ndc_models_v01::Relationship {
|
||||
ndc_models_v01::Relationship {
|
||||
column_mapping: relationship
|
||||
.column_mapping
|
||||
.into_iter()
|
||||
.map(|(s, t)| {
|
||||
(
|
||||
ndc_models_v01::FieldName::new(s.into_inner()),
|
||||
ndc_models_v01::FieldName::new(t.into_inner()),
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
relationship_type: match relationship.relationship_type {
|
||||
open_dds::relationships::RelationshipType::Object => {
|
||||
ndc_models_v01::RelationshipType::Object
|
||||
}
|
||||
open_dds::relationships::RelationshipType::Array => {
|
||||
ndc_models_v01::RelationshipType::Array
|
||||
}
|
||||
},
|
||||
target_collection: ndc_models_v01::CollectionName::new(
|
||||
relationship.target_collection.into_inner(),
|
||||
),
|
||||
arguments: make_relationship_arguments(relationship.arguments),
|
||||
}
|
||||
}
|
||||
|
||||
fn make_order_by(order_by_elements: Vec<OrderByElement>) -> ndc_models_v01::OrderBy {
|
||||
ndc_models_v01::OrderBy {
|
||||
elements: order_by_elements
|
||||
.into_iter()
|
||||
.map(|element| ndc_models_v01::OrderByElement {
|
||||
order_direction: match element.order_direction {
|
||||
OrderByDirection::Asc => ndc_models_v01::OrderDirection::Asc,
|
||||
OrderByDirection::Desc => ndc_models_v01::OrderDirection::Desc,
|
||||
},
|
||||
target: make_order_by_target(element.target),
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
fn make_order_by_target(target: OrderByTarget) -> ndc_models_v01::OrderByTarget {
|
||||
match target {
|
||||
OrderByTarget::Column {
|
||||
name,
|
||||
field_path,
|
||||
relationship_path,
|
||||
} => {
|
||||
let mut order_by_element_path = Vec::new();
|
||||
// When using a nested relationship column, you'll have to provide all the relationships(paths)
|
||||
// NDC has to traverse to access the column. The ordering of that paths is important.
|
||||
// The order decides how to access the column.
|
||||
//
|
||||
// For example, if you have a model called `User` with a relationship column called `Posts`
|
||||
// which has a relationship column called `Comments` which has a non-relationship column
|
||||
// called `text`, you'll have to provide the following paths to access the `text` column:
|
||||
// ["UserPosts", "PostsComments"]
|
||||
for path in relationship_path {
|
||||
order_by_element_path.push(ndc_models_v01::PathElement {
|
||||
relationship: ndc_models_v01::RelationshipName::from(path.0.as_str()),
|
||||
arguments: BTreeMap::new(),
|
||||
// 'AND' predicate indicates that the column can be accessed
|
||||
// by joining all the relationships paths provided
|
||||
predicate: Some(Box::new(ndc_models_v01::Expression::And {
|
||||
// TODO(naveen): Add expressions here, when we support sorting with predicates.
|
||||
//
|
||||
// There are two types of sorting:
|
||||
// 1. Sorting without predicates
|
||||
// 2. Sorting with predicates
|
||||
//
|
||||
// In the 1st sort, we sort all the elements of the results either in ascending
|
||||
// or descing order based on the order_by argument.
|
||||
//
|
||||
// In the 2nd sort, we want fetch the entire result but only sort a subset
|
||||
// of result and put those sorted set either at the beginning or at the end of the
|
||||
// result.
|
||||
//
|
||||
// Currently we only support the 1st type of sort. Hence we don't have any expressions/predicate.
|
||||
expressions: Vec::new(),
|
||||
})),
|
||||
});
|
||||
}
|
||||
|
||||
ndc_models_v01::OrderByTarget::Column {
|
||||
name: ndc_models_v01::FieldName::new(name.into_inner()),
|
||||
path: order_by_element_path,
|
||||
field_path: field_path.map(|field_path| {
|
||||
field_path
|
||||
.iter()
|
||||
.map(|name| ndc_models_v01::FieldName::from(name.as_str()))
|
||||
.collect()
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Translates the internal IR 'AggregateSelectionSet' into an NDC query aggregates selection
|
||||
fn make_aggregates(
|
||||
aggregate_selection_set: AggregateSelectionSet,
|
||||
) -> IndexMap<ndc_models_v01::FieldName, ndc_models_v01::Aggregate> {
|
||||
aggregate_selection_set
|
||||
.fields
|
||||
.into_iter()
|
||||
.map(|(field_name, aggregate_selection)| {
|
||||
let aggregate = match aggregate_selection {
|
||||
AggregateFieldSelection::Count { column_path, .. } => {
|
||||
make_count_aggregate(column_path, false)
|
||||
}
|
||||
AggregateFieldSelection::CountDistinct { column_path, .. } => {
|
||||
make_count_aggregate(column_path, true)
|
||||
}
|
||||
AggregateFieldSelection::AggregationFunction {
|
||||
function_name,
|
||||
column_path,
|
||||
} => {
|
||||
let nonempty::NonEmpty {
|
||||
head: column,
|
||||
tail: field_path,
|
||||
} = column_path;
|
||||
let nested_field_path = field_path
|
||||
.into_iter()
|
||||
.map(|column_name| {
|
||||
ndc_models_v01::FieldName::from(column_name.into_inner())
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
ndc_models_v01::Aggregate::SingleColumn {
|
||||
column: ndc_models_v01::FieldName::from(column.into_inner()),
|
||||
field_path: if nested_field_path.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(nested_field_path)
|
||||
},
|
||||
function: ndc_models_v01::AggregateFunctionName::from(
|
||||
function_name.as_str(),
|
||||
),
|
||||
}
|
||||
}
|
||||
};
|
||||
(
|
||||
ndc_models_v01::FieldName::from(field_name.as_str()),
|
||||
aggregate,
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Creates the appropriate NDC count aggregation based on whether we're selecting
|
||||
/// a column (nested or otherwise) or not
|
||||
fn make_count_aggregate(
|
||||
column_path: Vec<DataConnectorColumnName>,
|
||||
distinct: bool,
|
||||
) -> ndc_models_v01::Aggregate {
|
||||
let mut column_path_iter = column_path.into_iter();
|
||||
if let Some(first_path_element) = column_path_iter.next() {
|
||||
let remaining_path = column_path_iter
|
||||
.map(|column_name| ndc_models_v01::FieldName::from(column_name.into_inner()))
|
||||
.collect::<Vec<_>>();
|
||||
let nested_field_path = if remaining_path.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(remaining_path)
|
||||
};
|
||||
ndc_models_v01::Aggregate::ColumnCount {
|
||||
column: ndc_models_v01::FieldName::from(first_path_element.into_inner()),
|
||||
field_path: nested_field_path,
|
||||
distinct,
|
||||
}
|
||||
} else {
|
||||
ndc_models_v01::Aggregate::StarCount {}
|
||||
}
|
||||
}
|
559
v3/crates/execute/src/execute/ndc_request/v02.rs
Normal file
559
v3/crates/execute/src/execute/ndc_request/v02.rs
Normal file
@ -0,0 +1,559 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use indexmap::IndexMap;
|
||||
use ndc_models as ndc_models_v02;
|
||||
use open_dds::data_connector::DataConnectorColumnName;
|
||||
use open_dds::types::DataConnectorArgumentName;
|
||||
|
||||
use crate::error::{FieldError, FieldInternalError};
|
||||
use plan_types::{
|
||||
AggregateFieldSelection, AggregateSelectionSet, Argument, Field, NestedArray, NestedField,
|
||||
NestedObject, OrderByDirection, OrderByElement, OrderByTarget, QueryExecutionPlan,
|
||||
QueryNodeNew, Relationship, RelationshipArgument, ResolvedFilterExpression, VariableName,
|
||||
};
|
||||
|
||||
pub fn make_query_request(
|
||||
query_execution_plan: QueryExecutionPlan,
|
||||
) -> Result<ndc_models_v02::QueryRequest, FieldError> {
|
||||
let query_request = ndc_models_v02::QueryRequest {
|
||||
collection: ndc_models_v02::CollectionName::from(query_execution_plan.collection.as_str()),
|
||||
query: make_query(query_execution_plan.query_node)?,
|
||||
arguments: make_arguments(query_execution_plan.arguments)?,
|
||||
collection_relationships: make_collection_relationships(
|
||||
query_execution_plan.collection_relationships,
|
||||
),
|
||||
variables: make_variables(query_execution_plan.variables),
|
||||
};
|
||||
Ok(query_request)
|
||||
}
|
||||
|
||||
fn make_variables(
|
||||
variables: Option<Vec<BTreeMap<VariableName, serde_json::Value>>>,
|
||||
) -> Option<Vec<BTreeMap<ndc_models_v02::VariableName, serde_json::Value>>> {
|
||||
variables.map(|variables| {
|
||||
variables
|
||||
.into_iter()
|
||||
.map(|variables_map| {
|
||||
variables_map
|
||||
.into_iter()
|
||||
.map(|(name, value)| {
|
||||
(ndc_models_v02::VariableName::from(name.0.as_str()), value)
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
}
|
||||
|
||||
fn make_query(query_node: QueryNodeNew) -> Result<ndc_models_v02::Query, FieldError> {
|
||||
let ndc_predicate = query_node.predicate.map(make_expression).transpose()?;
|
||||
|
||||
let ndc_fields = query_node
|
||||
.fields
|
||||
.map(|fields| {
|
||||
fields
|
||||
.fields
|
||||
.into_iter()
|
||||
.map(|(name, field)| {
|
||||
Ok((
|
||||
ndc_models_v02::FieldName::from(name.as_str()),
|
||||
make_field(field)?,
|
||||
))
|
||||
})
|
||||
.collect::<Result<IndexMap<_, _>, FieldError>>()
|
||||
})
|
||||
.transpose()?;
|
||||
|
||||
Ok(ndc_models_v02::Query {
|
||||
limit: query_node.limit,
|
||||
offset: query_node.offset,
|
||||
order_by: query_node.order_by.map(make_order_by),
|
||||
predicate: ndc_predicate,
|
||||
aggregates: query_node.aggregates.map(make_aggregates),
|
||||
fields: ndc_fields,
|
||||
groups: None,
|
||||
})
|
||||
}
|
||||
|
||||
fn make_arguments(
|
||||
arguments: BTreeMap<DataConnectorArgumentName, Argument>,
|
||||
) -> Result<BTreeMap<ndc_models_v02::ArgumentName, ndc_models_v02::Argument>, FieldError> {
|
||||
arguments
|
||||
.into_iter()
|
||||
.map(|(name, argument)| {
|
||||
Ok((
|
||||
ndc_models_v02::ArgumentName::new(name.into_inner()),
|
||||
make_argument(argument)?,
|
||||
))
|
||||
})
|
||||
.collect::<Result<BTreeMap<_, _>, _>>()
|
||||
}
|
||||
|
||||
fn make_argument(argument: Argument) -> Result<ndc_models_v02::Argument, FieldError> {
|
||||
match argument {
|
||||
Argument::Literal { value } => Ok(ndc_models_v02::Argument::Literal { value }),
|
||||
Argument::Variable { name } => Ok(ndc_models_v02::Argument::Variable {
|
||||
name: ndc_models_v02::VariableName::from(name.0.as_str()),
|
||||
}),
|
||||
Argument::BooleanExpression { predicate } => {
|
||||
let ndc_expression = make_expression(predicate)?;
|
||||
Ok(ndc_models_v02::Argument::Literal {
|
||||
value: serde_json::to_value(ndc_expression).map_err(|e| {
|
||||
FieldError::InternalError(FieldInternalError::ExpressionSerializationError(e))
|
||||
})?,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn make_relationship_arguments_from_arguments(
|
||||
arguments: BTreeMap<DataConnectorArgumentName, Argument>,
|
||||
) -> Result<BTreeMap<ndc_models_v02::ArgumentName, ndc_models_v02::RelationshipArgument>, FieldError>
|
||||
{
|
||||
arguments
|
||||
.into_iter()
|
||||
.map(|(name, argument)| {
|
||||
Ok((
|
||||
ndc_models_v02::ArgumentName::new(name.into_inner()),
|
||||
make_relationship_argument_from_argument(argument)?,
|
||||
))
|
||||
})
|
||||
.collect::<Result<BTreeMap<_, _>, _>>()
|
||||
}
|
||||
|
||||
fn make_relationship_argument_from_argument(
|
||||
argument: Argument,
|
||||
) -> Result<ndc_models_v02::RelationshipArgument, FieldError> {
|
||||
match argument {
|
||||
Argument::Literal { value } => Ok(ndc_models_v02::RelationshipArgument::Literal { value }),
|
||||
Argument::Variable { name } => Ok(ndc_models_v02::RelationshipArgument::Variable {
|
||||
name: ndc_models_v02::VariableName::from(name.0.as_str()),
|
||||
}),
|
||||
Argument::BooleanExpression { predicate } => {
|
||||
let ndc_expression = make_expression(predicate)?;
|
||||
Ok(ndc_models_v02::RelationshipArgument::Literal {
|
||||
value: serde_json::to_value(ndc_expression).map_err(|e| {
|
||||
FieldError::InternalError(FieldInternalError::ExpressionSerializationError(e))
|
||||
})?,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn make_relationship_arguments(
|
||||
arguments: BTreeMap<DataConnectorArgumentName, RelationshipArgument>,
|
||||
) -> BTreeMap<ndc_models_v02::ArgumentName, ndc_models_v02::RelationshipArgument> {
|
||||
arguments
|
||||
.into_iter()
|
||||
.map(|(name, argument)| {
|
||||
(
|
||||
ndc_models_v02::ArgumentName::new(name.into_inner()),
|
||||
make_relationship_argument(argument),
|
||||
)
|
||||
})
|
||||
.collect::<BTreeMap<_, _>>()
|
||||
}
|
||||
|
||||
fn make_relationship_argument(
|
||||
argument: RelationshipArgument,
|
||||
) -> ndc_models_v02::RelationshipArgument {
|
||||
match argument {
|
||||
RelationshipArgument::Column { name } => ndc_models_v02::RelationshipArgument::Column {
|
||||
name: ndc_models_v02::FieldName::new(name.into_inner()),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn make_expression(
|
||||
predicate: ResolvedFilterExpression,
|
||||
) -> Result<ndc_models_v02::Expression, FieldError> {
|
||||
match predicate {
|
||||
ResolvedFilterExpression::And { expressions } => {
|
||||
let mut ndc_expressions = Vec::new();
|
||||
|
||||
for expression in expressions {
|
||||
let ndc_expression = make_expression(expression)?;
|
||||
ndc_expressions.push(ndc_expression);
|
||||
}
|
||||
|
||||
Ok(ndc_models_v02::Expression::And {
|
||||
expressions: ndc_expressions,
|
||||
})
|
||||
}
|
||||
ResolvedFilterExpression::Or { expressions } => {
|
||||
let mut ndc_expressions = Vec::new();
|
||||
|
||||
for expression in expressions {
|
||||
let ndc_expression = make_expression(expression)?;
|
||||
ndc_expressions.push(ndc_expression);
|
||||
}
|
||||
|
||||
Ok(ndc_models_v02::Expression::Or {
|
||||
expressions: ndc_expressions,
|
||||
})
|
||||
}
|
||||
ResolvedFilterExpression::Not { expression } => {
|
||||
let ndc_expression = make_expression(*expression)?;
|
||||
|
||||
Ok(ndc_models_v02::Expression::Not {
|
||||
expression: Box::new(ndc_expression),
|
||||
})
|
||||
}
|
||||
ResolvedFilterExpression::LocalFieldComparison(
|
||||
plan_types::LocalFieldComparison::BinaryComparison {
|
||||
column,
|
||||
operator,
|
||||
value,
|
||||
},
|
||||
) => Ok(ndc_models_v02::Expression::BinaryComparisonOperator {
|
||||
column: make_comparison_target(column),
|
||||
operator: ndc_models_v02::ComparisonOperatorName::new(operator.into_inner()),
|
||||
value: make_comparison_value(value),
|
||||
}),
|
||||
|
||||
ResolvedFilterExpression::LocalNestedArray {
|
||||
column,
|
||||
field_path,
|
||||
predicate,
|
||||
} => {
|
||||
let ndc_expression = make_expression(*predicate)?;
|
||||
let field_name = ndc_models_v02::FieldName::new(column.into_inner());
|
||||
|
||||
Ok(ndc_models_v02::Expression::Exists {
|
||||
in_collection: ndc_models_v02::ExistsInCollection::NestedCollection {
|
||||
column_name: field_name,
|
||||
field_path: field_path
|
||||
.into_iter()
|
||||
.map(|f| ndc_models_v02::FieldName::new(f.into_inner()))
|
||||
.collect(),
|
||||
arguments: BTreeMap::new(),
|
||||
},
|
||||
predicate: Some(Box::new(ndc_expression)),
|
||||
})
|
||||
}
|
||||
ResolvedFilterExpression::LocalFieldComparison(
|
||||
plan_types::LocalFieldComparison::UnaryComparison { column, operator },
|
||||
) => Ok(ndc_models_v02::Expression::UnaryComparisonOperator {
|
||||
column: make_comparison_target(column),
|
||||
operator: match operator {
|
||||
metadata_resolve::UnaryComparisonOperator::IsNull => {
|
||||
ndc_models_v02::UnaryComparisonOperator::IsNull
|
||||
}
|
||||
},
|
||||
}),
|
||||
ResolvedFilterExpression::LocalRelationshipComparison {
|
||||
relationship,
|
||||
predicate,
|
||||
} => {
|
||||
let ndc_expression = make_expression(*predicate)?;
|
||||
Ok(ndc_models_v02::Expression::Exists {
|
||||
in_collection: ndc_models_v02::ExistsInCollection::Related {
|
||||
field_path: None,
|
||||
relationship: ndc_models_v02::RelationshipName::from(relationship.as_str()),
|
||||
arguments: BTreeMap::new(),
|
||||
},
|
||||
predicate: Some(Box::new(ndc_expression)),
|
||||
})
|
||||
}
|
||||
// we are generating NDC request for one connector, we can ignore anything remote
|
||||
ResolvedFilterExpression::RemoteRelationshipComparison {
|
||||
remote_predicate_id: _,
|
||||
} => Ok(ndc_models_v02::Expression::And {
|
||||
expressions: vec![],
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn make_comparison_target(
|
||||
comparison_target: plan_types::ComparisonTarget,
|
||||
) -> ndc_models_v02::ComparisonTarget {
|
||||
match comparison_target {
|
||||
plan_types::ComparisonTarget::Column { name, field_path } => {
|
||||
ndc_models_v02::ComparisonTarget::Column {
|
||||
name: ndc_models_v02::FieldName::new(name.into_inner()),
|
||||
arguments: BTreeMap::new(),
|
||||
field_path: if field_path.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(
|
||||
field_path
|
||||
.into_iter()
|
||||
.map(|f| ndc_models_v02::FieldName::new(f.into_inner()))
|
||||
.collect(),
|
||||
)
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn make_comparison_value(
|
||||
comparison_value: plan_types::ComparisonValue,
|
||||
) -> ndc_models_v02::ComparisonValue {
|
||||
match comparison_value {
|
||||
plan_types::ComparisonValue::Scalar { value } => {
|
||||
ndc_models_v02::ComparisonValue::Scalar { value }
|
||||
}
|
||||
plan_types::ComparisonValue::Variable { name } => {
|
||||
ndc_models_v02::ComparisonValue::Variable {
|
||||
name: ndc_models_v02::VariableName::from(name.0.as_str()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn make_field(field: Field) -> Result<ndc_models_v02::Field, FieldError> {
|
||||
match field {
|
||||
Field::Column {
|
||||
column,
|
||||
fields,
|
||||
arguments,
|
||||
} => {
|
||||
let nested_fields = fields.map(make_nested_field).transpose()?;
|
||||
|
||||
Ok(ndc_models_v02::Field::Column {
|
||||
column: ndc_models_v02::FieldName::new(column.into_inner()),
|
||||
fields: nested_fields,
|
||||
arguments: make_arguments(arguments)?,
|
||||
})
|
||||
}
|
||||
Field::Relationship {
|
||||
query_node,
|
||||
relationship,
|
||||
arguments,
|
||||
} => {
|
||||
let query = make_query(*query_node)?;
|
||||
Ok(ndc_models_v02::Field::Relationship {
|
||||
query: Box::new(query),
|
||||
relationship: ndc_models_v02::RelationshipName::from(relationship.as_str()),
|
||||
arguments: make_relationship_arguments_from_arguments(arguments)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn make_nested_field(nested_field: NestedField) -> Result<ndc_models_v02::NestedField, FieldError> {
|
||||
match nested_field {
|
||||
NestedField::Object(nested_object) => Ok(ndc_models_v02::NestedField::Object(
|
||||
make_nested_object(nested_object)?,
|
||||
)),
|
||||
NestedField::Array(nested_array) => Ok(ndc_models_v02::NestedField::Array(
|
||||
make_nested_array(nested_array)?,
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn make_nested_object(
|
||||
nested_field: NestedObject,
|
||||
) -> Result<ndc_models_v02::NestedObject, FieldError> {
|
||||
let fields = nested_field
|
||||
.fields
|
||||
.into_iter()
|
||||
.map(|(name, field)| {
|
||||
Ok((
|
||||
ndc_models_v02::FieldName::from(name.as_str()),
|
||||
make_field(field)?,
|
||||
))
|
||||
})
|
||||
.collect::<Result<IndexMap<_, _>, FieldError>>()?;
|
||||
Ok(ndc_models_v02::NestedObject { fields })
|
||||
}
|
||||
|
||||
fn make_nested_array(nested_field: NestedArray) -> Result<ndc_models_v02::NestedArray, FieldError> {
|
||||
let fields = make_nested_field(*nested_field.fields)?;
|
||||
Ok(ndc_models_v02::NestedArray {
|
||||
fields: Box::new(fields),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn make_collection_relationships(
|
||||
collection_relationships: BTreeMap<plan_types::NdcRelationshipName, Relationship>,
|
||||
) -> BTreeMap<ndc_models_v02::RelationshipName, ndc_models_v02::Relationship> {
|
||||
collection_relationships
|
||||
.into_iter()
|
||||
.map(|(name, relationship)| {
|
||||
(
|
||||
ndc_models_v02::RelationshipName::from(name.as_str()),
|
||||
make_relationship(relationship),
|
||||
)
|
||||
})
|
||||
.collect::<BTreeMap<_, _>>()
|
||||
}
|
||||
|
||||
fn make_relationship(relationship: Relationship) -> ndc_models_v02::Relationship {
|
||||
ndc_models_v02::Relationship {
|
||||
column_mapping: relationship
|
||||
.column_mapping
|
||||
.into_iter()
|
||||
.map(|(s, t)| {
|
||||
(
|
||||
ndc_models_v02::FieldName::new(s.into_inner()),
|
||||
vec![ndc_models_v02::FieldName::new(t.into_inner())],
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
relationship_type: match relationship.relationship_type {
|
||||
open_dds::relationships::RelationshipType::Object => {
|
||||
ndc_models_v02::RelationshipType::Object
|
||||
}
|
||||
open_dds::relationships::RelationshipType::Array => {
|
||||
ndc_models_v02::RelationshipType::Array
|
||||
}
|
||||
},
|
||||
target_collection: ndc_models_v02::CollectionName::new(
|
||||
relationship.target_collection.into_inner(),
|
||||
),
|
||||
arguments: make_relationship_arguments(relationship.arguments),
|
||||
}
|
||||
}
|
||||
|
||||
fn make_order_by(order_by_elements: Vec<OrderByElement>) -> ndc_models_v02::OrderBy {
|
||||
ndc_models_v02::OrderBy {
|
||||
elements: order_by_elements
|
||||
.into_iter()
|
||||
.map(|element| ndc_models_v02::OrderByElement {
|
||||
order_direction: match element.order_direction {
|
||||
OrderByDirection::Asc => ndc_models_v02::OrderDirection::Asc,
|
||||
OrderByDirection::Desc => ndc_models_v02::OrderDirection::Desc,
|
||||
},
|
||||
target: make_order_by_target(element.target),
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
fn make_order_by_target(target: OrderByTarget) -> ndc_models_v02::OrderByTarget {
|
||||
match target {
|
||||
OrderByTarget::Column {
|
||||
name,
|
||||
field_path,
|
||||
relationship_path,
|
||||
} => {
|
||||
let mut order_by_element_path = Vec::new();
|
||||
// When using a nested relationship column, you'll have to provide all the relationships(paths)
|
||||
// NDC has to traverse to access the column. The ordering of that paths is important.
|
||||
// The order decides how to access the column.
|
||||
//
|
||||
// For example, if you have a model called `User` with a relationship column called `Posts`
|
||||
// which has a relationship column called `Comments` which has a non-relationship column
|
||||
// called `text`, you'll have to provide the following paths to access the `text` column:
|
||||
// ["UserPosts", "PostsComments"]
|
||||
for path in relationship_path {
|
||||
order_by_element_path.push(ndc_models_v02::PathElement {
|
||||
field_path: None,
|
||||
relationship: ndc_models_v02::RelationshipName::from(path.0.as_str()),
|
||||
arguments: BTreeMap::new(),
|
||||
// 'AND' predicate indicates that the column can be accessed
|
||||
// by joining all the relationships paths provided
|
||||
predicate: Some(Box::new(ndc_models_v02::Expression::And {
|
||||
// TODO(naveen): Add expressions here, when we support sorting with predicates.
|
||||
//
|
||||
// There are two types of sorting:
|
||||
// 1. Sorting without predicates
|
||||
// 2. Sorting with predicates
|
||||
//
|
||||
// In the 1st sort, we sort all the elements of the results either in ascending
|
||||
// or descing order based on the order_by argument.
|
||||
//
|
||||
// In the 2nd sort, we want fetch the entire result but only sort a subset
|
||||
// of result and put those sorted set either at the beginning or at the end of the
|
||||
// result.
|
||||
//
|
||||
// Currently we only support the 1st type of sort. Hence we don't have any expressions/predicate.
|
||||
expressions: Vec::new(),
|
||||
})),
|
||||
});
|
||||
}
|
||||
|
||||
ndc_models_v02::OrderByTarget::Column {
|
||||
name: ndc_models_v02::FieldName::new(name.into_inner()),
|
||||
arguments: BTreeMap::new(),
|
||||
path: order_by_element_path,
|
||||
field_path: field_path.map(|field_path| {
|
||||
field_path
|
||||
.iter()
|
||||
.map(|name| ndc_models_v02::FieldName::from(name.as_str()))
|
||||
.collect()
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Translates the internal IR 'AggregateSelectionSet' into an NDC query aggregates selection
|
||||
fn make_aggregates(
|
||||
aggregate_selection_set: AggregateSelectionSet,
|
||||
) -> IndexMap<ndc_models_v02::FieldName, ndc_models_v02::Aggregate> {
|
||||
aggregate_selection_set
|
||||
.fields
|
||||
.into_iter()
|
||||
.map(|(field_name, aggregate_selection)| {
|
||||
let aggregate = match aggregate_selection {
|
||||
AggregateFieldSelection::Count { column_path, .. } => {
|
||||
make_count_aggregate(column_path, false)
|
||||
}
|
||||
AggregateFieldSelection::CountDistinct { column_path, .. } => {
|
||||
make_count_aggregate(column_path, true)
|
||||
}
|
||||
AggregateFieldSelection::AggregationFunction {
|
||||
function_name,
|
||||
column_path,
|
||||
} => {
|
||||
let nonempty::NonEmpty {
|
||||
head: column,
|
||||
tail: field_path,
|
||||
} = column_path;
|
||||
let nested_field_path = field_path
|
||||
.into_iter()
|
||||
.map(|column_name| {
|
||||
ndc_models_v02::FieldName::from(column_name.into_inner())
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
ndc_models_v02::Aggregate::SingleColumn {
|
||||
column: ndc_models_v02::FieldName::from(column.into_inner()),
|
||||
arguments: BTreeMap::new(),
|
||||
field_path: if nested_field_path.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(nested_field_path)
|
||||
},
|
||||
function: ndc_models_v02::AggregateFunctionName::from(
|
||||
function_name.as_str(),
|
||||
),
|
||||
}
|
||||
}
|
||||
};
|
||||
(
|
||||
ndc_models_v02::FieldName::from(field_name.as_str()),
|
||||
aggregate,
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Creates the appropriate NDC count aggregation based on whether we're selecting
|
||||
/// a column (nested or otherwise) or not
|
||||
fn make_count_aggregate(
|
||||
column_path: Vec<DataConnectorColumnName>,
|
||||
distinct: bool,
|
||||
) -> ndc_models_v02::Aggregate {
|
||||
let mut column_path_iter = column_path.into_iter();
|
||||
if let Some(first_path_element) = column_path_iter.next() {
|
||||
let remaining_path = column_path_iter
|
||||
.map(|column_name| ndc_models_v02::FieldName::from(column_name.into_inner()))
|
||||
.collect::<Vec<_>>();
|
||||
let nested_field_path = if remaining_path.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(remaining_path)
|
||||
};
|
||||
ndc_models_v02::Aggregate::ColumnCount {
|
||||
column: ndc_models_v02::FieldName::from(first_path_element.into_inner()),
|
||||
arguments: BTreeMap::new(),
|
||||
field_path: nested_field_path,
|
||||
distinct,
|
||||
}
|
||||
} else {
|
||||
ndc_models_v02::Aggregate::StarCount {}
|
||||
}
|
||||
}
|
202
v3/crates/execute/src/execute/remote_joins.rs
Normal file
202
v3/crates/execute/src/execute/remote_joins.rs
Normal file
@ -0,0 +1,202 @@
|
||||
//! Implements execution of Remote Joins
|
||||
//!
|
||||
//! # Introduction
|
||||
//! "Remote Join" is a feature where engine can fetch data from different data
|
||||
//! connectors, and based on some field mapping, can
|
||||
//! [join](https://cghlewis.com/blog/joins/) the dataset from the two different
|
||||
//! data sources.
|
||||
//!
|
||||
//! Let's consider the following simple example. Consider there are two data
|
||||
//! connectors -
|
||||
//! 1. Geographical information about cities in `connector A`
|
||||
//!
|
||||
//! `cities (name text, code text)`
|
||||
//!
|
||||
//! 2. Weather data about cities in `connector B`
|
||||
//!
|
||||
//! `weather (city_code text, temperature float, wind_speed float, humidity float)`
|
||||
//!
|
||||
//!
|
||||
//! Given the above, and assuming there are corresponding models defined for
|
||||
//! these two tables in the metadata, one can define a [remote
|
||||
//! relationship](https://hasura.io/docs/3.0/supergraph-modeling/relationships/).
|
||||
//! In the above two tables, the "city code" is guaranteed to be a unique field.
|
||||
//! So one can use that as the mapping field and define a relationship.
|
||||
//!
|
||||
//! Once a relationship is defined, one can make a GraphQL query like -
|
||||
//!
|
||||
//! ```graphql
|
||||
//! city {
|
||||
//! name
|
||||
//! weather {
|
||||
//! temperature
|
||||
//! humidity
|
||||
//! }
|
||||
//! }
|
||||
//! ```
|
||||
//! #### Note
|
||||
//! There has to be one or more fields across the two datasets which are unique.
|
||||
//! The remote relationship mapping should be defined using those fields. If
|
||||
//! there are no collection of unique fields, then a remote relationship cannot
|
||||
//! be defined.
|
||||
//!
|
||||
//! # How it works
|
||||
//!
|
||||
//! ## IR generation
|
||||
//! - Selection set IR generation function - [generate_selection_set_ir]
|
||||
//! - Model Remote relationship - [build_remote_relationship]
|
||||
//! - Command Remote relationship - [build_remote_command_relationship]
|
||||
//!
|
||||
//! ## Join Tree generation
|
||||
//! - The join tree is generated as part of query plan. See [plan_selection_set] function.
|
||||
//!
|
||||
//! ## Execution
|
||||
//! Following is the high-level algorithm how remote joins execution is performed.
|
||||
//!
|
||||
//! 1. Make the first top-level NDC query, and call the response as LHS response.
|
||||
//!
|
||||
//! 2. Traverse the join tree to get to the next remote join. Iterate through
|
||||
//! all the rows in the LHS response, use the field mapping in the remote join node
|
||||
//! to collect the values of those fields. In the above example, these would be
|
||||
//! collecting the city codes from the LHS city query.
|
||||
//!
|
||||
//! 3. Get the NDC query from the remote join node, and attach the values in the
|
||||
//! above step as variables in the NDC query. This NDC query already has a
|
||||
//! "where" filter clause with a variable on the join mapping field. Make the
|
||||
//! NDC query, and call the response as RHS response.
|
||||
//!
|
||||
//! 4. If there is a sub-tree from this remote join node, recursively perform
|
||||
//! this algorithm.
|
||||
//!
|
||||
//! 5. Perform join on LHS response and RHS response
|
||||
//!
|
||||
//! [plan_selection_set]: crate::plan::selection_set::plan_selection_set
|
||||
//! [generate_selection_set_ir]: graphql_ir::generate_selection_set_ir
|
||||
//! [build_remote_relationship]: graphql_ir::build_remote_relationship
|
||||
//! [build_remote_command_relationship]: graphql_ir::build_remote_command_relationship
|
||||
|
||||
use serde_json as json;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use tracing_util::SpanVisibility;
|
||||
|
||||
use plan_types::ProcessResponseAs;
|
||||
|
||||
use crate::ndc::execute_ndc_query;
|
||||
use crate::{error, HttpContext, ProjectId};
|
||||
|
||||
use collect::ExecutableJoinNode;
|
||||
use plan_types::{JoinLocations, RemoteJoinArgument};
|
||||
mod collect;
|
||||
mod join;
|
||||
|
||||
use async_recursion::async_recursion;
|
||||
|
||||
/// Execute remote joins. As an entry-point it assumes the response is available
|
||||
/// for the top-level query, and executes further remote joins recursively.
|
||||
|
||||
#[async_recursion]
|
||||
pub async fn execute_join_locations(
|
||||
http_context: &HttpContext,
|
||||
execution_span_attribute: &'static str,
|
||||
lhs_response: &mut Vec<ndc_models::RowSet>,
|
||||
lhs_response_type: &ProcessResponseAs,
|
||||
join_locations: &JoinLocations,
|
||||
project_id: Option<&ProjectId>,
|
||||
) -> Result<(), error::FieldError> {
|
||||
let tracer = tracing_util::global_tracer();
|
||||
|
||||
// collect the join column arguments from the LHS response
|
||||
let mut location_path = Vec::new();
|
||||
let next_join_nodes = tracer.in_span(
|
||||
"collect_arguments",
|
||||
"Collect arguments for join",
|
||||
SpanVisibility::Internal,
|
||||
|| {
|
||||
collect::collect_next_join_nodes(
|
||||
lhs_response,
|
||||
lhs_response_type,
|
||||
join_locations,
|
||||
&mut location_path,
|
||||
)
|
||||
},
|
||||
)?;
|
||||
|
||||
for executable_join_node in next_join_nodes {
|
||||
let ExecutableJoinNode {
|
||||
arguments,
|
||||
location_path,
|
||||
mut join_node,
|
||||
sub_tree,
|
||||
remote_alias,
|
||||
} = executable_join_node;
|
||||
|
||||
// if we do not get any join arguments back, we have nothing on the RHS
|
||||
// to execute. Skip execution.
|
||||
if arguments.is_empty() {
|
||||
continue;
|
||||
}
|
||||
// patch the target/RHS IR with variable values
|
||||
let foreach_variables: Vec<BTreeMap<plan_types::VariableName, json::Value>> = arguments
|
||||
.iter()
|
||||
.map(|bmap| bmap.iter().map(|(k, v)| (k.clone(), v.0.clone())).collect())
|
||||
.collect();
|
||||
|
||||
join_node.target_ndc_execution.variables = Some(foreach_variables);
|
||||
|
||||
let ndc_query =
|
||||
super::ndc_request::make_ndc_query_request(join_node.target_ndc_execution.clone())?;
|
||||
|
||||
// execute the remote query
|
||||
let mut target_response = tracer
|
||||
.in_span_async(
|
||||
"execute_remote_join_query",
|
||||
"Execute remote query for join",
|
||||
SpanVisibility::Internal,
|
||||
|| {
|
||||
Box::pin(execute_ndc_query(
|
||||
http_context,
|
||||
&ndc_query,
|
||||
&join_node.target_data_connector,
|
||||
execution_span_attribute,
|
||||
remote_alias.clone(),
|
||||
project_id,
|
||||
))
|
||||
},
|
||||
)
|
||||
.await?
|
||||
.as_latest_rowsets();
|
||||
|
||||
// if the sub-tree is not empty, recursively process the sub-tree; which
|
||||
// will modify the `target_response` with all joins down the tree
|
||||
if !sub_tree.locations.is_empty() {
|
||||
execute_join_locations(
|
||||
http_context,
|
||||
execution_span_attribute,
|
||||
&mut target_response,
|
||||
&join_node.process_response_as,
|
||||
&sub_tree,
|
||||
project_id,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
tracer.in_span(
|
||||
"response_join",
|
||||
"Join responses for remote query",
|
||||
SpanVisibility::Internal,
|
||||
|| {
|
||||
// from `Vec<RowSet>` create `HashMap<Argument, RowSet>`
|
||||
let rhs_response: HashMap<RemoteJoinArgument, ndc_models::RowSet> =
|
||||
arguments.into_iter().zip(target_response).collect();
|
||||
|
||||
join::join_responses(
|
||||
&location_path,
|
||||
&join_node,
|
||||
&remote_alias,
|
||||
lhs_response,
|
||||
&rhs_response,
|
||||
)
|
||||
},
|
||||
)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
331
v3/crates/execute/src/execute/remote_joins/collect.rs
Normal file
331
v3/crates/execute/src/execute/remote_joins/collect.rs
Normal file
@ -0,0 +1,331 @@
|
||||
//! Implements the collection phase of Remote Joins execution
|
||||
//!
|
||||
//! Collection phase is where engine receives a response from a connector, and
|
||||
//! then traverses the response to collect all the values defined in the
|
||||
//! relationship field mapping.
|
||||
|
||||
use indexmap::IndexMap;
|
||||
use lang_graphql::ast::common as ast;
|
||||
use nonempty::NonEmpty;
|
||||
use serde_json as json;
|
||||
use std::collections::{BTreeMap, HashSet};
|
||||
|
||||
use json_ext::ValueExt;
|
||||
|
||||
use super::error;
|
||||
use crate::ndc::FUNCTION_IR_VALUE_COLUMN_NAME;
|
||||
use plan_types::{
|
||||
JoinLocations, JoinNode, LocationKind, RemoteJoin, RemoteJoinArgument, SourceFieldAlias,
|
||||
TargetField,
|
||||
};
|
||||
use plan_types::{ProcessResponseAs, VariableName};
|
||||
|
||||
/// An executable join node is a remote join node, it's collected join values
|
||||
/// from a LHS response, and the rest of the join sub-tree
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ExecutableJoinNode {
|
||||
pub(crate) join_node: RemoteJoin,
|
||||
pub(crate) remote_alias: String,
|
||||
pub(crate) location_path: Vec<LocationInfo>,
|
||||
pub(crate) arguments: HashSet<RemoteJoinArgument>,
|
||||
pub(crate) sub_tree: JoinLocations,
|
||||
}
|
||||
|
||||
/// Indicates a field alias which might have more nesting inside
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct LocationInfo {
|
||||
pub(crate) alias: String,
|
||||
pub(crate) location_kind: LocationKind,
|
||||
}
|
||||
|
||||
/// Given a LHS response and `JoinLocations` tree, get the next executable join
|
||||
/// nodes down the tree. Also, extract the join values from the response.
|
||||
pub(crate) fn collect_next_join_nodes(
|
||||
lhs_response: &Vec<ndc_models::RowSet>,
|
||||
lhs_response_type: &ProcessResponseAs,
|
||||
join_locations: &JoinLocations,
|
||||
path: &mut [LocationInfo],
|
||||
) -> Result<Vec<ExecutableJoinNode>, error::FieldError> {
|
||||
let mut arguments_results = Vec::new();
|
||||
|
||||
// if lhs_response is empty, there are no rows to collect arguments from
|
||||
if lhs_response.is_empty() {
|
||||
return Ok(arguments_results);
|
||||
}
|
||||
|
||||
for (alias, location) in &join_locations.locations {
|
||||
match &location.join_node {
|
||||
JoinNode::Remote(join_node) => {
|
||||
let join_fields = get_join_fields(join_node);
|
||||
let arguments = collect_argument_from_rows(
|
||||
lhs_response,
|
||||
lhs_response_type,
|
||||
&join_fields,
|
||||
path,
|
||||
)?;
|
||||
arguments_results.push(ExecutableJoinNode {
|
||||
arguments,
|
||||
location_path: path.to_owned(),
|
||||
join_node: join_node.clone(),
|
||||
sub_tree: location.rest.clone(),
|
||||
remote_alias: alias.clone(),
|
||||
});
|
||||
}
|
||||
JoinNode::Local(location_kind) => {
|
||||
let mut new_path = path.to_owned();
|
||||
new_path.push(LocationInfo {
|
||||
alias: alias.clone(),
|
||||
location_kind: *location_kind,
|
||||
});
|
||||
let inner_arguments_by_remote = collect_next_join_nodes(
|
||||
lhs_response,
|
||||
lhs_response_type,
|
||||
&location.rest,
|
||||
&mut new_path,
|
||||
)?;
|
||||
arguments_results.extend(inner_arguments_by_remote);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(arguments_results)
|
||||
}
|
||||
|
||||
/// Iterate over the `Vec<RowSet>` structure to get to each row, and collect
|
||||
/// arguments from each row
|
||||
fn collect_argument_from_rows(
|
||||
lhs_response: &Vec<ndc_models::RowSet>,
|
||||
lhs_response_type: &ProcessResponseAs,
|
||||
join_fields: &Vec<(&SourceFieldAlias, VariableName)>,
|
||||
path: &[LocationInfo],
|
||||
) -> Result<HashSet<RemoteJoinArgument>, error::FieldError> {
|
||||
let mut arguments = HashSet::new();
|
||||
for row_set in lhs_response {
|
||||
if let Some(ref rows) = row_set.rows {
|
||||
for row in rows {
|
||||
match lhs_response_type {
|
||||
ProcessResponseAs::Array { .. } | ProcessResponseAs::Object { .. } => {
|
||||
collect_argument_from_row(row, join_fields, path, &mut arguments)?;
|
||||
}
|
||||
ProcessResponseAs::Aggregates { .. } => {
|
||||
return Err(error::FieldInternalError::InternalGeneric {
|
||||
description:
|
||||
"Unexpected aggregate response on the LHS of a remote join"
|
||||
.to_owned(),
|
||||
}
|
||||
.into())
|
||||
}
|
||||
ProcessResponseAs::CommandResponse {
|
||||
command_name: _,
|
||||
type_container,
|
||||
response_config: _,
|
||||
} => {
|
||||
let mut command_rows = resolve_command_response_row(row, type_container)?;
|
||||
for command_row in &mut command_rows {
|
||||
collect_argument_from_row(
|
||||
command_row,
|
||||
join_fields,
|
||||
path,
|
||||
&mut arguments,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(arguments)
|
||||
}
|
||||
|
||||
/// From each row gather arguments based on join fields
|
||||
fn collect_argument_from_row(
|
||||
row: &IndexMap<ndc_models::FieldName, ndc_models::RowFieldValue>,
|
||||
join_fields: &Vec<(&SourceFieldAlias, VariableName)>,
|
||||
path: &[LocationInfo],
|
||||
arguments: &mut HashSet<RemoteJoinArgument>,
|
||||
) -> Result<(), error::FieldError> {
|
||||
match NonEmpty::from_slice(path) {
|
||||
None => {
|
||||
let argument = create_argument(join_fields, row);
|
||||
// de-duplicate arguments
|
||||
arguments.insert(argument);
|
||||
}
|
||||
Some(nonempty_path) => {
|
||||
let (
|
||||
LocationInfo {
|
||||
alias,
|
||||
location_kind,
|
||||
},
|
||||
path_tail,
|
||||
) = nonempty_path.split_first();
|
||||
let nested_val =
|
||||
row.get(alias.as_str())
|
||||
.ok_or(error::FieldInternalError::InternalGeneric {
|
||||
description: "invalid NDC response; could not find {key} in response"
|
||||
.to_string(),
|
||||
})?;
|
||||
if let Some(parsed_rows) = rows_from_row_field_value(*location_kind, nested_val)? {
|
||||
for inner_row in parsed_rows {
|
||||
collect_argument_from_row(&inner_row, join_fields, path_tail, arguments)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get all the field aliases in LHS used for the join (i.e. fields used in the
|
||||
/// Relationship mapping), and also the variables used in the RHS IR
|
||||
pub(crate) fn get_join_fields(join_node: &RemoteJoin) -> Vec<(&SourceFieldAlias, VariableName)> {
|
||||
let mut join_fields = vec![];
|
||||
for (src_alias, target_field) in join_node.join_mapping.values() {
|
||||
match target_field {
|
||||
TargetField::ModelField((_, field_mapping)) => {
|
||||
// use the target field name here to create the variable
|
||||
// name to be used in RHS
|
||||
let variable_name = VariableName(format!("${}", &field_mapping.column));
|
||||
join_fields.push((src_alias, variable_name));
|
||||
}
|
||||
TargetField::CommandField(argument_name) => {
|
||||
// use the target argument name here to create the variable
|
||||
// name to be used in RHS
|
||||
let variable_name = VariableName(format!("${}", &argument_name));
|
||||
join_fields.push((src_alias, variable_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
join_fields
|
||||
}
|
||||
|
||||
/// From a row, given the join fields, collect the values of the fields and
|
||||
/// return them as 'RemoteJoinArgument'
|
||||
pub(crate) fn create_argument(
|
||||
join_fields: &Vec<(&SourceFieldAlias, VariableName)>,
|
||||
row: &IndexMap<ndc_models::FieldName, ndc_models::RowFieldValue>,
|
||||
) -> RemoteJoinArgument {
|
||||
let mut argument = BTreeMap::new();
|
||||
for (src_alias, variable_name) in join_fields {
|
||||
let val = get_value(src_alias, row);
|
||||
argument.insert(variable_name.clone(), ValueExt::from(val.clone()));
|
||||
}
|
||||
argument
|
||||
}
|
||||
|
||||
pub(crate) fn get_value<'n>(
|
||||
pick_alias: &SourceFieldAlias,
|
||||
row: &'n IndexMap<ndc_models::FieldName, ndc_models::RowFieldValue>,
|
||||
) -> &'n json::Value {
|
||||
match row.get(pick_alias.0.as_str()) {
|
||||
Some(v) => &v.0,
|
||||
None => &json::Value::Null,
|
||||
}
|
||||
}
|
||||
|
||||
fn rows_from_row_field_value(
|
||||
location_kind: LocationKind,
|
||||
nested_val: &ndc_models::RowFieldValue,
|
||||
) -> Result<
|
||||
Option<Vec<IndexMap<ndc_models::FieldName, ndc_models::RowFieldValue>>>,
|
||||
error::FieldError,
|
||||
> {
|
||||
let rows: Option<Vec<IndexMap<ndc_models::FieldName, ndc_models::RowFieldValue>>> =
|
||||
match location_kind {
|
||||
LocationKind::NestedData => Some(
|
||||
{
|
||||
let this = nested_val.clone();
|
||||
match this.0 {
|
||||
serde_json::Value::Array(_) => serde_json::from_value(this.0).ok(),
|
||||
serde_json::Value::Object(_) => {
|
||||
serde_json::from_value(this.0).ok().map(|v| vec![v])
|
||||
}
|
||||
serde_json::Value::Null => Some(vec![]),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
.ok_or_else(|| error::FieldInternalError::InternalGeneric {
|
||||
description: "unexpected: could not find rows in NDC nested response: "
|
||||
.to_string()
|
||||
+ &nested_val.0.to_string(),
|
||||
}),
|
||||
)
|
||||
.transpose(),
|
||||
LocationKind::LocalRelationship => {
|
||||
// Get the NDC response with nested selection (i.e. in case of
|
||||
// relationships) as a RowSet
|
||||
let row_set = nested_val
|
||||
// TODO: remove clone -> depends on ndc-client providing an API e.g. as_mut_rowset()
|
||||
.clone()
|
||||
.as_rowset()
|
||||
.ok_or_else(|| error::FieldInternalError::InternalGeneric {
|
||||
description: "unexpected: could not find RowSet in NDC nested response: "
|
||||
.to_string()
|
||||
+ &nested_val.0.to_string(),
|
||||
})?;
|
||||
Ok(row_set.rows)
|
||||
}
|
||||
}?;
|
||||
Ok(rows)
|
||||
}
|
||||
|
||||
/// resolve/process the command response for remote join execution
|
||||
fn resolve_command_response_row(
|
||||
row: &IndexMap<ndc_models::FieldName, ndc_models::RowFieldValue>,
|
||||
type_container: &ast::TypeContainer<ast::TypeName>,
|
||||
) -> Result<Vec<IndexMap<ndc_models::FieldName, ndc_models::RowFieldValue>>, error::FieldError> {
|
||||
let field_value_result = row.get(FUNCTION_IR_VALUE_COLUMN_NAME).ok_or_else(|| {
|
||||
error::NDCUnexpectedError::BadNDCResponse {
|
||||
summary: format!("missing field: {FUNCTION_IR_VALUE_COLUMN_NAME}"),
|
||||
}
|
||||
})?;
|
||||
|
||||
// If the command has a selection set, then the structure of the
|
||||
// response should either be a `Array <Object>` or `<Object>` or null,
|
||||
// where `<Object>` is the map of the selection set field and it's
|
||||
// value.
|
||||
match &field_value_result.0 {
|
||||
json::Value::String(_) | json::Value::Bool(_) | json::Value::Number(_) => {
|
||||
Err(error::NDCUnexpectedError::BadNDCResponse {
|
||||
summary: "Unable to parse response from NDC, object or array value expected for relationship".into(),
|
||||
})?
|
||||
}
|
||||
json::Value::Null => {
|
||||
if type_container.nullable {
|
||||
Ok(Vec::new())
|
||||
} else {
|
||||
Err(error::NDCUnexpectedError::BadNDCResponse {
|
||||
summary: "Unable to parse response from NDC, null value expected".into(),
|
||||
})?
|
||||
}
|
||||
}
|
||||
json::Value::Object(result_map) => {
|
||||
if type_container.is_list() {
|
||||
Err(error::NDCUnexpectedError::BadNDCResponse {
|
||||
summary: "Unable to parse response from NDC, object value expected".into(),
|
||||
})?
|
||||
} else {
|
||||
let index_map: IndexMap<ndc_models::FieldName, ndc_models::RowFieldValue> =
|
||||
json::from_value(json::Value::Object(result_map.clone()))?;
|
||||
Ok(vec![index_map])
|
||||
}
|
||||
}
|
||||
json::Value::Array(values) => {
|
||||
// There can be cases when the command returns an array of objects,
|
||||
// but the type container is not a list. This can happen when the
|
||||
// command is used in a relationship whose RHS returns a list of objects
|
||||
// which can have the same value for the field on which the relationship
|
||||
// is defined.
|
||||
// In case the container is not a list, we take the first object from
|
||||
// the array and use that as the value for the relationship otherwise
|
||||
// we return the array of objects.
|
||||
let array_values: Vec<IndexMap<ndc_models::FieldName, ndc_models::RowFieldValue>> =
|
||||
json::from_value(json::Value::Array(values.clone()))?;
|
||||
|
||||
if type_container.is_list(){
|
||||
Ok(array_values)
|
||||
} else {
|
||||
Ok(vec![array_values.into_iter().next().ok_or(error::NDCUnexpectedError::BadNDCResponse {
|
||||
summary: "Unable to parse response from NDC, rowset is empty".into(),
|
||||
})?])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
240
v3/crates/execute/src/execute/remote_joins/join.rs
Normal file
240
v3/crates/execute/src/execute/remote_joins/join.rs
Normal file
@ -0,0 +1,240 @@
|
||||
//! Implements the join phase of Remote Joins execution
|
||||
|
||||
use indexmap::IndexMap;
|
||||
use ndc_models::{self};
|
||||
use nonempty::NonEmpty;
|
||||
use serde_json as json;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::ndc::FUNCTION_IR_VALUE_COLUMN_NAME;
|
||||
|
||||
use super::collect::LocationInfo;
|
||||
use super::{collect, error};
|
||||
use plan_types::{LocationKind, RemoteJoin, RemoteJoinArgument};
|
||||
|
||||
/// Inserts values in the LHS response from values in RHS response, based on the
|
||||
/// mapping field
|
||||
///
|
||||
/// Lookup the argument in `rhs_response` and substitute that value in
|
||||
/// `lhs_response`
|
||||
pub(crate) fn join_responses(
|
||||
location_path: &[LocationInfo],
|
||||
join_node: &RemoteJoin,
|
||||
remote_alias: &str,
|
||||
lhs_response: &mut [ndc_models::RowSet],
|
||||
rhs_response: &HashMap<RemoteJoinArgument, ndc_models::RowSet>,
|
||||
) -> Result<(), error::FieldError> {
|
||||
for row_set in lhs_response.iter_mut() {
|
||||
if let Some(rows) = row_set.rows.as_mut() {
|
||||
for row in rows.iter_mut() {
|
||||
// TODO: have a better interface of traversing through the
|
||||
// response tree, especially for commands
|
||||
let command_result_value = row.get_mut(FUNCTION_IR_VALUE_COLUMN_NAME);
|
||||
match command_result_value {
|
||||
// it's a command response; traversing the response tree is
|
||||
// different
|
||||
Some(row_field_value) => join_command_response(
|
||||
location_path,
|
||||
join_node,
|
||||
remote_alias,
|
||||
row_field_value,
|
||||
rhs_response,
|
||||
)?,
|
||||
None => {
|
||||
insert_value_into_row(
|
||||
location_path,
|
||||
join_node,
|
||||
row,
|
||||
ndc_models::FieldName::from(remote_alias),
|
||||
rhs_response,
|
||||
)?;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// In case of a command response, the response tree traversing is different.
|
||||
/// This helper function handles only that case.
|
||||
///
|
||||
/// NDC returns response to a command in a special column name
|
||||
/// 'FUNCTION_IR_VALUE_COLUMN_NAME', which is a serde::json::Value. We
|
||||
/// destructure the serde value, handle the Object and Array case to insert the
|
||||
/// RHS response appropriately.
|
||||
fn join_command_response(
|
||||
location_path: &[LocationInfo],
|
||||
join_node: &RemoteJoin,
|
||||
remote_alias: &str,
|
||||
row_field_value: &mut ndc_models::RowFieldValue,
|
||||
rhs_response: &HashMap<RemoteJoinArgument, ndc_models::RowSet>,
|
||||
) -> Result<(), error::FieldError> {
|
||||
match &mut row_field_value.0 {
|
||||
json::Value::Array(ref mut arr) => {
|
||||
for command_row in arr.iter_mut() {
|
||||
let new_val = command_row.clone();
|
||||
let mut command_row_parsed: IndexMap<
|
||||
ndc_models::FieldName,
|
||||
ndc_models::RowFieldValue,
|
||||
> = json::from_value(new_val)?;
|
||||
insert_value_into_row(
|
||||
location_path,
|
||||
join_node,
|
||||
&mut command_row_parsed,
|
||||
ndc_models::FieldName::from(remote_alias),
|
||||
rhs_response,
|
||||
)?;
|
||||
*command_row = json::to_value(command_row_parsed)?;
|
||||
}
|
||||
}
|
||||
json::Value::Object(obj) => {
|
||||
let mut command_row = obj
|
||||
.into_iter()
|
||||
.map(|(k, v)| {
|
||||
(
|
||||
ndc_models::FieldName::from(k.as_str()),
|
||||
ndc_models::RowFieldValue(v.clone()),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
insert_value_into_row(
|
||||
location_path,
|
||||
join_node,
|
||||
&mut command_row,
|
||||
ndc_models::FieldName::from(remote_alias),
|
||||
rhs_response,
|
||||
)?;
|
||||
*row_field_value = ndc_models::RowFieldValue(json::to_value(command_row)?);
|
||||
}
|
||||
command_json_val => {
|
||||
return Err(error::FieldError::from(
|
||||
error::FieldInternalError::InternalGeneric {
|
||||
description: format!(
|
||||
"unexpected command response: {command_json_val}; expected Array or Object"
|
||||
),
|
||||
},
|
||||
));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Traverse 'LocationInfo' and insert corresponding RHS response value in a LHS
|
||||
/// response row.
|
||||
fn insert_value_into_row(
|
||||
location_path: &[LocationInfo],
|
||||
join_node: &RemoteJoin,
|
||||
row: &mut IndexMap<ndc_models::FieldName, ndc_models::RowFieldValue>,
|
||||
remote_alias: ndc_models::FieldName,
|
||||
rhs_response: &HashMap<RemoteJoinArgument, ndc_models::RowSet>,
|
||||
) -> Result<(), error::FieldError> {
|
||||
match NonEmpty::from_slice(location_path) {
|
||||
// no location path; so remote join available at this level
|
||||
None => {
|
||||
let join_fields = collect::get_join_fields(join_node);
|
||||
let argument = collect::create_argument(&join_fields, row);
|
||||
let rhs_value = json::to_value(rhs_response.get(&argument))?;
|
||||
row.insert(remote_alias, ndc_models::RowFieldValue(rhs_value));
|
||||
Ok(())
|
||||
}
|
||||
// if there is a location path, traverse the location path to get to
|
||||
// nested rows, and insert value
|
||||
Some(nonempty_path) => {
|
||||
let (
|
||||
LocationInfo {
|
||||
alias,
|
||||
location_kind,
|
||||
},
|
||||
path_tail,
|
||||
) = nonempty_path.split_first();
|
||||
let row_field_val =
|
||||
row.get_mut(alias.as_str())
|
||||
.ok_or(error::FieldInternalError::InternalGeneric {
|
||||
description: "unexpected: could not find {key} in row".into(),
|
||||
})?;
|
||||
visit_location_path_and_insert_value(
|
||||
*location_kind,
|
||||
row_field_val,
|
||||
path_tail,
|
||||
join_node,
|
||||
remote_alias,
|
||||
rhs_response,
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// If there is a location path (i.e. the remote join is nested in a
|
||||
/// relationship or nested selection), traverse the location path of LHS row,
|
||||
/// and insert the RHS response.
|
||||
fn visit_location_path_and_insert_value(
|
||||
location_kind: LocationKind,
|
||||
row_field_val: &mut ndc_models::RowFieldValue,
|
||||
path_tail: &[LocationInfo],
|
||||
join_node: &RemoteJoin,
|
||||
remote_alias: ndc_models::FieldName,
|
||||
rhs_response: &HashMap<RemoteJoinArgument, ndc_models::RowSet>,
|
||||
) -> Result<(), error::FieldError> {
|
||||
match location_kind {
|
||||
LocationKind::LocalRelationship => {
|
||||
let mut row_set: ndc_models::RowSet = json::from_value(row_field_val.0.clone())?;
|
||||
let mut rows = row_set
|
||||
.rows
|
||||
.ok_or(error::FieldInternalError::InternalGeneric {
|
||||
description: "expected row; encountered null".into(),
|
||||
})?;
|
||||
|
||||
for inner_row in &mut rows {
|
||||
insert_value_into_row(
|
||||
path_tail,
|
||||
join_node,
|
||||
inner_row,
|
||||
remote_alias.clone(),
|
||||
rhs_response,
|
||||
)?;
|
||||
}
|
||||
row_set.rows = Some(rows);
|
||||
*row_field_val = ndc_models::RowFieldValue(json::to_value(row_set)?);
|
||||
}
|
||||
LocationKind::NestedData => {
|
||||
match row_field_val.0 {
|
||||
serde_json::Value::Array(_) => {
|
||||
if let Ok(mut rows) = serde_json::from_value::<
|
||||
Vec<IndexMap<ndc_models::FieldName, ndc_models::RowFieldValue>>,
|
||||
>(row_field_val.0.clone())
|
||||
{
|
||||
for inner_row in &mut rows {
|
||||
insert_value_into_row(
|
||||
path_tail,
|
||||
join_node,
|
||||
inner_row,
|
||||
remote_alias.clone(),
|
||||
rhs_response,
|
||||
)?;
|
||||
}
|
||||
*row_field_val = ndc_models::RowFieldValue(json::to_value(rows)?);
|
||||
}
|
||||
}
|
||||
serde_json::Value::Object(_) => {
|
||||
if let Ok(mut inner_row) = serde_json::from_value::<
|
||||
IndexMap<ndc_models::FieldName, ndc_models::RowFieldValue>,
|
||||
>(row_field_val.0.clone())
|
||||
{
|
||||
insert_value_into_row(
|
||||
path_tail,
|
||||
join_node,
|
||||
&mut inner_row,
|
||||
remote_alias,
|
||||
rhs_response,
|
||||
)?;
|
||||
*row_field_val = ndc_models::RowFieldValue(json::to_value(inner_row)?);
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
@ -1,4 +1,5 @@
|
||||
mod error;
|
||||
mod execute;
|
||||
pub mod ndc;
|
||||
pub mod plan;
|
||||
mod process_response;
|
||||
@ -8,6 +9,7 @@ mod types;
|
||||
|
||||
// we explicitly export things used by other crates
|
||||
pub use error::{FieldError, QueryUsageAnalyzeError, RequestError};
|
||||
pub use execute::resolve_ndc_query_execution;
|
||||
pub use ndc::fetch_from_data_connector;
|
||||
pub use plan::error::Error as PlanError;
|
||||
pub use plan::filter::plan_remote_predicate;
|
||||
|
@ -43,7 +43,7 @@ use crate::process_response::{process_mutation_response, ProcessedResponse};
|
||||
use graphql_ir::ModelSelection;
|
||||
use graphql_schema::GDSRoleNamespaceGetter;
|
||||
use graphql_schema::GDS;
|
||||
|
||||
use plan_types::ProcessResponseAs;
|
||||
pub type QueryPlan<'n, 's, 'ir> = IndexMap<ast::Alias, NodeQueryPlan<'n, 's, 'ir>>;
|
||||
|
||||
/// Unlike a query, the root nodes of a mutation aren't necessarily independent. Specifically, the
|
||||
@ -151,34 +151,6 @@ pub struct ExecutionTree<'s> {
|
||||
pub remote_join_executions: JoinLocations<'s>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum ProcessResponseAs {
|
||||
Object {
|
||||
is_nullable: bool,
|
||||
},
|
||||
Array {
|
||||
is_nullable: bool,
|
||||
},
|
||||
CommandResponse {
|
||||
command_name: Arc<metadata_resolve::Qualified<open_dds::commands::CommandName>>,
|
||||
type_container: ast::TypeContainer<ast::TypeName>,
|
||||
// how to process a command response
|
||||
response_config: Option<Arc<metadata_resolve::data_connectors::CommandsResponseConfig>>,
|
||||
},
|
||||
Aggregates,
|
||||
}
|
||||
|
||||
impl ProcessResponseAs {
|
||||
pub fn is_nullable(&self) -> bool {
|
||||
match self {
|
||||
ProcessResponseAs::Object { is_nullable }
|
||||
| ProcessResponseAs::Array { is_nullable } => *is_nullable,
|
||||
ProcessResponseAs::CommandResponse { type_container, .. } => type_container.nullable,
|
||||
ProcessResponseAs::Aggregates { .. } => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Build a plan to handle a given request. This plan will either be a mutation plan or a query
|
||||
/// plan, but currently can't be both. This may change when we support protocols other than
|
||||
/// GraphQL.
|
||||
|
@ -15,12 +15,12 @@ use open_dds::commands::CommandName;
|
||||
use open_dds::types::FieldName;
|
||||
|
||||
use super::ndc::FUNCTION_IR_VALUE_COLUMN_NAME;
|
||||
use super::plan::ProcessResponseAs;
|
||||
use crate::error;
|
||||
use graphql_ir::{global_id_col_format, GLOBAL_ID_VERSION};
|
||||
use graphql_schema::{AggregateOutputAnnotation, Annotation, GlobalID, OutputAnnotation, GDS};
|
||||
use metadata_resolve::data_connectors;
|
||||
use metadata_resolve::Qualified;
|
||||
use plan_types::ProcessResponseAs;
|
||||
|
||||
trait KeyValueResponse {
|
||||
fn remove(&mut self, key: &str) -> Option<json::Value>;
|
||||
|
@ -84,9 +84,9 @@ use tracing_util::SpanVisibility;
|
||||
|
||||
use crate::plan;
|
||||
use crate::plan::filter::ResolveFilterExpressionContext;
|
||||
use plan_types::ProcessResponseAs;
|
||||
|
||||
use super::ndc::execute_ndc_query;
|
||||
use super::plan::ProcessResponseAs;
|
||||
use super::{error, HttpContext, ProjectId};
|
||||
|
||||
use self::collect::ExecutableJoinNode;
|
||||
|
@ -17,8 +17,7 @@ use super::types::{
|
||||
Argument, JoinLocations, JoinNode, LocationKind, RemoteJoin, SourceFieldAlias, TargetField,
|
||||
};
|
||||
use crate::ndc::FUNCTION_IR_VALUE_COLUMN_NAME;
|
||||
use crate::plan::ProcessResponseAs;
|
||||
use plan_types::VariableName;
|
||||
use plan_types::{ProcessResponseAs, VariableName};
|
||||
|
||||
/// An executable join node is a remote join node, it's collected join values
|
||||
/// from a LHS response, and the rest of the join sub-tree
|
||||
|
@ -8,7 +8,7 @@ use json_ext::ValueExt;
|
||||
use open_dds::arguments::ArgumentName;
|
||||
use open_dds::types::FieldName;
|
||||
|
||||
use crate::plan::{self, ProcessResponseAs};
|
||||
use crate::plan::{self};
|
||||
|
||||
/// This tree structure captures all the locations (in the selection set IR) where
|
||||
/// remote joins are found.
|
||||
@ -125,7 +125,7 @@ pub struct RemoteJoin<'s> {
|
||||
/// field or an argument name.
|
||||
pub join_mapping: HashMap<SourceFieldName, (SourceFieldAlias, TargetField)>,
|
||||
/// Represents how to process the join response.
|
||||
pub process_response_as: ProcessResponseAs,
|
||||
pub process_response_as: plan_types::ProcessResponseAs,
|
||||
/// Represents the type of the remote join
|
||||
pub remote_join_type: RemoteJoinType,
|
||||
}
|
||||
|
@ -7,8 +7,7 @@ use std::borrow::Cow;
|
||||
use async_recursion::async_recursion;
|
||||
use execute::ndc::client as ndc_client;
|
||||
use execute::plan::{
|
||||
self, ApolloFederationSelect, NDCQueryExecution, NodeQueryPlan, ProcessResponseAs,
|
||||
ResolveFilterExpressionContext,
|
||||
self, ApolloFederationSelect, NDCQueryExecution, NodeQueryPlan, ResolveFilterExpressionContext,
|
||||
};
|
||||
use execute::HttpContext;
|
||||
use execute::{JoinLocations, JoinNode, RemoteJoinType};
|
||||
@ -18,6 +17,7 @@ use lang_graphql as gql;
|
||||
use lang_graphql::ast::common as ast;
|
||||
use lang_graphql::{http::RawRequest, schema::Schema};
|
||||
use nonempty::NonEmpty;
|
||||
use plan_types::ProcessResponseAs;
|
||||
use tracing_util::{AttributeVisibility, SpanVisibility};
|
||||
|
||||
pub async fn execute_explain(
|
||||
|
@ -8,7 +8,9 @@ license.workspace = true
|
||||
bench = false
|
||||
|
||||
[dependencies]
|
||||
json-ext = { path = "../utils/json-ext" }
|
||||
open-dds = { path = "../open-dds" }
|
||||
lang-graphql = { path = "../graphql/lang-graphql" }
|
||||
metadata-resolve = { path = "../metadata-resolve" }
|
||||
|
||||
derive_more = { workspace = true }
|
||||
|
@ -6,7 +6,62 @@ mod filter;
|
||||
mod order_by;
|
||||
mod query;
|
||||
mod relationships;
|
||||
mod remote_joins;
|
||||
use lang_graphql::ast::common as ast;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub use aggregates::{AggregateFieldSelection, AggregateSelectionSet};
|
||||
pub use arguments::Argument;
|
||||
pub use field::{Field, NestedArray, NestedField, NestedObject};
|
||||
pub use filter::ResolvedFilterExpression;
|
||||
pub use order_by::{OrderByDirection, OrderByElement, OrderByTarget};
|
||||
pub use query::QueryExecutionPlan;
|
||||
pub use query::{QueryExecutionPlan, QueryNodeNew};
|
||||
pub use relationships::{Relationship, RelationshipArgument};
|
||||
pub use remote_joins::{
|
||||
JoinLocations, JoinNode, LocationKind, RemoteJoin, RemoteJoinArgument, SourceFieldAlias,
|
||||
TargetField,
|
||||
};
|
||||
|
||||
// these versions of the types are equivalent to the old "Resolved" versions
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct NDCQueryExecution {
|
||||
pub execution_tree: ExecutionTree,
|
||||
pub execution_span_attribute: &'static str,
|
||||
pub field_span_attribute: String,
|
||||
pub process_response_as: ProcessResponseAs,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ExecutionTree {
|
||||
pub query_execution_plan: query::QueryExecutionPlan,
|
||||
pub remote_join_executions: remote_joins::JoinLocations,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum ProcessResponseAs {
|
||||
Object {
|
||||
is_nullable: bool,
|
||||
},
|
||||
Array {
|
||||
is_nullable: bool,
|
||||
},
|
||||
CommandResponse {
|
||||
command_name: Arc<metadata_resolve::Qualified<open_dds::commands::CommandName>>,
|
||||
type_container: ast::TypeContainer<ast::TypeName>,
|
||||
// how to process a command response
|
||||
response_config: Option<Arc<metadata_resolve::data_connectors::CommandsResponseConfig>>,
|
||||
},
|
||||
Aggregates,
|
||||
}
|
||||
|
||||
impl ProcessResponseAs {
|
||||
pub fn is_nullable(&self) -> bool {
|
||||
match self {
|
||||
ProcessResponseAs::Object { is_nullable }
|
||||
| ProcessResponseAs::Array { is_nullable } => *is_nullable,
|
||||
ProcessResponseAs::CommandResponse { type_container, .. } => type_container.nullable,
|
||||
ProcessResponseAs::Aggregates { .. } => false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,9 +1,10 @@
|
||||
use super::filter::ResolvedFilterExpression;
|
||||
|
||||
use crate::VariableName;
|
||||
use std::hash::Hash;
|
||||
|
||||
/// Argument plan to express various kinds of arguments
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Hash, Clone, PartialEq, Eq)]
|
||||
pub enum Argument {
|
||||
/// The argument is provided as a literal value
|
||||
Literal {
|
||||
|
156
v3/crates/plan-types/src/execution_plan/remote_joins.rs
Normal file
156
v3/crates/plan-types/src/execution_plan/remote_joins.rs
Normal file
@ -0,0 +1,156 @@
|
||||
//! Join tree and related types for remote joins.
|
||||
//!
|
||||
use super::{query, ProcessResponseAs};
|
||||
use indexmap::IndexMap;
|
||||
use json_ext::ValueExt;
|
||||
use open_dds::arguments::ArgumentName;
|
||||
use open_dds::types::FieldName;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// This tree structure captures all the locations (in the selection set IR) where
|
||||
/// remote joins are found.
|
||||
///
|
||||
/// It also includes other info, like field mapping etc., for the join
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct JoinLocations {
|
||||
pub locations: IndexMap<String, Location>,
|
||||
}
|
||||
|
||||
impl JoinLocations {
|
||||
pub fn new() -> Self {
|
||||
JoinLocations::default()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.locations.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for JoinLocations {
|
||||
fn default() -> Self {
|
||||
JoinLocations {
|
||||
locations: IndexMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum LocationKind {
|
||||
NestedData,
|
||||
LocalRelationship,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum JoinNode {
|
||||
Local(LocationKind),
|
||||
Remote(RemoteJoin),
|
||||
}
|
||||
|
||||
/// Location indicates if the current node/field is a join node.
|
||||
///
|
||||
/// If it is a join node, then there is information about the join (captured as
|
||||
/// [RemoteJoin]). It may further have more join nodes, represented by `rest`.
|
||||
///
|
||||
/// The current node may not have a join node and only have joins in sub-tree.
|
||||
/// This is represented by `join_node` being `None` and `rest` containing the
|
||||
/// sub-tree. This is required to represent where remote join nodes are inside a
|
||||
/// local relationship selection.
|
||||
///
|
||||
/// ### Example
|
||||
/// ```graphql
|
||||
/// city {
|
||||
/// name
|
||||
/// state { # -> local relationship
|
||||
/// name
|
||||
/// census { # -> remote relationship
|
||||
/// data
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
///
|
||||
/// `field mapping: state_id; join_field_alias: __state_id`
|
||||
///
|
||||
/// ```json
|
||||
/// [("state", Location { join_node: None, rest: ... })],
|
||||
/// [("census", Location { join_node: Some(..), rest: ... })]
|
||||
/// ```
|
||||
/// ---
|
||||
/// ```json
|
||||
/// [
|
||||
/// {
|
||||
/// "name": "Bangalore",
|
||||
/// "state": {
|
||||
/// "name": "KA",
|
||||
/// "__state_id": 1
|
||||
/// }
|
||||
/// },
|
||||
/// {
|
||||
/// "name": "Mumbai",
|
||||
/// "state": {
|
||||
/// "name": "MH",
|
||||
/// "__state_id": 2
|
||||
/// }
|
||||
/// }
|
||||
/// ]
|
||||
/// ```
|
||||
///
|
||||
/// Note: `join_node` and `rest` both cannot be empty; it is an invalid/illegal
|
||||
/// state.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Location {
|
||||
pub join_node: JoinNode,
|
||||
pub rest: JoinLocations,
|
||||
}
|
||||
|
||||
/// Contains information to be captured for a join node
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct RemoteJoin {
|
||||
/// target data connector to execute query on
|
||||
pub target_data_connector: Arc<metadata_resolve::DataConnectorLink>,
|
||||
/// NDC node to execute on a data connector
|
||||
pub target_ndc_execution: query::QueryExecutionPlan,
|
||||
/// Mapping of the fields in source to fields in target.
|
||||
/// The HashMap has the following info -
|
||||
/// - key: is the field name in the source
|
||||
/// - value->first item: is the alias we create for the
|
||||
/// source field. If the user did not request the join field in the
|
||||
/// selection set, we include the join mapping field and call it a phantom
|
||||
/// field.
|
||||
/// - value->second item: is the target NDC field. This could be a model
|
||||
/// field or an argument name.
|
||||
pub join_mapping: HashMap<SourceFieldName, (SourceFieldAlias, TargetField)>,
|
||||
/// Represents how to process the join response.
|
||||
pub process_response_as: ProcessResponseAs,
|
||||
/// Represents the type of the remote join
|
||||
pub remote_join_type: RemoteJoinType,
|
||||
}
|
||||
|
||||
/// Name of the source field used in the join mapping
|
||||
pub type SourceFieldName = FieldName;
|
||||
|
||||
/// Alias of the source field used in the join mapping. This is basically a NDC
|
||||
/// field alias (which in the NDC IR is `String`). Change this when modifying
|
||||
/// the IR to have a newtype Alias.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct SourceFieldAlias(pub String);
|
||||
|
||||
/// Target field used in the join mapping
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum TargetField {
|
||||
ModelField((FieldName, metadata_resolve::NdcColumnForComparison)),
|
||||
CommandField(ArgumentName),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum RemoteJoinType {
|
||||
ToModel,
|
||||
ToCommand,
|
||||
}
|
||||
|
||||
/// An 'Argument' is a map of variable name to it's value.
|
||||
/// For example, `{"first_name": "John", "last_name": "Doe"}`
|
||||
pub type RemoteJoinArgument = BTreeMap<crate::VariableName, ValueExt>;
|
@ -7,8 +7,11 @@ mod usage_counts;
|
||||
mod variable_name;
|
||||
|
||||
pub use execution_plan::{
|
||||
AggregateFieldSelection, AggregateSelectionSet, OrderByDirection, OrderByElement,
|
||||
OrderByTarget, QueryExecutionPlan,
|
||||
AggregateFieldSelection, AggregateSelectionSet, Argument, ExecutionTree, Field, JoinLocations,
|
||||
JoinNode, LocationKind, NDCQueryExecution, NestedArray, NestedField, NestedObject,
|
||||
OrderByDirection, OrderByElement, OrderByTarget, ProcessResponseAs, QueryExecutionPlan,
|
||||
QueryNodeNew, Relationship, RelationshipArgument, RemoteJoin, RemoteJoinArgument,
|
||||
ResolvedFilterExpression, SourceFieldAlias, TargetField,
|
||||
};
|
||||
pub use expression::{
|
||||
ComparisonTarget, ComparisonValue, Expression, LocalFieldComparison, RelationshipColumnMapping,
|
||||
|
@ -44,7 +44,7 @@ run-local-with-shell:
|
||||
# start all the docker deps for running tests (not engine)
|
||||
start-docker-test-deps:
|
||||
# start connectors and wait for health
|
||||
docker compose -f ci.docker-compose.yaml up --wait auth_hook postgres postgres_connector custom_connector custom_connector_no_relationships custom_connector_ndc_v01
|
||||
docker compose -f ci.docker-compose.yaml up --wait postgres postgres_connector custom_connector custom_connector_no_relationships custom_connector_ndc_v01
|
||||
|
||||
# start all the docker run time deps for the engine
|
||||
start-docker-run-deps:
|
||||
|
Loading…
Reference in New Issue
Block a user