Use new plan-types in OpenDD pipeline (#1368)

### What

Incrementally moving to the new execution plan types - this removes more
duplicate types from `execute::plan` and makes the new OpenDD pipeline
use the new types and execution functions.

Functional no-op.
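
For reference, a minimal sketch of how callers (the engine test and jsonapi handler in the diff below) now drive the shared execution path. The span attribute strings are placeholders, and `query_execution_plan` (from `plan::SingleNodeExecutionPlan::Query`) plus `http_context` are assumed to be in scope:

```rust
use indexmap::IndexMap;
use plan_types::{ExecutionTree, JoinLocations, NDCQueryExecution, ProcessResponseAs};

// No remote joins are planned in this path yet, so the join locations are empty.
let ndc_query_execution = NDCQueryExecution {
    execution_span_attribute: "OpenDD pipeline", // placeholder span name
    execution_tree: ExecutionTree {
        query_execution_plan,
        remote_join_executions: JoinLocations {
            locations: IndexMap::new(),
        },
    },
    field_span_attribute: "OpenDD pipeline".into(), // placeholder span name
    process_response_as: ProcessResponseAs::Array { is_nullable: false },
};

// Returns raw `Vec<ndc_models::RowSet>` now that response processing is no longer
// tied to a GraphQL selection set.
let rowsets =
    execute::resolve_ndc_query_execution(http_context, ndc_query_execution, None).await?;
```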

V3_GIT_ORIGIN_REV_ID: c258e5e2991504fc8fbea6cce8e3135a2170f2f3
Daniel Harvey 2024-11-22 14:59:07 +00:00 committed by hasura-bot
parent a68523a75c
commit 28f14c235c
39 changed files with 446 additions and 471 deletions

v3/Cargo.lock (generated)
View File

@ -1830,6 +1830,7 @@ dependencies = [
"oas3",
"open-dds",
"plan",
"plan-types",
"pre-parse-plugin",
"pre-response-plugin",
"pretty_assertions",
@ -3063,6 +3064,7 @@ dependencies = [
"oas3",
"open-dds",
"plan",
"plan-types",
"reqwest",
"serde",
"serde_json",

View File

@ -65,6 +65,8 @@ tower-http = { workspace = true }
build-data = { workspace = true }
[dev-dependencies]
plan-types = { path = "../plan-types" }
criterion = { workspace = true }
goldenfile = { workspace = true }
insta = { workspace = true }

View File

@ -6,10 +6,12 @@ use graphql_schema::GDS;
use hasura_authn_core::{
Identity, JsonSessionVariableValue, Role, Session, SessionError, SessionVariableValue,
};
use indexmap::IndexMap;
use lang_graphql::ast::common as ast;
use lang_graphql::{http::RawRequest, schema::Schema};
use metadata_resolve::{data_connectors::NdcVersion, LifecyclePluginConfigs};
use open_dds::session_variables::{SessionVariableName, SESSION_VARIABLE_ROLE};
use plan_types::{ExecutionTree, JoinLocations, NDCQueryExecution, ProcessResponseAs};
use pretty_assertions::assert_eq;
use serde_json as json;
use sql::catalog::CatalogSerializable;
@ -24,7 +26,6 @@ use std::{
path::Path,
path::PathBuf,
};
extern crate json_value_merge;
use json_value_merge::Merge;
use serde_json::Value;
@ -882,7 +883,7 @@ pub async fn open_dd_pipeline_test(
);
// create a query execution plan for a single node with the new pipeline
let (query_execution_plan, _) = plan::plan_query_request(
let (execution_plan, _) = plan::plan_query_request(
&query_ir,
metadata,
&Arc::new(session.clone()),
@ -892,14 +893,27 @@ pub async fn open_dd_pipeline_test(
.await
.unwrap();
match query_execution_plan {
match execution_plan {
plan::SingleNodeExecutionPlan::Mutation(_) => {
todo!("Executing mutations in OpenDD IR pipeline tests not implemented yet")
}
plan::SingleNodeExecutionPlan::Query(plan) => {
// run the pipeline using functions from GraphQL frontend
let rowsets =
graphql_frontend::resolve_ndc_query_execution(http_context, plan)
plan::SingleNodeExecutionPlan::Query(query_execution_plan) => {
let ndc_query_execution = NDCQueryExecution {
execution_span_attribute: "Engine GraphQL OpenDD pipeline tests",
execution_tree: ExecutionTree {
query_execution_plan,
remote_join_executions: JoinLocations {
locations: IndexMap::new(),
},
},
field_span_attribute: "Engine GraphQL OpenDD pipeline tests".into(),
process_response_as: ProcessResponseAs::Array { is_nullable: false },
};
let rowsets = execute::resolve_ndc_query_execution(
http_context,
ndc_query_execution,
None,
)
.await
.map_err(|e| e.to_string());

View File

@ -3,23 +3,19 @@
// frontend
mod ndc_request;
mod remote_joins;
use super::ndc;
use super::process_response::process_response;
use super::{HttpContext, ProjectId};
use crate::error::FieldError;
use crate::process_response::ProcessedResponse;
use gql::normalized_ast;
use graphql_schema::GDS;
use lang_graphql as gql;
use plan_types::{JoinLocations, NDCQueryExecution, ProcessResponseAs, QueryExecutionPlan};
use crate::ndc;
pub use ndc_request::{make_ndc_mutation_request, make_ndc_query_request};
use plan_types::{
JoinLocations, NDCMutationExecution, NDCQueryExecution, ProcessResponseAs, QueryExecutionPlan,
};
// run ndc query, do any joins, and process result
pub async fn resolve_ndc_query_execution<'ir>(
pub async fn resolve_ndc_query_execution(
http_context: &HttpContext,
ndc_query: NDCQueryExecution,
selection_set: &normalized_ast::SelectionSet<'ir, GDS>,
project_id: Option<&ProjectId>,
) -> Result<ProcessedResponse, FieldError> {
) -> Result<Vec<ndc_models::RowSet>, FieldError> {
let NDCQueryExecution {
execution_tree,
execution_span_attribute,
@ -40,7 +36,6 @@ pub async fn resolve_ndc_query_execution<'ir>(
http_context,
execution_tree.remote_join_executions,
execution_span_attribute,
selection_set,
process_response_as,
project_id,
response_rowsets,
@ -48,7 +43,7 @@ pub async fn resolve_ndc_query_execution<'ir>(
.await
}
async fn execute_ndc_query<'s, 'ir>(
async fn execute_ndc_query<'s>(
http_context: &HttpContext,
query_execution_plan: QueryExecutionPlan,
field_span_attribute: &str,
@ -76,11 +71,10 @@ async fn process_ndc_query_response<'s, 'ir>(
http_context: &HttpContext,
remote_join_executions: JoinLocations,
execution_span_attribute: &'static str,
selection_set: &'ir normalized_ast::SelectionSet<'s, GDS>,
process_response_as: ProcessResponseAs,
project_id: Option<&ProjectId>,
mut response_rowsets: Vec<ndc_models::RowSet>,
) -> Result<ProcessedResponse, FieldError> {
) -> Result<Vec<ndc_models::RowSet>, FieldError> {
// TODO: Failures in remote joins should result in partial response
// https://github.com/hasura/v3-engine/issues/229
remote_joins::execute_join_locations(
@ -93,6 +87,36 @@ async fn process_ndc_query_response<'s, 'ir>(
)
.await?;
// this ties all of this to GraphQL, let's not do this
process_response(selection_set, response_rowsets, &process_response_as)
Ok(response_rowsets)
}
pub async fn resolve_ndc_mutation_execution(
http_context: &HttpContext,
ndc_mutation_execution: NDCMutationExecution,
project_id: Option<&ProjectId>,
) -> Result<ndc_models::MutationResponse, FieldError> {
let NDCMutationExecution {
execution_node,
data_connector,
execution_span_attribute,
field_span_attribute,
process_response_as: _,
// TODO: remote joins are not handled for mutations
join_locations: _,
} = ndc_mutation_execution;
let mutation_request = ndc_request::make_ndc_mutation_request(execution_node)?;
let mutation_response = ndc::execute_ndc_mutation(
http_context,
&mutation_request,
&data_connector,
execution_span_attribute,
field_span_attribute,
project_id,
)
.await?
.as_latest();
Ok(mutation_response)
}

View File

@ -3,7 +3,7 @@ pub mod v02;
use crate::{error, ndc};
use metadata_resolve::data_connectors::NdcVersion;
use plan_types::QueryExecutionPlan;
use plan_types::{MutationExecutionPlan, QueryExecutionPlan};
pub fn make_ndc_query_request(
query_execution_plan: QueryExecutionPlan,
@ -21,3 +21,20 @@ pub fn make_ndc_query_request(
)?)),
}
}
pub fn make_ndc_mutation_request(
mutation_execution_plan: MutationExecutionPlan,
) -> Result<ndc::NdcMutationRequest, error::FieldError> {
match mutation_execution_plan
.data_connector
.capabilities
.supported_ndc_version
{
NdcVersion::V01 => Ok(ndc::NdcMutationRequest::V01(v01::make_mutation_request(
mutation_execution_plan,
)?)),
NdcVersion::V02 => Ok(ndc::NdcMutationRequest::V02(v02::make_mutation_request(
mutation_execution_plan,
)?)),
}
}

View File

@ -6,9 +6,10 @@ use open_dds::types::DataConnectorArgumentName;
use crate::error::{FieldError, FieldInternalError};
use plan_types::{
AggregateFieldSelection, AggregateSelectionSet, Argument, Field, NestedArray, NestedField,
NestedObject, OrderByDirection, OrderByElement, OrderByTarget, QueryExecutionPlan,
QueryNodeNew, Relationship, RelationshipArgument, ResolvedFilterExpression, VariableName,
AggregateFieldSelection, AggregateSelectionSet, Argument, Field, MutationArgument,
MutationExecutionPlan, NestedArray, NestedField, NestedObject, OrderByDirection,
OrderByElement, OrderByTarget, QueryExecutionPlan, QueryNodeNew, Relationship,
RelationshipArgument, ResolvedFilterExpression, VariableName,
};
pub fn make_query_request(
@ -26,6 +27,29 @@ pub fn make_query_request(
Ok(query_request)
}
pub fn make_mutation_request(
mutation_execution_plan: MutationExecutionPlan,
) -> Result<ndc_models_v01::MutationRequest, FieldError> {
let mutation_operation = ndc_models_v01::MutationOperation::Procedure {
name: ndc_models_v01::ProcedureName::new(
mutation_execution_plan.procedure_name.into_inner(),
),
arguments: make_mutation_arguments(mutation_execution_plan.procedure_arguments)?,
fields: mutation_execution_plan
.procedure_fields
.map(make_nested_field)
.transpose()?,
};
let mutation_request = ndc_models_v01::MutationRequest {
operations: vec![mutation_operation],
collection_relationships: make_collection_relationships(
mutation_execution_plan.collection_relationships,
),
};
Ok(mutation_request)
}
fn make_variables(
variables: Option<Vec<BTreeMap<VariableName, serde_json::Value>>>,
) -> Option<Vec<BTreeMap<ndc_models_v01::VariableName, serde_json::Value>>> {
@ -544,3 +568,29 @@ fn make_count_aggregate(
ndc_models_v01::Aggregate::StarCount {}
}
}
fn make_mutation_arguments(
arguments: BTreeMap<DataConnectorArgumentName, MutationArgument>,
) -> Result<BTreeMap<ndc_models_v01::ArgumentName, serde_json::Value>, FieldError> {
arguments
.into_iter()
.map(|(name, argument)| {
Ok((
ndc_models_v01::ArgumentName::new(name.into_inner()),
make_mutation_argument(argument)?,
))
})
.collect::<Result<BTreeMap<_, _>, _>>()
}
fn make_mutation_argument(argument: MutationArgument) -> Result<serde_json::Value, FieldError> {
match argument {
MutationArgument::Literal { value } => Ok(value),
MutationArgument::BooleanExpression { predicate } => {
let ndc_expression = make_expression(predicate)?;
Ok(serde_json::to_value(ndc_expression).map_err(|e| {
FieldError::InternalError(FieldInternalError::ExpressionSerializationError(e))
})?)
}
}
}

View File

@ -7,9 +7,10 @@ use open_dds::types::DataConnectorArgumentName;
use crate::error::{FieldError, FieldInternalError};
use plan_types::{
AggregateFieldSelection, AggregateSelectionSet, Argument, Field, NestedArray, NestedField,
NestedObject, OrderByDirection, OrderByElement, OrderByTarget, QueryExecutionPlan,
QueryNodeNew, Relationship, RelationshipArgument, ResolvedFilterExpression, VariableName,
AggregateFieldSelection, AggregateSelectionSet, Argument, Field, MutationArgument,
MutationExecutionPlan, NestedArray, NestedField, NestedObject, OrderByDirection,
OrderByElement, OrderByTarget, QueryExecutionPlan, QueryNodeNew, Relationship,
RelationshipArgument, ResolvedFilterExpression, VariableName,
};
pub fn make_query_request(
@ -27,6 +28,30 @@ pub fn make_query_request(
Ok(query_request)
}
pub fn make_mutation_request(
mutation_execution_plan: MutationExecutionPlan,
) -> Result<ndc_models_v02::MutationRequest, FieldError> {
let mutation_operation = ndc_models_v02::MutationOperation::Procedure {
name: ndc_models_v02::ProcedureName::new(
mutation_execution_plan.procedure_name.into_inner(),
),
arguments: make_mutation_arguments(mutation_execution_plan.procedure_arguments)?,
fields: mutation_execution_plan
.procedure_fields
.map(make_nested_field)
.transpose()?,
};
let mutation_request = ndc_models_v02::MutationRequest {
operations: vec![mutation_operation],
collection_relationships: make_collection_relationships(
mutation_execution_plan.collection_relationships,
),
};
Ok(mutation_request)
}
fn make_variables(
variables: Option<Vec<BTreeMap<VariableName, serde_json::Value>>>,
) -> Option<Vec<BTreeMap<ndc_models_v02::VariableName, serde_json::Value>>> {
@ -557,3 +582,29 @@ fn make_count_aggregate(
ndc_models_v02::Aggregate::StarCount {}
}
}
fn make_mutation_arguments(
arguments: BTreeMap<DataConnectorArgumentName, MutationArgument>,
) -> Result<BTreeMap<ndc_models_v02::ArgumentName, serde_json::Value>, FieldError> {
arguments
.into_iter()
.map(|(name, argument)| {
Ok((
ndc_models_v02::ArgumentName::new(name.into_inner()),
make_mutation_argument(argument)?,
))
})
.collect::<Result<BTreeMap<_, _>, _>>()
}
fn make_mutation_argument(argument: MutationArgument) -> Result<serde_json::Value, FieldError> {
match argument {
MutationArgument::Literal { value } => Ok(value),
MutationArgument::BooleanExpression { predicate } => {
let ndc_expression = make_expression(predicate)?;
Ok(serde_json::to_value(ndc_expression).map_err(|e| {
FieldError::InternalError(FieldInternalError::ExpressionSerializationError(e))
})?)
}
}
}

View File

@ -9,7 +9,10 @@ mod types;
// we explicitly export things used by other crates
pub use error::{FieldError, QueryUsageAnalyzeError, RequestError};
pub use execute::resolve_ndc_query_execution;
pub use execute::{
make_ndc_mutation_request, make_ndc_query_request, resolve_ndc_mutation_execution,
resolve_ndc_query_execution,
};
pub use ndc::fetch_from_data_connector;
pub use plan::error::Error as PlanError;
pub use plan::filter::plan_remote_predicate;

View File

@ -15,13 +15,10 @@ pub(crate) mod selection_set;
pub use arguments::{Argument, MutationArgument, ResolvedArgument};
pub use field::{ResolvedField, ResolvedNestedField};
pub use filter::{
plan_expression, resolve_expression, PredicateQueryTrees, ResolveFilterExpressionContext,
ResolvedFilterExpression,
};
pub use filter::{plan_expression, resolve_expression, ResolveFilterExpressionContext};
pub use mutation::ResolvedMutationExecutionPlan;
pub use query::{ResolvedQueryExecutionPlan, ResolvedQueryNode, UnresolvedQueryNode};
pub use relationships::{process_model_relationship_definition, Relationship};
pub use relationships::process_model_relationship_definition;
use gql::normalized_ast;
use gql::schema::NamespacedGetter;
@ -43,7 +40,8 @@ use crate::process_response::{process_mutation_response, ProcessedResponse};
use graphql_ir::ModelSelection;
use graphql_schema::GDSRoleNamespaceGetter;
use graphql_schema::GDS;
use plan_types::ProcessResponseAs;
use plan_types::{ProcessResponseAs, ResolvedFilterExpression};
pub type QueryPlan<'n, 's, 'ir> = IndexMap<ast::Alias, NodeQueryPlan<'n, 's, 'ir>>;
/// Unlike a query, the root nodes of a mutation aren't necessarily independent. Specifically, the

View File

@ -3,15 +3,12 @@ use std::collections::BTreeMap;
use super::error as plan_error;
use super::filter;
use super::filter::PredicateQueryTrees;
use super::filter::ResolveFilterExpressionContext;
use super::relationships;
use crate::error;
use plan_types::NdcRelationshipName;
use plan_types::VariableName;
use plan_types::{NdcRelationshipName, PredicateQueryTrees, Relationship, VariableName};
pub type UnresolvedArgument<'s> = Argument<plan_types::Expression<'s>>;
pub type ResolvedArgument = Argument<filter::ResolvedFilterExpression>;
pub type ResolvedArgument = Argument<plan_types::ResolvedFilterExpression>;
/// Argument plan to express various kinds of arguments
#[derive(Debug, Clone, PartialEq)]
@ -33,7 +30,7 @@ impl<'s> UnresolvedArgument<'s> {
/// Generate the argument plan from IR argument
pub fn plan<'a>(
ir_argument: &'a graphql_ir::Argument<'s>,
relationships: &mut BTreeMap<NdcRelationshipName, relationships::Relationship>,
relationships: &mut BTreeMap<NdcRelationshipName, Relationship>,
) -> Result<Self, super::error::Error> {
let planned_argument = match ir_argument {
graphql_ir::Argument::Literal { value } => Argument::Literal {
@ -72,7 +69,7 @@ impl<'s> UnresolvedArgument<'s> {
}
pub type UnresolvedMutationArgument<'s> = MutationArgument<plan_types::Expression<'s>>;
pub type ResolvedMutationArgument = MutationArgument<filter::ResolvedFilterExpression>;
pub type ResolvedMutationArgument = MutationArgument<plan_types::ResolvedFilterExpression>;
/// Argument plan to express various kinds of arguments
#[derive(Debug, Clone, PartialEq)]
@ -90,7 +87,7 @@ impl<'s> UnresolvedMutationArgument<'s> {
/// Generate the argument plan from IR argument
pub fn plan<'a>(
ir_argument: &'a graphql_ir::Argument<'s>,
relationships: &mut BTreeMap<NdcRelationshipName, relationships::Relationship>,
relationships: &mut BTreeMap<NdcRelationshipName, Relationship>,
) -> Result<Self, super::error::Error> {
let planned_argument = match ir_argument {
graphql_ir::Argument::Literal { value } => MutationArgument::Literal {
@ -129,7 +126,7 @@ impl<'s> UnresolvedMutationArgument<'s> {
pub fn plan_arguments<'s>(
arguments: &BTreeMap<DataConnectorArgumentName, graphql_ir::Argument<'s>>,
relationships: &mut BTreeMap<NdcRelationshipName, relationships::Relationship>,
relationships: &mut BTreeMap<NdcRelationshipName, Relationship>,
) -> Result<BTreeMap<DataConnectorArgumentName, UnresolvedArgument<'s>>, plan_error::Error> {
let mut result = BTreeMap::new();
for (argument_name, argument_value) in arguments {
@ -143,7 +140,7 @@ pub fn plan_arguments<'s>(
pub fn plan_mutation_arguments<'s>(
arguments: &BTreeMap<DataConnectorArgumentName, graphql_ir::Argument<'s>>,
relationships: &mut BTreeMap<NdcRelationshipName, relationships::Relationship>,
relationships: &mut BTreeMap<NdcRelationshipName, Relationship>,
) -> Result<BTreeMap<DataConnectorArgumentName, UnresolvedMutationArgument<'s>>, plan_error::Error>
{
arguments
@ -161,7 +158,7 @@ pub(crate) async fn resolve_arguments<'s>(
resolve_context: &ResolveFilterExpressionContext<'_>,
arguments: BTreeMap<DataConnectorArgumentName, Argument<plan_types::Expression<'s>>>,
) -> Result<
BTreeMap<DataConnectorArgumentName, Argument<filter::ResolvedFilterExpression>>,
BTreeMap<DataConnectorArgumentName, Argument<plan_types::ResolvedFilterExpression>>,
error::FieldError,
> {
let mut result = BTreeMap::new();
@ -178,7 +175,7 @@ pub(crate) async fn resolve_mutation_arguments<'s>(
resolve_context: &ResolveFilterExpressionContext<'_>,
arguments: BTreeMap<DataConnectorArgumentName, MutationArgument<plan_types::Expression<'s>>>,
) -> Result<
BTreeMap<DataConnectorArgumentName, MutationArgument<filter::ResolvedFilterExpression>>,
BTreeMap<DataConnectorArgumentName, MutationArgument<plan_types::ResolvedFilterExpression>>,
error::FieldError,
> {
let mut result = BTreeMap::new();

View File

@ -6,20 +6,20 @@ use std::collections::BTreeMap;
use super::arguments;
use super::error;
use super::field;
use super::filter::PredicateQueryTrees;
use super::mutation;
use super::query;
use super::relationships;
use super::selection_set;
use crate::ndc::FUNCTION_IR_VALUE_COLUMN_NAME;
use crate::remote_joins::types::JoinLocations;
use graphql_ir::{CommandInfo, FunctionBasedCommand, ProcedureBasedCommand};
use open_dds::commands::ProcedureName;
use plan_types::{NdcFieldAlias, NdcRelationshipName, VariableName};
use plan_types::{
NdcFieldAlias, NdcRelationshipName, PredicateQueryTrees, Relationship, VariableName,
};
pub(crate) fn plan_query_node<'s>(
ir: &CommandInfo<'s>,
relationships: &mut BTreeMap<NdcRelationshipName, relationships::Relationship>,
relationships: &mut BTreeMap<NdcRelationshipName, Relationship>,
) -> Result<(query::UnresolvedQueryNode<'s>, JoinLocations<'s>), error::Error> {
let mut ndc_nested_field = None;
let mut jl = JoinLocations::new();

View File

@ -7,12 +7,11 @@ use plan_types::NdcRelationshipName;
use std::collections::BTreeMap;
use super::arguments;
use super::filter;
use super::filter::ResolveFilterExpressionContext;
use super::query;
pub type UnresolvedField<'s> = Field<plan_types::Expression<'s>>;
pub type ResolvedField = Field<filter::ResolvedFilterExpression>;
pub type ResolvedField = Field<plan_types::ResolvedFilterExpression>;
/// Field plan
#[derive(Debug, Clone, PartialEq)]
@ -74,7 +73,7 @@ impl<'s> UnresolvedField<'s> {
}
pub type UnresolvedNestedField<'s> = NestedField<plan_types::Expression<'s>>;
pub type ResolvedNestedField = NestedField<filter::ResolvedFilterExpression>;
pub type ResolvedNestedField = NestedField<plan_types::ResolvedFilterExpression>;
#[derive(Debug, Clone, PartialEq)]
pub enum NestedField<TFilterExpression> {
@ -103,7 +102,7 @@ impl<'s> UnresolvedNestedField<'s> {
}
pub type UnresolvedNestedObject<'s> = NestedObject<plan_types::Expression<'s>>;
pub type ResolvedNestedObject = NestedObject<filter::ResolvedFilterExpression>;
pub type ResolvedNestedObject = NestedObject<plan_types::ResolvedFilterExpression>;
#[derive(Debug, Clone, PartialEq)]
pub struct NestedObject<TFilterExpression> {
@ -124,7 +123,7 @@ impl<'s> UnresolvedNestedObject<'s> {
}
pub type UnresolvedNestedArray<'s> = NestedArray<plan_types::Expression<'s>>;
pub type ResolvedNestedArray = NestedArray<filter::ResolvedFilterExpression>;
pub type ResolvedNestedArray = NestedArray<plan_types::ResolvedFilterExpression>;
#[derive(Debug, Clone, PartialEq)]
pub struct NestedArray<TFilterExpression> {

View File

@ -2,17 +2,16 @@ use async_recursion::async_recursion;
use indexmap::{IndexMap, IndexSet};
use std::collections::BTreeMap;
use tracing_util::SpanVisibility;
use uuid::Uuid;
use super::error as plan_error;
use super::field;
use super::ndc_request;
use super::query;
use super::relationships::{self, process_model_relationship_definition};
use super::relationships::process_model_relationship_definition;
use crate::{error, ndc, HttpContext};
use open_dds::data_connector::DataConnectorColumnName;
use plan_types::NdcFieldAlias;
use plan_types::NdcRelationshipName;
use plan_types::{
NdcFieldAlias, NdcRelationshipName, PredicateQueryTrees, Relationship, ResolvedFilterExpression,
};
/// Plan the filter expression IR.
/// This function will take the filter expression IR and convert it into a planned filter expression
@ -24,7 +23,7 @@ pub(crate) fn plan_filter_expression<'s>(
permission_filter,
relationship_join_filter,
}: &graphql_ir::FilterExpression<'s>,
relationships: &mut BTreeMap<NdcRelationshipName, relationships::Relationship>,
relationships: &mut BTreeMap<NdcRelationshipName, Relationship>,
) -> Result<Option<plan_types::Expression<'s>>, plan_error::Error> {
let mut expressions = Vec::new();
let mut remote_predicates = PredicateQueryTrees::new();
@ -65,37 +64,10 @@ pub(crate) fn plan_filter_expression<'s>(
Ok(plan_types::Expression::mk_and(expressions).remove_always_true_expression())
}
/// A tree of queries that are used to execute remote predicates
#[derive(Debug, Clone, PartialEq)]
pub struct PredicateQueryTree {
pub query: query::ResolvedQueryExecutionPlan,
pub children: PredicateQueryTrees,
}
#[derive(Debug, Clone, PartialEq)]
pub struct PredicateQueryTrees(pub BTreeMap<Uuid, PredicateQueryTree>);
impl Default for PredicateQueryTrees {
fn default() -> Self {
Self::new()
}
}
impl PredicateQueryTrees {
pub fn new() -> Self {
Self(BTreeMap::new())
}
pub fn insert(&mut self, value: PredicateQueryTree) -> Uuid {
let key = Uuid::new_v4();
self.0.insert(key, value);
key
}
}
/// Plan the expression IR type.
pub fn plan_expression<'s, 'a>(
expression: &'a plan_types::Expression<'s>,
relationships: &'a mut BTreeMap<NdcRelationshipName, relationships::Relationship>,
relationships: &'a mut BTreeMap<NdcRelationshipName, Relationship>,
_remote_predicates: &'a mut PredicateQueryTrees,
) -> Result<plan_types::Expression<'s>, plan_error::Error> {
match expression {
@ -187,7 +159,7 @@ pub fn plan_remote_predicate<'s, 'a>(
(
query::UnresolvedQueryNode<'s>,
PredicateQueryTrees,
BTreeMap<NdcRelationshipName, relationships::Relationship>,
BTreeMap<NdcRelationshipName, Relationship>,
),
plan_error::Error,
> {
@ -227,117 +199,6 @@ fn build_ndc_query_fields<'s>(
fields
}
/// Filter expression plan to be resolved
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ResolvedFilterExpression {
And {
expressions: Vec<ResolvedFilterExpression>,
},
Or {
expressions: Vec<ResolvedFilterExpression>,
},
Not {
expression: Box<ResolvedFilterExpression>,
},
LocalFieldComparison(plan_types::LocalFieldComparison),
LocalNestedArray {
column: DataConnectorColumnName,
field_path: Vec<DataConnectorColumnName>,
predicate: Box<ResolvedFilterExpression>,
},
LocalRelationshipComparison {
relationship: NdcRelationshipName,
predicate: Box<ResolvedFilterExpression>,
},
RemoteRelationshipComparison {
remote_predicate_id: Uuid,
},
}
impl ResolvedFilterExpression {
pub fn remove_always_true_expression(self) -> Option<ResolvedFilterExpression> {
match &self {
ResolvedFilterExpression::And { expressions } if expressions.is_empty() => None,
ResolvedFilterExpression::Not { expression } => match expression.as_ref() {
ResolvedFilterExpression::Or { expressions } if expressions.is_empty() => None,
_ => Some(self),
},
_ => Some(self),
}
}
/// Creates a 'FilterExpression::And' and applies some basic expression simplification logic
/// to remove redundant boolean logic operators
pub fn mk_and(expressions: Vec<ResolvedFilterExpression>) -> ResolvedFilterExpression {
// If the `and` only contains one expression, we can unwrap it and get rid of the `and`
// ie. and([x]) == x
if expressions.len() == 1 {
expressions.into_iter().next().unwrap()
}
// If all subexpressions are also `and`, we can flatten into a single `and`
// ie. and([and([x,y]), and([a,b])]) == and([x,y,a,b])
else if expressions
.iter()
.all(|expr| matches!(expr, ResolvedFilterExpression::And { .. }))
{
let subexprs = expressions
.into_iter()
.flat_map(|expr| match expr {
ResolvedFilterExpression::And { expressions } => expressions,
_ => vec![],
})
.collect();
ResolvedFilterExpression::And {
expressions: subexprs,
}
} else {
ResolvedFilterExpression::And { expressions }
}
}
/// Creates a 'FilterExpression::Or' and applies some basic expression simplification logic
/// to remove redundant boolean logic operators
pub fn mk_or(expressions: Vec<ResolvedFilterExpression>) -> ResolvedFilterExpression {
// If the `or` only contains one expression, we can unwrap it and get rid of the `or`
// ie. or([x]) == x
if expressions.len() == 1 {
expressions.into_iter().next().unwrap()
}
// If all subexpressions are also `or`, we can flatten into a single `or`
// ie. or([or([x,y]), or([a,b])]) == or([x,y,a,b])
else if expressions
.iter()
.all(|expr| matches!(expr, ResolvedFilterExpression::Or { .. }))
{
let subexprs = expressions
.into_iter()
.flat_map(|expr| match expr {
ResolvedFilterExpression::Or { expressions } => expressions,
_ => vec![],
})
.collect();
ResolvedFilterExpression::Or {
expressions: subexprs,
}
} else {
ResolvedFilterExpression::Or { expressions }
}
}
/// Creates a 'FilterExpression::Not' and applies some basic expression simplification logic
/// to remove redundant boolean logic operators
pub fn mk_not(expression: ResolvedFilterExpression) -> ResolvedFilterExpression {
match expression {
// Double negations can be removed
// ie. not(not(x))) == x
ResolvedFilterExpression::Not { expression } => *expression,
_ => ResolvedFilterExpression::Not {
expression: Box::new(expression),
},
}
}
}
/// Context required to resolve the filter expressions
pub enum ResolveFilterExpressionContext<'req> {
/// Allow only expressions that can be pushed down to NDC.

View File

@ -10,13 +10,13 @@ use super::relationships;
use super::selection_set;
use crate::remote_joins::types::JoinLocations;
use graphql_ir::ModelSelection;
use plan_types::NdcRelationshipName;
use plan_types::{NdcRelationshipName, PredicateQueryTrees, Relationship};
/// Create an NDC `Query` based on the internal IR `ModelSelection` settings
// #[async_recursion]
pub(crate) fn plan_query_node<'s>(
ir: &ModelSelection<'s>,
relationships: &mut BTreeMap<NdcRelationshipName, relationships::Relationship>,
relationships: &mut BTreeMap<NdcRelationshipName, Relationship>,
) -> Result<(query::UnresolvedQueryNode<'s>, JoinLocations<'s>), error::Error> {
let mut query_fields = None;
let mut join_locations = JoinLocations::new();
@ -52,7 +52,7 @@ pub(crate) fn plan_query_execution<'s>(
// collection relationships from order_by clause
relationships::collect_relationships_from_order_by(ir, &mut collection_relationships)?;
let execution_node = query::UnresolvedQueryExecutionPlan {
remote_predicates: filter::PredicateQueryTrees::new(),
remote_predicates: PredicateQueryTrees::new(),
query_node: query,
collection: ir.collection.clone(),
arguments: arguments::plan_arguments(&ir.arguments, &mut collection_relationships)?,

View File

@ -1,17 +1,16 @@
use crate::error;
use open_dds::{commands::ProcedureName, types::DataConnectorArgumentName};
use plan_types::NdcRelationshipName;
use plan_types::{NdcRelationshipName, Relationship};
use std::collections::BTreeMap;
use std::sync::Arc;
use super::arguments;
use super::field;
use super::filter;
use super::filter::ResolveFilterExpressionContext;
use super::relationships;
pub type UnresolvedMutationExecutionPlan<'s> = MutationExecutionPlan<plan_types::Expression<'s>>;
pub type ResolvedMutationExecutionPlan = MutationExecutionPlan<filter::ResolvedFilterExpression>;
pub type ResolvedMutationExecutionPlan =
MutationExecutionPlan<plan_types::ResolvedFilterExpression>;
#[derive(Debug)]
pub struct MutationExecutionPlan<TFilterExpression> {
@ -23,7 +22,7 @@ pub struct MutationExecutionPlan<TFilterExpression> {
/// The fields to return from the result, or null to return everything
pub procedure_fields: Option<field::NestedField<TFilterExpression>>,
/// Any relationships between collections involved in the query request
pub collection_relationships: BTreeMap<NdcRelationshipName, relationships::Relationship>,
pub collection_relationships: BTreeMap<NdcRelationshipName, Relationship>,
/// The data connector used to fetch the data
pub data_connector: Arc<metadata_resolve::DataConnectorLink>,
}

View File

@ -6,14 +6,12 @@ use open_dds::types::DataConnectorArgumentName;
use super::super::arguments;
use super::super::field;
use super::super::filter;
use super::super::mutation;
use super::super::query;
use super::super::relationships;
use crate::error::{FieldError, FieldInternalError};
use plan_types::{
AggregateFieldSelection, AggregateSelectionSet, OrderByDirection, OrderByElement,
OrderByTarget, VariableName,
OrderByTarget, Relationship, RelationshipArgument, ResolvedFilterExpression, VariableName,
};
pub fn make_query_request(
@ -151,7 +149,7 @@ fn make_relationship_argument_from_argument(
}
fn make_relationship_arguments(
arguments: BTreeMap<DataConnectorArgumentName, relationships::RelationshipArgument>,
arguments: BTreeMap<DataConnectorArgumentName, RelationshipArgument>,
) -> BTreeMap<ndc_models_v01::ArgumentName, ndc_models_v01::RelationshipArgument> {
arguments
.into_iter()
@ -165,14 +163,12 @@ fn make_relationship_arguments(
}
fn make_relationship_argument(
argument: relationships::RelationshipArgument,
argument: RelationshipArgument,
) -> ndc_models_v01::RelationshipArgument {
match argument {
relationships::RelationshipArgument::Column { name } => {
ndc_models_v01::RelationshipArgument::Column {
RelationshipArgument::Column { name } => ndc_models_v01::RelationshipArgument::Column {
name: ndc_models_v01::FieldName::new(name.into_inner()),
}
}
},
}
}
@ -205,10 +201,10 @@ fn make_mutation_argument(
}
fn make_expression(
predicate: filter::ResolvedFilterExpression,
predicate: ResolvedFilterExpression,
) -> Result<ndc_models_v01::Expression, FieldError> {
match predicate {
filter::ResolvedFilterExpression::And { expressions } => {
ResolvedFilterExpression::And { expressions } => {
let mut ndc_expressions = Vec::new();
for expression in expressions {
let ndc_expression = make_expression(expression)?;
@ -218,7 +214,7 @@ fn make_expression(
expressions: ndc_expressions,
})
}
filter::ResolvedFilterExpression::Or { expressions } => {
ResolvedFilterExpression::Or { expressions } => {
let mut ndc_expressions = Vec::new();
for expression in expressions {
let ndc_expression = make_expression(expression)?;
@ -228,13 +224,13 @@ fn make_expression(
expressions: ndc_expressions,
})
}
filter::ResolvedFilterExpression::Not { expression } => {
ResolvedFilterExpression::Not { expression } => {
let ndc_expression = make_expression(*expression)?;
Ok(ndc_models_v01::Expression::Not {
expression: Box::new(ndc_expression),
})
}
filter::ResolvedFilterExpression::LocalFieldComparison(
ResolvedFilterExpression::LocalFieldComparison(
plan_types::LocalFieldComparison::BinaryComparison {
column,
operator,
@ -245,7 +241,7 @@ fn make_expression(
operator: ndc_models_v01::ComparisonOperatorName::new(operator.into_inner()),
value: make_comparison_value(value),
}),
filter::ResolvedFilterExpression::LocalNestedArray {
ResolvedFilterExpression::LocalNestedArray {
column,
field_path,
predicate,
@ -265,7 +261,7 @@ fn make_expression(
predicate: Some(Box::new(ndc_expression)),
})
}
filter::ResolvedFilterExpression::LocalFieldComparison(
ResolvedFilterExpression::LocalFieldComparison(
plan_types::LocalFieldComparison::UnaryComparison { column, operator },
) => Ok(ndc_models_v01::Expression::UnaryComparisonOperator {
column: make_comparison_target(column),
@ -275,7 +271,7 @@ fn make_expression(
}
},
}),
filter::ResolvedFilterExpression::LocalRelationshipComparison {
ResolvedFilterExpression::LocalRelationshipComparison {
relationship,
predicate,
} => {
@ -289,7 +285,7 @@ fn make_expression(
})
}
// we are generating NDC request for one connector, we can ignore anything remote
filter::ResolvedFilterExpression::RemoteRelationshipComparison {
ResolvedFilterExpression::RemoteRelationshipComparison {
remote_predicate_id: _,
} => Ok(ndc_models_v01::Expression::And {
expressions: vec![],
@ -404,10 +400,7 @@ fn make_nested_array(
}
fn make_collection_relationships(
collection_relationships: BTreeMap<
plan_types::NdcRelationshipName,
relationships::Relationship,
>,
collection_relationships: BTreeMap<plan_types::NdcRelationshipName, Relationship>,
) -> BTreeMap<ndc_models_v01::RelationshipName, ndc_models_v01::Relationship> {
collection_relationships
.into_iter()
@ -420,7 +413,7 @@ fn make_collection_relationships(
.collect::<BTreeMap<_, _>>()
}
fn make_relationship(relationship: relationships::Relationship) -> ndc_models_v01::Relationship {
fn make_relationship(relationship: Relationship) -> ndc_models_v01::Relationship {
ndc_models_v01::Relationship {
column_mapping: relationship
.column_mapping

View File

@ -7,14 +7,12 @@ use open_dds::types::DataConnectorArgumentName;
use super::super::arguments;
use super::super::field;
use super::super::filter;
use super::super::mutation;
use super::super::query;
use super::super::relationships;
use crate::error::{FieldError, FieldInternalError};
use plan_types::{
AggregateFieldSelection, AggregateSelectionSet, OrderByDirection, OrderByElement,
OrderByTarget, VariableName,
OrderByTarget, Relationship, RelationshipArgument, ResolvedFilterExpression, VariableName,
};
pub fn make_query_request(
@ -153,7 +151,7 @@ fn make_relationship_argument_from_argument(
}
fn make_relationship_arguments(
arguments: BTreeMap<DataConnectorArgumentName, relationships::RelationshipArgument>,
arguments: BTreeMap<DataConnectorArgumentName, RelationshipArgument>,
) -> BTreeMap<ndc_models_v02::ArgumentName, ndc_models_v02::RelationshipArgument> {
arguments
.into_iter()
@ -167,14 +165,12 @@ fn make_relationship_arguments(
}
fn make_relationship_argument(
argument: relationships::RelationshipArgument,
argument: RelationshipArgument,
) -> ndc_models_v02::RelationshipArgument {
match argument {
relationships::RelationshipArgument::Column { name } => {
ndc_models_v02::RelationshipArgument::Column {
RelationshipArgument::Column { name } => ndc_models_v02::RelationshipArgument::Column {
name: ndc_models_v02::FieldName::new(name.into_inner()),
}
}
},
}
}
@ -207,10 +203,10 @@ fn make_mutation_argument(
}
pub fn make_expression(
predicate: filter::ResolvedFilterExpression,
predicate: ResolvedFilterExpression,
) -> Result<ndc_models_v02::Expression, FieldError> {
match predicate {
filter::ResolvedFilterExpression::And { expressions } => {
ResolvedFilterExpression::And { expressions } => {
let mut ndc_expressions = Vec::new();
for expression in expressions {
@ -222,7 +218,7 @@ pub fn make_expression(
expressions: ndc_expressions,
})
}
filter::ResolvedFilterExpression::Or { expressions } => {
ResolvedFilterExpression::Or { expressions } => {
let mut ndc_expressions = Vec::new();
for expression in expressions {
@ -234,14 +230,14 @@ pub fn make_expression(
expressions: ndc_expressions,
})
}
filter::ResolvedFilterExpression::Not { expression } => {
ResolvedFilterExpression::Not { expression } => {
let ndc_expression = make_expression(*expression)?;
Ok(ndc_models_v02::Expression::Not {
expression: Box::new(ndc_expression),
})
}
filter::ResolvedFilterExpression::LocalFieldComparison(
ResolvedFilterExpression::LocalFieldComparison(
plan_types::LocalFieldComparison::BinaryComparison {
column,
operator,
@ -253,7 +249,7 @@ pub fn make_expression(
value: make_comparison_value(value),
}),
filter::ResolvedFilterExpression::LocalNestedArray {
ResolvedFilterExpression::LocalNestedArray {
column,
field_path,
predicate,
@ -273,7 +269,7 @@ pub fn make_expression(
predicate: Some(Box::new(ndc_expression)),
})
}
filter::ResolvedFilterExpression::LocalFieldComparison(
ResolvedFilterExpression::LocalFieldComparison(
plan_types::LocalFieldComparison::UnaryComparison { column, operator },
) => Ok(ndc_models_v02::Expression::UnaryComparisonOperator {
column: make_comparison_target(column),
@ -283,7 +279,7 @@ pub fn make_expression(
}
},
}),
filter::ResolvedFilterExpression::LocalRelationshipComparison {
ResolvedFilterExpression::LocalRelationshipComparison {
relationship,
predicate,
} => {
@ -298,7 +294,7 @@ pub fn make_expression(
})
}
// we are generating NDC request for one connector, we can ignore anything remote
filter::ResolvedFilterExpression::RemoteRelationshipComparison {
ResolvedFilterExpression::RemoteRelationshipComparison {
remote_predicate_id: _,
} => Ok(ndc_models_v02::Expression::And {
expressions: vec![],
@ -413,10 +409,7 @@ fn make_nested_array(
}
pub fn make_collection_relationships(
collection_relationships: BTreeMap<
plan_types::NdcRelationshipName,
relationships::Relationship,
>,
collection_relationships: BTreeMap<plan_types::NdcRelationshipName, Relationship>,
) -> BTreeMap<ndc_models_v02::RelationshipName, ndc_models_v02::Relationship> {
collection_relationships
.into_iter()
@ -429,7 +422,7 @@ pub fn make_collection_relationships(
.collect::<BTreeMap<_, _>>()
}
fn make_relationship(relationship: relationships::Relationship) -> ndc_models_v02::Relationship {
fn make_relationship(relationship: Relationship) -> ndc_models_v02::Relationship {
ndc_models_v02::Relationship {
column_mapping: relationship
.column_mapping

View File

@ -5,7 +5,8 @@ use async_recursion::async_recursion;
use indexmap::IndexMap;
use open_dds::{data_connector::CollectionName, types::DataConnectorArgumentName};
use plan_types::{
AggregateSelectionSet, NdcFieldAlias, NdcRelationshipName, OrderByElement, VariableName,
AggregateSelectionSet, NdcFieldAlias, NdcRelationshipName, OrderByElement, PredicateQueryTrees,
Relationship, VariableName,
};
use std::collections::BTreeMap;
@ -13,21 +14,20 @@ use super::arguments;
use super::field;
use super::filter;
use super::filter::ResolveFilterExpressionContext;
use super::relationships;
pub type UnresolvedQueryExecutionPlan<'s> = QueryExecutionPlan<plan_types::Expression<'s>>;
pub type ResolvedQueryExecutionPlan = QueryExecutionPlan<filter::ResolvedFilterExpression>;
pub type ResolvedQueryExecutionPlan = QueryExecutionPlan<plan_types::ResolvedFilterExpression>;
#[derive(Debug, Clone, PartialEq)]
pub struct QueryExecutionPlan<TFilterExpression> {
pub remote_predicates: filter::PredicateQueryTrees,
pub remote_predicates: PredicateQueryTrees,
pub query_node: QueryNode<TFilterExpression>,
/// The name of a collection
pub collection: CollectionName,
/// Values to be provided to any collection arguments
pub arguments: BTreeMap<DataConnectorArgumentName, arguments::Argument<TFilterExpression>>,
/// Any relationships between collections involved in the query request
pub collection_relationships: BTreeMap<NdcRelationshipName, relationships::Relationship>,
pub collection_relationships: BTreeMap<NdcRelationshipName, Relationship>,
/// One set of named variables for each rowset to fetch. Each variable set
/// should be subtituted in turn, and a fresh set of rows returned.
pub variables: Option<Vec<BTreeMap<VariableName, serde_json::Value>>>,
@ -63,7 +63,7 @@ impl<'s> UnresolvedQueryExecutionPlan<'s> {
}
pub type UnresolvedQueryNode<'s> = QueryNode<plan_types::Expression<'s>>;
pub type ResolvedQueryNode = QueryNode<filter::ResolvedFilterExpression>;
pub type ResolvedQueryNode = QueryNode<plan_types::ResolvedFilterExpression>;
/// Query plan for fetching data
#[derive(Debug, Clone, PartialEq)]

View File

@ -1,6 +1,6 @@
//! NDC query generation from 'ModelSelection' IR for relationships.
use open_dds::data_connector::{CollectionName, DataConnectorColumnName};
use open_dds::data_connector::CollectionName;
use open_dds::relationships::RelationshipType;
use open_dds::types::DataConnectorArgumentName;
use std::collections::BTreeMap;
@ -8,23 +8,9 @@ use std::collections::BTreeMap;
use crate::plan::error;
use graphql_ir::LocalCommandRelationshipInfo;
use graphql_ir::ModelSelection;
use plan_types::{LocalModelRelationshipInfo, NdcRelationshipName};
#[derive(Debug, Clone, PartialEq)]
pub struct Relationship {
/// A mapping between columns on the source collection to columns on the target collection
pub column_mapping: BTreeMap<DataConnectorColumnName, DataConnectorColumnName>,
pub relationship_type: RelationshipType,
/// The name of a collection
pub target_collection: CollectionName,
/// Values to be provided to any collection arguments
pub arguments: BTreeMap<DataConnectorArgumentName, RelationshipArgument>,
}
#[derive(Debug, Clone, PartialEq)]
pub enum RelationshipArgument {
Column { name: DataConnectorColumnName },
}
use plan_types::{
LocalModelRelationshipInfo, NdcRelationshipName, Relationship, RelationshipArgument,
};
/// collect relationships from OrderBy IR component containing relationships.
pub(crate) fn collect_relationships_from_order_by(

View File

@ -15,14 +15,13 @@ use indexmap::IndexMap;
use metadata_resolve::data_connectors::NdcVersion;
use metadata_resolve::FieldMapping;
use open_dds::data_connector::DataConnectorColumnName;
use plan_types::NdcFieldAlias;
use plan_types::NdcRelationshipName;
use plan_types::{NdcFieldAlias, NdcRelationshipName, Relationship};
use std::collections::{BTreeMap, HashMap};
pub(crate) fn plan_nested_selection<'s>(
nested_selection: &NestedSelection<'s>,
ndc_version: NdcVersion,
relationships: &mut BTreeMap<NdcRelationshipName, relationships::Relationship>,
relationships: &mut BTreeMap<NdcRelationshipName, Relationship>,
) -> Result<(field::UnresolvedNestedField<'s>, JoinLocations<'s>), error::Error> {
match nested_selection {
NestedSelection::Object(model_selection) => {
@ -54,7 +53,7 @@ pub(crate) fn plan_nested_selection<'s>(
pub(crate) fn plan_selection_set<'s, 'ir>(
model_selection: &'ir ResultSelectionSet<'s>,
ndc_version: NdcVersion,
relationships: &mut BTreeMap<NdcRelationshipName, relationships::Relationship>,
relationships: &mut BTreeMap<NdcRelationshipName, Relationship>,
) -> Result<
(
IndexMap<NdcFieldAlias, field::UnresolvedField<'s>>,

View File

@ -159,7 +159,7 @@ async fn explain_query_predicate<'s>(
.map_err(|e| execute::RequestError::ExplainError(e.to_string()))?;
let query_execution_plan = execute::ResolvedQueryExecutionPlan {
remote_predicates: plan::PredicateQueryTrees::new(),
remote_predicates: plan_types::PredicateQueryTrees::new(),
query_node: resolved_query_node,
collection: target_model_source.collection.clone(),
arguments: BTreeMap::new(),

View File

@ -6,10 +6,7 @@ mod types;
pub use explain::execute_explain;
pub use explain::types::{redact_ndc_explain, ExplainResponse};
pub use steps::{
build_ir, build_request_plan, generate_ir, normalize_request, parse_query,
resolve_ndc_query_execution,
};
pub use steps::{build_ir, build_request_plan, generate_ir, normalize_request, parse_query};
pub use to_opendd_ir::to_opendd_ir;
pub use query::{

View File

@ -157,27 +157,3 @@ pub fn analyze_query_usage<'s>(
},
)
}
// run ndc query, do any joins, and process result
// we only use this in engine tests at the moment, and it will be removed as soon
// as we're able to use the new pipeline with the existing execution / process response
// for now, it allows us a cheap way to test our GraphQL -> OpenDD IR -> execute pipeline
pub async fn resolve_ndc_query_execution<'ir>(
http_context: &execute::HttpContext,
query_execution_plan: execute::ResolvedQueryExecutionPlan,
) -> Result<Vec<ndc_models::RowSet>, execute::FieldError> {
let data_connector = query_execution_plan.data_connector.clone();
let query_request = execute::plan::ndc_request::make_ndc_query_request(query_execution_plan)?;
let response = execute::ndc::execute_ndc_query(
http_context,
&query_request,
&data_connector,
"graphql",
"graphql".to_owned(),
None, // TODO: plumb in project id
)
.await?;
Ok(response.as_latest_rowsets())
}

View File

@ -11,6 +11,7 @@ jsonpath = { path = "../utils/jsonpath" }
metadata-resolve = { path = "../metadata-resolve" }
open-dds = { path = "../open-dds" }
plan = { path = "../plan" }
plan-types = { path = "../plan-types" }
tracing-util = { path = "../utils/tracing-util" }
axum = { workspace = true }

View File

@ -6,7 +6,9 @@ use super::types::{QueryResult, RelationshipTree, RequestError};
use crate::catalog::{Catalog, Model, State};
use axum::http::{HeaderMap, Method, Uri};
use hasura_authn_core::Session;
use indexmap::IndexMap;
use metadata_resolve::Metadata;
use plan_types::{ExecutionTree, JoinLocations, NDCQueryExecution, ProcessResponseAs};
use tracing_util::SpanVisibility;
#[allow(clippy::unused_async)]
@ -103,8 +105,20 @@ async fn query_engine_execute(
.await
.map_err(RequestError::PlanError)?;
match query_execution_plan {
plan::SingleNodeExecutionPlan::Query(plan) => {
let rowsets = resolve_ndc_query_execution(http_context, plan)
plan::SingleNodeExecutionPlan::Query(query_execution_plan) => {
let ndc_query_execution = NDCQueryExecution {
execution_span_attribute: "REST",
execution_tree: ExecutionTree {
query_execution_plan,
remote_join_executions: JoinLocations {
locations: IndexMap::new(),
},
},
field_span_attribute: "REST".into(),
process_response_as: ProcessResponseAs::Array { is_nullable: false },
};
let rowsets =
execute::resolve_ndc_query_execution(http_context, ndc_query_execution, None)
.await
.map_err(RequestError::ExecuteError)?;
@ -118,24 +132,3 @@ async fn query_engine_execute(
}
}
}
// run ndc query, do any joins, and process result
async fn resolve_ndc_query_execution<'ir>(
http_context: &execute::HttpContext,
query_execution_plan: execute::ResolvedQueryExecutionPlan,
) -> Result<Vec<ndc_models::RowSet>, execute::FieldError> {
let data_connector = query_execution_plan.data_connector.clone();
let query_request = execute::plan::ndc_request::make_ndc_query_request(query_execution_plan)?;
let response = execute::ndc::execute_ndc_query(
http_context,
&query_request,
&data_connector,
"jsonapi",
"jsonapi".to_owned(),
None, // TODO: plumb in project id
)
.await?;
Ok(response.as_latest_rowsets())
}

View File

@ -3,6 +3,7 @@ mod aggregates;
mod arguments;
mod field;
mod filter;
mod mutation;
mod order_by;
mod query;
mod relationships;
@ -11,11 +12,12 @@ use lang_graphql::ast::common as ast;
use std::sync::Arc;
pub use aggregates::{AggregateFieldSelection, AggregateSelectionSet};
pub use arguments::Argument;
pub use arguments::{Argument, MutationArgument};
pub use field::{Field, NestedArray, NestedField, NestedObject};
pub use filter::ResolvedFilterExpression;
pub use mutation::MutationExecutionPlan;
pub use order_by::{OrderByDirection, OrderByElement, OrderByTarget};
pub use query::{QueryExecutionPlan, QueryNodeNew};
pub use query::{FieldsSelection, PredicateQueryTrees, QueryExecutionPlan, QueryNodeNew};
pub use relationships::{Relationship, RelationshipArgument};
pub use remote_joins::{
JoinLocations, JoinNode, LocationKind, RemoteJoin, RemoteJoinArgument, SourceFieldAlias,
@ -32,6 +34,18 @@ pub struct NDCQueryExecution {
pub process_response_as: ProcessResponseAs,
}
#[derive(Debug)]
pub struct NDCMutationExecution {
pub execution_node: mutation::MutationExecutionPlan,
pub join_locations: JoinLocations,
pub data_connector: Arc<metadata_resolve::DataConnectorLink>,
pub execution_span_attribute: String,
pub field_span_attribute: String,
pub process_response_as: ProcessResponseAs,
// leaving this out for now as it's GraphQL specific stuff
// pub selection_set: &'n normalized_ast::SelectionSet<'s, GDS>,
}
#[derive(Debug)]
pub struct ExecutionTree {
pub query_execution_plan: query::QueryExecutionPlan,

View File

@ -18,3 +18,15 @@ pub enum Argument {
predicate: ResolvedFilterExpression,
},
}
/// Argument plan to express various kinds of arguments
#[derive(Debug, Clone, PartialEq)]
pub enum MutationArgument {
/// The argument is provided as a literal value
Literal {
value: serde_json::Value,
},
BooleanExpression {
predicate: ResolvedFilterExpression,
},
}

View File

@ -0,0 +1,22 @@
use crate::NdcRelationshipName;
use open_dds::{commands::ProcedureName, types::DataConnectorArgumentName};
use std::collections::BTreeMap;
use std::sync::Arc;
use super::arguments;
use super::field;
use super::relationships;
#[derive(Debug)]
pub struct MutationExecutionPlan {
/// The name of a procedure
pub procedure_name: ProcedureName,
/// Any named procedure arguments
pub procedure_arguments: BTreeMap<DataConnectorArgumentName, arguments::MutationArgument>,
/// The fields to return from the result, or null to return everything
pub procedure_fields: Option<field::NestedField>,
/// Any relationships between collections involved in the query request
pub collection_relationships: BTreeMap<NdcRelationshipName, relationships::Relationship>,
/// The data connector used to fetch the data
pub data_connector: Arc<metadata_resolve::DataConnectorLink>,
}

View File

@ -7,11 +7,12 @@ mod usage_counts;
mod variable_name;
pub use execution_plan::{
AggregateFieldSelection, AggregateSelectionSet, Argument, ExecutionTree, Field, JoinLocations,
JoinNode, LocationKind, NDCQueryExecution, NestedArray, NestedField, NestedObject,
OrderByDirection, OrderByElement, OrderByTarget, ProcessResponseAs, QueryExecutionPlan,
QueryNodeNew, Relationship, RelationshipArgument, RemoteJoin, RemoteJoinArgument,
ResolvedFilterExpression, SourceFieldAlias, TargetField,
AggregateFieldSelection, AggregateSelectionSet, Argument, ExecutionTree, Field,
FieldsSelection, JoinLocations, JoinNode, LocationKind, MutationArgument,
MutationExecutionPlan, NDCMutationExecution, NDCQueryExecution, NestedArray, NestedField,
NestedObject, OrderByDirection, OrderByElement, OrderByTarget, PredicateQueryTrees,
ProcessResponseAs, QueryExecutionPlan, QueryNodeNew, Relationship, RelationshipArgument,
RemoteJoin, RemoteJoinArgument, ResolvedFilterExpression, SourceFieldAlias, TargetField,
};
pub use expression::{
ComparisonTarget, ComparisonValue, Expression, LocalFieldComparison, RelationshipColumnMapping,

View File

@ -1,9 +1,9 @@
use super::types::PlanError;
use execute::plan::ResolvedFilterExpression;
use metadata_resolve::{Qualified, TypeMapping};
use open_dds::{
data_connector::DataConnectorName, query::BooleanExpression, types::CustomTypeName,
};
use plan_types::ResolvedFilterExpression;
use std::collections::BTreeMap;
use super::column::{to_resolved_column, ResolvedColumn};

View File

@ -18,11 +18,12 @@ pub use types::{NDCFunction, NDCProcedure, NDCQuery, QueryContext};
use hasura_authn_core::Session;
use metadata_resolve::Metadata;
use open_dds::query::{Query, QueryRequest};
use plan_types::FieldsSelection;
// temporary type, we assume only one node atm
pub enum SingleNodeExecutionPlan {
Query(execute::plan::ResolvedQueryExecutionPlan),
Mutation(execute::plan::ResolvedMutationExecutionPlan),
Query(plan_types::QueryExecutionPlan),
Mutation(plan_types::MutationExecutionPlan),
}
// make a query execution plan, assuming an OpenDD IR with a single model request
@ -96,7 +97,9 @@ where
let query_execution_plan = model::ndc_query_to_query_execution_plan(
&ndc_query,
&IndexMap::new(),
&FieldsSelection {
fields: IndexMap::new(),
},
&aggregate_fields,
);
let query_context = QueryContext { type_name };

View File

@ -6,11 +6,6 @@ use super::field_selection;
use crate::PlanError;
use crate::{NDCFunction, NDCProcedure};
use execute::ndc::FUNCTION_IR_VALUE_COLUMN_NAME;
use execute::plan::{
field::{NestedArray, NestedField, ResolvedField, ResolvedNestedField},
Argument, MutationArgument, ResolvedMutationExecutionPlan, ResolvedQueryExecutionPlan,
ResolvedQueryNode,
};
use hasura_authn_core::Session;
use metadata_resolve::{
unwrap_custom_type_name, Metadata, Qualified, QualifiedBaseType, QualifiedTypeName,
@ -22,7 +17,10 @@ use open_dds::{
data_connector::{CollectionName, DataConnectorColumnName},
types::CustomTypeName,
};
use plan_types::NdcFieldAlias;
use plan_types::{
Argument, Field, MutationArgument, MutationExecutionPlan, NdcFieldAlias, NestedArray,
NestedField, NestedObject, PredicateQueryTrees, QueryExecutionPlan, QueryNodeNew,
};
#[derive(Debug)]
pub enum CommandPlan {
@ -142,7 +140,7 @@ pub fn from_command(
&command_source.type_mappings,
)?;
let ndc_field = ResolvedField::Column {
let ndc_field = Field::Column {
column: field_mapping.column.clone(),
fields,
arguments: BTreeMap::new(),
@ -158,11 +156,9 @@ pub fn from_command(
// response headers in the SQL layer yet
Some(response_config) if !command_source.ndc_type_opendd_type_same => {
let result_field_name = NdcFieldAlias::from(response_config.result_field.as_str());
let result_field = ResolvedField::Column {
let result_field = Field::Column {
column: response_config.result_field.clone(),
fields: Some(execute::plan::field::NestedField::Object(
execute::plan::field::NestedObject { fields: ndc_fields },
)),
fields: Some(NestedField::Object(NestedObject { fields: ndc_fields })),
arguments: BTreeMap::new(),
};
let fields = IndexMap::from_iter([(result_field_name, result_field)]);
@ -237,15 +233,12 @@ pub fn from_command(
fn wrap_procedure_ndc_fields(
output_shape: &OutputShape,
ndc_fields: IndexMap<NdcFieldAlias, ResolvedField>,
) -> ResolvedNestedField {
ndc_fields: IndexMap<NdcFieldAlias, Field>,
) -> NestedField {
match output_shape {
OutputShape::Object => {
NestedField::Object(execute::plan::field::NestedObject { fields: ndc_fields })
}
OutputShape::Object => NestedField::Object(NestedObject { fields: ndc_fields }),
OutputShape::ListOfObjects => {
let nested_fields =
NestedField::Object(execute::plan::field::NestedObject { fields: ndc_fields });
let nested_fields = NestedField::Object(NestedObject { fields: ndc_fields });
NestedField::Array(NestedArray {
fields: Box::new(nested_fields),
})
@ -261,15 +254,12 @@ enum OutputShape {
fn wrap_function_ndc_fields(
output_shape: &OutputShape,
ndc_fields: IndexMap<NdcFieldAlias, ResolvedField>,
) -> IndexMap<NdcFieldAlias, ResolvedField> {
ndc_fields: IndexMap<NdcFieldAlias, Field>,
) -> IndexMap<NdcFieldAlias, Field> {
let value_field = match output_shape {
OutputShape::Object => {
NestedField::Object(execute::plan::field::NestedObject { fields: ndc_fields })
}
OutputShape::Object => NestedField::Object(NestedObject { fields: ndc_fields }),
OutputShape::ListOfObjects => {
let nested_fields =
NestedField::Object(execute::plan::field::NestedObject { fields: ndc_fields });
let nested_fields = NestedField::Object(NestedObject { fields: ndc_fields });
NestedField::Array(NestedArray {
fields: Box::new(nested_fields),
})
@ -277,7 +267,7 @@ fn wrap_function_ndc_fields(
};
IndexMap::from([(
NdcFieldAlias::from(FUNCTION_IR_VALUE_COLUMN_NAME),
execute::plan::field::Field::Column {
Field::Column {
column: open_dds::data_connector::DataConnectorColumnName::from(
FUNCTION_IR_VALUE_COLUMN_NAME,
),
@ -306,17 +296,13 @@ fn return_type_shape(output_type: &QualifiedTypeReference) -> Option<OutputShape
}
}
pub fn execute_plan_from_function(function: &NDCFunction) -> ResolvedQueryExecutionPlan {
ResolvedQueryExecutionPlan {
remote_predicates: execute::plan::PredicateQueryTrees::new(),
query_node: ResolvedQueryNode {
fields: Some(
function
.fields
.iter()
.map(|(field_name, field)| (field_name.clone(), field.clone()))
.collect(),
),
pub fn execute_plan_from_function(function: &NDCFunction) -> QueryExecutionPlan {
QueryExecutionPlan {
remote_predicates: PredicateQueryTrees::new(),
query_node: QueryNodeNew {
fields: Some(plan_types::FieldsSelection {
fields: function.fields.clone(),
}),
aggregates: None,
limit: None,
offset: None,
@ -342,8 +328,8 @@ pub fn execute_plan_from_function(function: &NDCFunction) -> ResolvedQueryExecut
}
}
pub fn execute_plan_from_procedure(procedure: &NDCProcedure) -> ResolvedMutationExecutionPlan {
ResolvedMutationExecutionPlan {
pub fn execute_plan_from_procedure(procedure: &NDCProcedure) -> MutationExecutionPlan {
MutationExecutionPlan {
procedure_name: procedure.procedure_name.clone(),
procedure_arguments: procedure
.arguments

View File

@ -4,10 +4,6 @@ use indexmap::IndexMap;
use std::collections::BTreeMap;
use std::sync::Arc;
use execute::plan::{
field::{Field, NestedArray, NestedField},
ResolvedFilterExpression,
};
use metadata_resolve::{Metadata, Qualified, QualifiedTypeReference, TypeMapping};
use open_dds::{
models::ModelName,
@ -15,7 +11,7 @@ use open_dds::{
query::ObjectFieldSelection,
types::{CustomTypeName, FieldName},
};
use plan_types::NdcFieldAlias;
use plan_types::{Field, NdcFieldAlias, NestedArray, NestedField, NestedObject};
pub fn from_field_selection(
field_selection: &ObjectFieldSelection,
@ -27,7 +23,7 @@ pub fn from_field_selection(
model_object_type: &metadata_resolve::ObjectTypeWithRelationships,
field_mappings: &BTreeMap<FieldName, metadata_resolve::FieldMapping>,
type_permissions: &TypeOutputPermission,
) -> Result<Field<ResolvedFilterExpression>, PlanError> {
) -> Result<Field, PlanError> {
if !type_permissions
.allowed_fields
.contains(&field_selection.target.field_name)
@ -77,7 +73,7 @@ pub fn ndc_nested_field_selection_for(
metadata: &Metadata,
column_type: &QualifiedTypeReference,
type_mappings: &BTreeMap<Qualified<CustomTypeName>, TypeMapping>,
) -> Result<Option<NestedField<ResolvedFilterExpression>>, PlanError> {
) -> Result<Option<NestedField>, PlanError> {
match &column_type.underlying_type {
metadata_resolve::QualifiedBaseType::Named(name) => match name {
metadata_resolve::QualifiedTypeName::Custom(name) => {
@ -98,8 +94,7 @@ pub fn ndc_nested_field_selection_for(
let field_def = object_type.object_type.fields.get(field_name).ok_or_else(|| PlanError::Internal(format!(
"can't find object field definition for field {field_name} in type: {name}"
)))?;
let nested_fields: Option<NestedField<ResolvedFilterExpression>> =
ndc_nested_field_selection_for(
let nested_fields: Option<NestedField> = ndc_nested_field_selection_for(
metadata,
&field_def.field_type,
type_mappings,
@ -114,9 +109,7 @@ pub fn ndc_nested_field_selection_for(
);
}
return Ok(Some(NestedField::Object(
execute::plan::field::NestedObject { fields },
)));
return Ok(Some(NestedField::Object(NestedObject { fields })));
}
Err(PlanError::Internal(format!(

View File

@ -8,10 +8,6 @@ use indexmap::IndexMap;
use std::collections::BTreeMap;
use std::sync::Arc;
use execute::{
plan::{field::Field, ResolvedFilterExpression},
QueryExecutionPlan, QueryNode,
};
use hasura_authn_core::Session;
use metadata_resolve::{Metadata, Qualified};
use open_dds::query::{
@ -20,7 +16,8 @@ use open_dds::query::{
};
use open_dds::types::CustomTypeName;
use plan_types::{
AggregateFieldSelection, AggregateSelectionSet, NdcFieldAlias, NdcRelationshipName,
AggregateFieldSelection, AggregateSelectionSet, Field, FieldsSelection, NdcFieldAlias,
NdcRelationshipName, PredicateQueryTrees, QueryExecutionPlan, QueryNodeNew, Relationship,
};
pub async fn from_model_aggregate_selection(
@ -116,14 +113,7 @@ pub async fn from_model_selection(
session: &Arc<Session>,
http_context: &Arc<execute::HttpContext>,
request_headers: &reqwest::header::HeaderMap,
) -> Result<
(
Qualified<CustomTypeName>,
NDCQuery,
IndexMap<NdcFieldAlias, Field<ResolvedFilterExpression>>,
),
PlanError,
> {
) -> Result<(Qualified<CustomTypeName>, NDCQuery, FieldsSelection), PlanError> {
let model_target = &model_selection.target;
let qualified_model_name = metadata_resolve::Qualified::new(
model_target.subgraph.clone(),
@ -223,7 +213,11 @@ pub async fn from_model_selection(
// collect relationships accumulated in this scope.
query.collection_relationships.append(&mut relationships);
Ok((model.model.data_type.clone(), query, ndc_fields))
Ok((
model.model.data_type.clone(),
query,
FieldsSelection { fields: ndc_fields },
))
}
pub async fn from_relationship_selection(
@ -235,8 +229,8 @@ pub async fn from_relationship_selection(
model: &metadata_resolve::ModelWithArgumentPresets,
model_source: &Arc<metadata_resolve::ModelSource>,
model_object_type: &metadata_resolve::ObjectTypeWithRelationships,
relationships: &mut BTreeMap<plan_types::NdcRelationshipName, execute::plan::Relationship>,
) -> Result<Field<ResolvedFilterExpression>, PlanError> {
relationships: &mut BTreeMap<plan_types::NdcRelationshipName, Relationship>,
) -> Result<Field, PlanError> {
let RelationshipSelection { target, selection } = relationship_selection;
let (_, relationship_field) = model_object_type
.relationship_fields
@ -360,18 +354,13 @@ pub async fn from_relationship_selection(
// take NDCQuery and fields and make a sweet execution plan
pub fn ndc_query_to_query_execution_plan(
query: &NDCQuery,
fields: &IndexMap<NdcFieldAlias, Field<ResolvedFilterExpression>>,
fields: &FieldsSelection,
aggregate_fields: &IndexMap<NdcFieldAlias, AggregateFieldSelection>,
) -> QueryExecutionPlan<ResolvedFilterExpression> {
let query_fields: Option<IndexMap<_, _>> = if fields.is_empty() {
) -> QueryExecutionPlan {
let query_fields: Option<FieldsSelection> = if fields.fields.is_empty() {
None
} else {
Some(
fields
.iter()
.map(|(field_name, field)| (field_name.clone(), field.clone()))
.collect(),
)
Some(fields.clone())
};
let query_aggregate_fields = if aggregate_fields.is_empty() {
@ -390,8 +379,8 @@ pub fn ndc_query_to_query_execution_plan(
};
QueryExecutionPlan {
remote_predicates: execute::plan::PredicateQueryTrees::new(),
query_node: QueryNode {
remote_predicates: PredicateQueryTrees::new(),
query_node: QueryNodeNew {
fields: query_fields,
aggregates: query_aggregate_fields,
limit: query.limit,

View File

@ -5,22 +5,23 @@ use crate::order_by::to_resolved_order_by_element;
use crate::types::PlanError;
use std::collections::BTreeMap;
use execute::{plan::ResolvedFilterExpression, HttpContext};
use execute::HttpContext;
use hasura_authn_core::Session;
use metadata_resolve::{FilterPermission, ModelExpressionType};
use open_dds::query::ModelTarget;
use plan_types::{Argument, Relationship, ResolvedFilterExpression};
// Turn a `plan_types::Expression` into `execute::ResolvedFilterExpression`
// Currently this works by running all the remote predicates, soon it won't need to
async fn resolve_filter_expression(
filter_ir: &plan_types::Expression<'_>,
relationships: &mut BTreeMap<plan_types::NdcRelationshipName, execute::plan::Relationship>,
relationships: &mut BTreeMap<plan_types::NdcRelationshipName, Relationship>,
http_context: &HttpContext,
) -> Result<ResolvedFilterExpression, PlanError> {
let filter_plan = execute::plan::plan_expression(
filter_ir,
relationships,
&mut execute::plan::PredicateQueryTrees::new(),
&mut plan_types::PredicateQueryTrees::new(),
)
.map_err(|e| PlanError::Internal(format!("error constructing permission filter plan: {e}")))?;
@ -62,7 +63,7 @@ pub async fn model_target_to_ndc_query(
})?;
let mut usage_counts = plan_types::UsagesCounts::default();
let mut relationships: BTreeMap<plan_types::NdcRelationshipName, execute::plan::Relationship> =
let mut relationships: BTreeMap<plan_types::NdcRelationshipName, Relationship> =
BTreeMap::new();
let permission_filter = match &model_select_permission.filter {
@ -134,11 +135,11 @@ pub async fn model_target_to_ndc_query(
let resolved_filter_expression =
resolve_filter_expression(predicate, &mut relationships, http_context).await?;
execute::plan::Argument::BooleanExpression {
Argument::BooleanExpression {
predicate: resolved_filter_expression,
}
}
graphql_ir::Argument::Literal { value } => execute::plan::Argument::Literal {
graphql_ir::Argument::Literal { value } => Argument::Literal {
value: value.clone(),
},
};
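
The argument mapping above now targets `plan_types::Argument` directly. A small illustrative sketch of the two variants it produces follows; the standalone helper and its inputs are invented, and only the variant and field names come from this diff.

```rust
use plan_types::{Argument, ResolvedFilterExpression};

// Illustrative only: a resolved boolean-expression predicate becomes
// `Argument::BooleanExpression`, while a plain JSON value becomes
// `Argument::Literal`, matching the match arms above.
fn to_argument(
    predicate: Option<ResolvedFilterExpression>,
    value: serde_json::Value,
) -> Argument {
    match predicate {
        Some(predicate) => Argument::BooleanExpression { predicate },
        None => Argument::Literal { value },
    }
}
```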

View File

@ -1,14 +1,16 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use execute::plan::{Relationship, ResolvedField, ResolvedFilterExpression, ResolvedNestedField};
use metadata_resolve::Qualified;
use open_dds::{
commands::{FunctionName, ProcedureName},
data_connector::CollectionName,
types::{CustomTypeName, DataConnectorArgumentName},
};
use plan_types::{NdcFieldAlias, NdcRelationshipName};
use plan_types::{
Argument, Field, NdcFieldAlias, NdcRelationshipName, NestedField, Relationship,
ResolvedFilterExpression,
};
use indexmap::IndexMap;
@ -24,7 +26,7 @@ pub struct QueryContext {
// intermediate type constructed whilst planning a model selection
pub struct NDCQuery {
pub collection_name: CollectionName,
pub arguments: BTreeMap<DataConnectorArgumentName, execute::plan::ResolvedArgument>,
pub arguments: BTreeMap<DataConnectorArgumentName, Argument>,
pub filter: Option<ResolvedFilterExpression>,
pub order_by: ResolvedOrderBy<'static>,
pub limit: Option<u32>,
@ -36,7 +38,7 @@ pub struct NDCQuery {
#[derive(Debug, Clone)]
pub struct NDCFunction {
pub function_name: FunctionName,
pub fields: IndexMap<NdcFieldAlias, ResolvedField>,
pub fields: IndexMap<NdcFieldAlias, Field>,
pub arguments: BTreeMap<DataConnectorArgumentName, serde_json::Value>,
pub collection_relationships: BTreeMap<NdcRelationshipName, Relationship>,
pub data_connector: Arc<metadata_resolve::DataConnectorLink>,
@ -46,7 +48,7 @@ pub struct NDCFunction {
pub struct NDCProcedure {
pub procedure_name: ProcedureName,
pub arguments: BTreeMap<DataConnectorArgumentName, serde_json::Value>,
pub fields: Option<ResolvedNestedField>,
pub fields: Option<NestedField>,
pub collection_relationships: BTreeMap<NdcRelationshipName, Relationship>,
pub data_connector: Arc<metadata_resolve::DataConnectorLink>,
}

View File

@ -150,9 +150,7 @@ impl ExecutionPlan for NDCFunctionPushDown {
let query_execution_plan = plan::execute_plan_from_function(&self.function);
let query_request = execute::plan::ndc_request::make_ndc_query_request(
query_execution_plan,
)
let query_request = execute::make_ndc_query_request(query_execution_plan)
.map_err(|e| DataFusionError::Internal(format!("error creating ndc request: {e}")))?;
let fut = fetch_from_data_connector(

View File

@ -167,10 +167,8 @@ impl ExecutionPlan for NDCProcedurePushDown {
let execution_plan = plan::execute_plan_from_procedure(&self.procedure);
let query_request = execute::plan::ndc_request::make_ndc_mutation_request(execution_plan)
.map_err(|e| {
DataFusionError::Internal(format!("error creating ndc request: {e}"))
})?;
let query_request = execute::make_ndc_mutation_request(execution_plan)
.map_err(|e| DataFusionError::Internal(format!("error creating ndc request: {e}")))?;
let fut = fetch_from_data_connector(
self.projected_schema.clone(),

View File

@ -36,8 +36,8 @@ use tracing_util::{FutureExt, SpanVisibility, TraceableError};
use super::common::from_plan_error;
use crate::catalog::model::filter;
use execute::{ndc::NdcQueryResponse, plan::ResolvedField, HttpContext};
use plan_types::{AggregateFieldSelection, NdcFieldAlias};
use execute::{ndc::NdcQueryResponse, HttpContext};
use plan_types::{AggregateFieldSelection, FieldsSelection, NdcFieldAlias};
use plan::{
from_model_aggregate_selection, from_model_selection, ndc_query_to_query_execution_plan,
@ -600,12 +600,15 @@ impl ExecutionPlan for NDCAggregatePushdown {
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let query_execution_plan =
ndc_query_to_query_execution_plan(&self.query, &IndexMap::new(), &self.fields);
let query_execution_plan = ndc_query_to_query_execution_plan(
&self.query,
&FieldsSelection {
fields: IndexMap::new(),
},
&self.fields,
);
let query_request = execute::plan::ndc_request::make_ndc_query_request(
query_execution_plan,
)
let query_request = execute::make_ndc_query_request(query_execution_plan)
.map_err(|e| DataFusionError::Internal(format!("error creating ndc request: {e}")))?;
let fut = fetch_aggregates_from_data_connector(
@ -628,7 +631,7 @@ impl ExecutionPlan for NDCAggregatePushdown {
#[derive(Debug, Clone)]
pub(crate) struct NDCQueryPushDown {
http_context: Arc<execute::HttpContext>,
fields: IndexMap<NdcFieldAlias, ResolvedField>,
fields: FieldsSelection,
query: NDCQuery,
projected_schema: SchemaRef,
cache: PlanProperties,
@ -639,7 +642,7 @@ impl NDCQueryPushDown {
pub(crate) fn new(
http_context: Arc<HttpContext>,
schema: SchemaRef,
fields: IndexMap<NdcFieldAlias, ResolvedField>,
fields: FieldsSelection,
query: NDCQuery,
) -> Self {
let cache = Self::compute_properties(schema.clone());
@ -716,9 +719,7 @@ impl ExecutionPlan for NDCQueryPushDown {
let query_execution_plan =
ndc_query_to_query_execution_plan(&self.query, &self.fields, &IndexMap::new());
let query_request = execute::plan::ndc_request::make_ndc_query_request(
query_execution_plan,
)
let query_request = execute::make_ndc_query_request(query_execution_plan)
.map_err(|e| DataFusionError::Internal(format!("error creating ndc request: {e}")))?;
let fut = fetch_from_data_connector(