[PACHA-14] Order by pushdown (#970)


### What

Implements a new optimizer rule, `NDCPushDownSort`, which pushes `Sort` stages (together with their optional fetch/`LIMIT`) down into `ModelQuery` stages, so that ordering is performed by the data connector instead of in the engine.
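
For example (illustrative only; the model name and exact plan rendering are hypothetical), a query such as `SELECT * FROM album ORDER BY title LIMIT 10` previously sorted and truncated rows inside DataFusion:

```
Sort: album.title ASC, fetch=10
  ModelQuery: album
```

After this pass, the ordering and row limit are absorbed into the `ModelQuery` and executed by the connector:

```
ModelQuery: album, order_by=[title asc], limit=10
```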

### How

- Adds a new DataFusion `OptimizerRule`, `NDCPushDownSort`, and registers it alongside the existing `ReplaceTableScan` rule when building the session state.
- The rule matches a `Sort` node sitting directly on top of a `ModelQuery` extension node. If the model target already carries a `limit` or `offset`, the plan is left unchanged; otherwise the new `ModelQuery::sort` folds the sort expressions and optional fetch into the model target.
- `ModelQuery::sort` converts each DataFusion sort expression into an OpenDD `OrderByElement` (see the sketch below), reusing the column/operand helpers that this change extracts from the filter-pushdown code into a shared `common` module.
- When the `ModelQuery` is planned, the OpenDD order-by elements are resolved against the model's type mappings by the new `to_resolved_order_by_element`, and the resulting `order_by` and `limit` are threaded through `NDCQueryPushDown` into the NDC `Query`.
- `ir::OrderByTarget::Column` gains an optional `field_path`, so ordering by nested fields can be expressed in both ndc_models v0.1 and v0.2.
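
As a sketch of the intermediate representation involved (assembled only from the type shapes visible in this diff, not code from the PR; the `title` field and exact constructor signatures are assumptions), the OpenDD element that `ModelQuery::sort` would push for `ORDER BY title DESC` looks roughly like:

```rust
use indexmap::IndexMap;
use open_dds::{
    identifier::Identifier,
    models::OrderByDirection,
    query::{ObjectFieldOperand, ObjectFieldTarget, Operand, OrderByElement},
    types::FieldName,
};

// Sketch only: the order-by element for a hypothetical top-level `title` field.
fn example_order_by_element() -> Result<OrderByElement, String> {
    let ident = Identifier::new("title".to_string())
        .map_err(|e| format!("invalid identifier: {e}"))?;
    Ok(OrderByElement {
        direction: OrderByDirection::Desc,
        operand: Operand::Field(ObjectFieldOperand {
            target: Box::new(ObjectFieldTarget {
                field_name: FieldName::new(ident),
                arguments: IndexMap::new(), // plain column: no field arguments
            }),
            // A `Some(...)` chain here would represent ordering by a nested field.
            nested: None,
        }),
    })
}
```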

V3_GIT_ORIGIN_REV_ID: 2820f88003aec376b71605c0f753d7b50825ddad
Author: Phil Freeman 2024-08-16 16:36:19 -07:00, committed by hasura-bot
commit 3d25939f0c (parent 5198969358)
15 changed files with 543 additions and 280 deletions

View File

@@ -428,6 +428,7 @@ fn make_order_by_target(target: ir::OrderByTarget) -> ndc_models_v01::OrderByTar
match target {
ir::OrderByTarget::Column {
name,
field_path,
relationship_path,
} => {
let mut order_by_element_path = Vec::new();
@@ -468,7 +469,12 @@ fn make_order_by_target(target: ir::OrderByTarget) -> ndc_models_v01::OrderByTar
ndc_models_v01::OrderByTarget::Column {
name: ndc_models_v01::FieldName::new(name.into_inner()),
path: order_by_element_path,
field_path: None,
field_path: field_path.map(|field_path| {
field_path
.iter()
.map(|name| ndc_models_v01::FieldName::from(name.as_str()))
.collect()
}),
}
}
}

View File

@@ -429,6 +429,7 @@ fn make_order_by_target(target: ir::OrderByTarget) -> ndc_models_v02::OrderByTar
match target {
ir::OrderByTarget::Column {
name,
field_path,
relationship_path,
} => {
let mut order_by_element_path = Vec::new();
@@ -469,7 +470,12 @@ fn make_order_by_target(target: ir::OrderByTarget) -> ndc_models_v02::OrderByTar
ndc_models_v02::OrderByTarget::Column {
name: ndc_models_v02::FieldName::new(name.into_inner()),
path: order_by_element_path,
field_path: None,
field_path: field_path.map(|field_path| {
field_path
.iter()
.map(|name| ndc_models_v02::FieldName::from(name.as_str()))
.collect()
}),
}
}
}

View File

@@ -36,7 +36,7 @@ pub use global_id::{global_id_col_format, GLOBAL_ID_VERSION};
pub use model_selection::ModelSelection;
pub use model_tracking::{get_all_usage_counts_in_query, UsagesCounts};
pub use mutation_root::generate_ir as generate_mutation_ir;
pub use order_by::{OrderByElement, OrderByTarget};
pub use order_by::{OrderByElement, OrderByTarget, ResolvedOrderBy};
pub use permissions::process_model_predicate;
pub use query_root::generate_ir as generate_query_ir;
pub use relationship::{

View File

@@ -31,6 +31,7 @@ pub struct OrderByElement {
pub enum OrderByTarget {
Column {
name: DataConnectorColumnName,
field_path: Option<Vec<DataConnectorColumnName>>,
relationship_path: Vec<NdcRelationshipName>,
},
}
@@ -143,6 +144,7 @@ pub fn build_ndc_order_by_element<'s>(
// TODO(naveen): When aggregates are supported, extend this to support other ndc_models::OrderByTarget
target: OrderByTarget::Column {
name: ndc_column.clone(),
field_path: None,
relationship_path: relationship_paths,
},
};

View File

@@ -5,6 +5,8 @@ use indexmap::IndexMap;
use metadata_resolve::{self as resolved};
use serde::{Deserialize, Serialize};
use crate::execute::optimizer;
mod datafusion {
pub(super) use datafusion::{
catalog::{CatalogProvider, SchemaProvider},
@@ -139,7 +141,8 @@ impl Catalog {
let session_state = datafusion::SessionStateBuilder::new()
.with_config(session_config)
.with_query_planner(query_planner)
.with_optimizer_rule(Arc::new(super::execute::optimizer::ReplaceTableScan {}))
.with_optimizer_rule(Arc::new(optimizer::ReplaceTableScan {}))
.with_optimizer_rule(Arc::new(optimizer::NDCPushDownSort {}))
.with_expr_planners(datafusion::SessionStateDefaults::default_expr_planners())
.with_scalar_functions(datafusion::SessionStateDefaults::default_scalar_functions())
.with_aggregate_functions(

View File

@@ -1,5 +1,6 @@
//! Describe a model for a SQL table and how to translate datafusion operations on the table
//! to ndc-spec queries.
pub(crate) mod common;
pub(crate) mod filter;
use std::collections::BTreeMap;

View File

@@ -0,0 +1,124 @@
use datafusion::{
common::Column,
error::DataFusionError,
logical_expr::{expr::ScalarFunction, Expr},
scalar::ScalarValue,
};
use indexmap::IndexMap;
use open_dds::{
identifier::Identifier,
query::{ObjectFieldOperand, ObjectFieldTarget},
types::FieldName,
};
pub(crate) fn try_into_column(
expr: &Expr,
) -> datafusion::error::Result<Option<(Vec<FieldName>, &Column)>> {
let mut path = vec![];
let mut expr = expr;
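// Peel off nested `get_field` accessors, accumulating the field names as we
// descend, until we reach the underlying column reference.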
loop {
match expr {
Expr::Column(column) => {
return Ok(Some((path, column)));
}
Expr::ScalarFunction(ScalarFunction { func, args }) if func.name() == "get_field" => {
let [inner_expr, Expr::Literal(ScalarValue::Utf8(Some(field_name)))] =
args.as_slice()
else {
return Ok(None);
};
let ident = Identifier::new(field_name).map_err(|e| {
DataFusionError::Internal(format!("invalid identifier in path: {e}"))
})?;
path.push(FieldName::new(ident));
expr = inner_expr;
}
_ => {
return Ok(None);
}
}
}
}
pub(crate) fn to_operand(
column: &datafusion::prelude::Column,
path: Vec<FieldName>,
) -> datafusion::error::Result<open_dds::query::Operand> {
let mut nested = None;
let mut path_rev = path;
path_rev.reverse();
for field_name in path_rev {
nested = Some(Box::new(open_dds::query::Operand::Field(
ObjectFieldOperand {
target: Box::new(ObjectFieldTarget {
field_name,
arguments: IndexMap::new(),
}),
nested: None,
},
)));
}
Ok(open_dds::query::Operand::Field(ObjectFieldOperand {
target: Box::new(ObjectFieldTarget {
field_name: FieldName::new(Identifier::new(column.name.clone()).map_err(|e| {
datafusion::error::DataFusionError::Internal(format!(
"cannot convert binary expr left-hand-side: {e}"
))
})?),
arguments: IndexMap::new(),
}),
nested,
}))
}
pub(crate) fn to_value(
value: &datafusion::scalar::ScalarValue,
) -> datafusion::error::Result<serde_json::Value> {
match value {
datafusion::scalar::ScalarValue::Null => Ok(serde_json::Value::Null),
datafusion::scalar::ScalarValue::Boolean(b) => {
Ok(b.map_or(serde_json::Value::Null, serde_json::Value::Bool))
}
datafusion::scalar::ScalarValue::Float32(f) => {
Ok(f.map_or(serde_json::Value::Null, serde_json::Value::from))
}
datafusion::scalar::ScalarValue::Int32(i) => {
Ok(i.map_or(serde_json::Value::Null, serde_json::Value::from))
}
datafusion::scalar::ScalarValue::Utf8(s) => {
Ok(s.as_ref().map_or(serde_json::Value::Null, |s| {
serde_json::Value::from(s.clone())
}))
}
// datafusion::scalar::ScalarValue::Float16(f) => todo!(),
// datafusion::scalar::ScalarValue::Float64(f) => todo!(),
// datafusion::scalar::ScalarValue::Decimal128(_, _, _) => todo!(),
// datafusion::scalar::ScalarValue::Decimal256(_, _, _) => todo!(),
// datafusion::scalar::ScalarValue::Int8(_) => todo!(),
// datafusion::scalar::ScalarValue::Int16(_) => todo!(),
// datafusion::scalar::ScalarValue::Int64(_) => todo!(),
// datafusion::scalar::ScalarValue::UInt8(_) => todo!(),
// datafusion::scalar::ScalarValue::UInt16(_) => todo!(),
// datafusion::scalar::ScalarValue::UInt32(_) => todo!(),
// datafusion::scalar::ScalarValue::UInt64(_) => todo!(),
// datafusion::scalar::ScalarValue::Date32(_) => todo!(),
// datafusion::scalar::ScalarValue::Date64(_) => todo!(),
// datafusion::scalar::ScalarValue::Time32Second(_) => todo!(),
// datafusion::scalar::ScalarValue::Time32Millisecond(_) => todo!(),
// datafusion::scalar::ScalarValue::Time64Microsecond(_) => todo!(),
// datafusion::scalar::ScalarValue::Time64Nanosecond(_) => todo!(),
// datafusion::scalar::ScalarValue::TimestampSecond(_, _) => todo!(),
// datafusion::scalar::ScalarValue::TimestampMillisecond(_, _) => todo!(),
// datafusion::scalar::ScalarValue::TimestampMicrosecond(_, _) => todo!(),
// datafusion::scalar::ScalarValue::TimestampNanosecond(_, _) => todo!(),
_ => Err(DataFusionError::Internal(format!(
"cannot convert literal to OpenDD literal: {value:?}"
))),
}
}

View File

@@ -1,19 +1,9 @@
use std::sync::Arc;
use crate::catalog::model::Model;
use datafusion::{
common::Column,
error::{DataFusionError, Result},
logical_expr::{expr::ScalarFunction, BinaryExpr, Expr, Operator, TableProviderFilterPushDown},
scalar::ScalarValue,
};
use indexmap::IndexMap;
use datafusion::logical_expr::{BinaryExpr, Expr, Operator, TableProviderFilterPushDown};
use metadata_resolve::Metadata;
use open_dds::{
identifier::Identifier,
query::{ObjectFieldOperand, ObjectFieldTarget},
types::{FieldName, OperatorName},
};
use open_dds::types::OperatorName;
pub(crate) fn can_pushdown_filters(
_metadata: &Arc<Metadata>,
@@ -62,36 +52,6 @@ pub(crate) fn can_pushdown_filter(expr: &Expr) -> bool {
pushdown_filter(expr).is_ok()
}
pub(crate) fn try_into_column(expr: &Expr) -> Result<Option<(Vec<FieldName>, &Column)>> {
let mut path = vec![];
let mut expr = expr;
loop {
match expr {
Expr::Column(column) => {
return Ok(Some((path, column)));
}
Expr::ScalarFunction(ScalarFunction { func, args }) if func.name() == "get_field" => {
let [inner_expr, Expr::Literal(ScalarValue::Utf8(Some(field_name)))] =
args.as_slice()
else {
return Ok(None);
};
let ident = Identifier::new(field_name).map_err(|e| {
DataFusionError::Internal(format!("invalid identifier in path: {e}"))
})?;
path.push(FieldName::new(ident));
expr = inner_expr;
}
_ => {
return Ok(None);
}
}
}
}
pub(crate) fn pushdown_filter(
expr: &datafusion::prelude::Expr,
) -> datafusion::error::Result<open_dds::query::BooleanExpression> {
@@ -103,7 +63,8 @@ pub(crate) fn pushdown_filter(
// | Operator::LtEq
// | Operator::Gt
// | Operator::GtEq
let Some((path, column)) = try_into_column(left.as_ref())? else {
let Some((path, column)) = super::common::try_into_column(left.as_ref())?
else {
return Err(datafusion::error::DataFusionError::Internal(format!(
"unsupported left-hand-side in binary expression: {left:?}"
)));
@@ -115,7 +76,7 @@ pub(crate) fn pushdown_filter(
)));
};
let operand = to_operand(column, path)?;
let operand = super::common::to_operand(column, path)?;
// TODO: here we pretend the _eq, _neq etc. operators exist
// in OpenDD, which are then mapped to the underlying NDC
@@ -128,7 +89,9 @@ pub(crate) fn pushdown_filter(
_ => panic!("operator not handled in pushdown_filter"),
};
let argument = Box::new(open_dds::query::Value::Literal(to_value(value)?));
let argument = Box::new(open_dds::query::Value::Literal(
super::common::to_value(value)?,
));
Ok(open_dds::query::BooleanExpression::Comparison {
operand,
@@ -161,13 +124,13 @@ pub(crate) fn pushdown_filter(
Ok(open_dds::query::BooleanExpression::Not(Box::new(expr)))
}
Expr::IsNull(expr_inner) | Expr::IsNotNull(expr_inner) => {
let Some((path, column)) = try_into_column(expr_inner.as_ref())? else {
let Some((path, column)) = super::common::try_into_column(expr_inner.as_ref())? else {
return Err(datafusion::error::DataFusionError::Internal(format!(
"unsupported argument in IS NULL expression: {expr_inner:?}"
)));
};
let operand = to_operand(column, path)?;
let operand = super::common::to_operand(column, path)?;
Ok(match expr {
Expr::IsNull(_) => open_dds::query::BooleanExpression::IsNull(operand),
@@ -183,83 +146,3 @@ pub(crate) fn pushdown_filter(
))),
}
}
pub(crate) fn to_operand(
column: &datafusion::prelude::Column,
path: Vec<FieldName>,
) -> ::datafusion::error::Result<open_dds::query::Operand> {
let mut nested = None;
let mut path_rev = path;
path_rev.reverse();
for field_name in path_rev {
nested = Some(Box::new(open_dds::query::Operand::Field(
ObjectFieldOperand {
target: Box::new(ObjectFieldTarget {
field_name,
arguments: IndexMap::new(),
}),
nested: None,
},
)));
}
Ok(open_dds::query::Operand::Field(ObjectFieldOperand {
target: Box::new(ObjectFieldTarget {
field_name: FieldName::new(Identifier::new(column.name.clone()).map_err(|e| {
datafusion::error::DataFusionError::Internal(format!(
"cannot convert binary expr left-hand-side: {e}"
))
})?),
arguments: IndexMap::new(),
}),
nested,
}))
}
pub(crate) fn to_value(
value: &datafusion::scalar::ScalarValue,
) -> datafusion::error::Result<serde_json::Value> {
match value {
datafusion::scalar::ScalarValue::Null => Ok(serde_json::Value::Null),
datafusion::scalar::ScalarValue::Boolean(b) => {
Ok(b.map_or(serde_json::Value::Null, serde_json::Value::Bool))
}
datafusion::scalar::ScalarValue::Float32(f) => {
Ok(f.map_or(serde_json::Value::Null, serde_json::Value::from))
}
datafusion::scalar::ScalarValue::Int32(i) => {
Ok(i.map_or(serde_json::Value::Null, serde_json::Value::from))
}
datafusion::scalar::ScalarValue::Utf8(s) => {
Ok(s.as_ref().map_or(serde_json::Value::Null, |s| {
serde_json::Value::from(s.clone())
}))
}
// datafusion::scalar::ScalarValue::Float16(f) => todo!(),
// datafusion::scalar::ScalarValue::Float64(f) => todo!(),
// datafusion::scalar::ScalarValue::Decimal128(_, _, _) => todo!(),
// datafusion::scalar::ScalarValue::Decimal256(_, _, _) => todo!(),
// datafusion::scalar::ScalarValue::Int8(_) => todo!(),
// datafusion::scalar::ScalarValue::Int16(_) => todo!(),
// datafusion::scalar::ScalarValue::Int64(_) => todo!(),
// datafusion::scalar::ScalarValue::UInt8(_) => todo!(),
// datafusion::scalar::ScalarValue::UInt16(_) => todo!(),
// datafusion::scalar::ScalarValue::UInt32(_) => todo!(),
// datafusion::scalar::ScalarValue::UInt64(_) => todo!(),
// datafusion::scalar::ScalarValue::Date32(_) => todo!(),
// datafusion::scalar::ScalarValue::Date64(_) => todo!(),
// datafusion::scalar::ScalarValue::Time32Second(_) => todo!(),
// datafusion::scalar::ScalarValue::Time32Millisecond(_) => todo!(),
// datafusion::scalar::ScalarValue::Time64Microsecond(_) => todo!(),
// datafusion::scalar::ScalarValue::Time64Nanosecond(_) => todo!(),
// datafusion::scalar::ScalarValue::TimestampSecond(_, _) => todo!(),
// datafusion::scalar::ScalarValue::TimestampMillisecond(_, _) => todo!(),
// datafusion::scalar::ScalarValue::TimestampMicrosecond(_, _) => todo!(),
// datafusion::scalar::ScalarValue::TimestampNanosecond(_, _) => todo!(),
_ => Err(DataFusionError::Internal(format!(
"cannot convert literal to OpenDD literal: {value:?}"
))),
}
}

View File

@@ -1,3 +1,5 @@
mod replace_table_scan;
mod sort_pushdown;
pub(crate) use replace_table_scan::ReplaceTableScan;
pub(crate) use sort_pushdown::NDCPushDownSort;

View File

@@ -0,0 +1,70 @@
use std::sync::Arc;
use datafusion::{
common::{internal_err, tree_node::Transformed},
error::Result,
logical_expr::{Extension, LogicalPlan},
optimizer::{optimizer::ApplyOrder, OptimizerConfig, OptimizerRule},
};
use crate::execute::planner::model::ModelQuery;
pub struct NDCPushDownSort {}
impl OptimizerRule for NDCPushDownSort {
fn try_optimize(
&self,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
internal_err!("Should have called NDCPushDownSort::rewrite")
}
fn name(&self) -> &str {
"ndc_pushdown_sort"
}
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}
fn supports_rewrite(&self) -> bool {
true
}
fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
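// Look for a Sort node whose input is a ModelQuery extension node; any other
// plan shape is left unchanged.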
let Some((sort_by, fetch, model_query)) = (match plan {
LogicalPlan::Sort(ref sort) => match sort.input.as_ref() {
LogicalPlan::Extension(Extension { node }) => node
.as_ref()
.as_any()
.downcast_ref::<ModelQuery>()
.map(move |model_query| (&sort.expr, &sort.fetch, model_query)),
_ => None,
},
_ => None,
}) else {
return Ok(Transformed::no(plan));
};
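// If the model target already carries a limit or offset, pushing the sort
// underneath it would change which rows are returned, so leave the plan alone.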
if model_query.model_selection.target.limit.is_some() {
return Ok(Transformed::no(plan));
}
if model_query.model_selection.target.offset.is_some() {
return Ok(Transformed::no(plan));
}
if let Some(sorted_query) = model_query.sort(sort_by, *fetch)? {
let plan = LogicalPlan::Extension(Extension {
node: Arc::new(sorted_query),
});
Ok(Transformed::yes(plan))
} else {
Ok(Transformed::no(plan))
}
}
}

View File

@@ -1,5 +1,7 @@
pub(crate) mod common;
pub(crate) mod filter;
pub(crate) mod model;
pub(crate) mod order_by;
use hasura_authn_core::Session;
use std::sync::Arc;

View File

@@ -0,0 +1,144 @@
use std::collections::BTreeMap;
use datafusion::error::DataFusionError;
use metadata_resolve::{
FieldMapping, Qualified, QualifiedBaseType, QualifiedTypeName, TypeMapping,
};
use open_dds::{data_connector::DataConnectorColumnName, types::CustomTypeName};
#[derive(Debug, Clone)]
pub(crate) struct ResolvedColumn {
pub column_name: DataConnectorColumnName,
pub field_path: Vec<DataConnectorColumnName>,
pub field_mapping: FieldMapping,
}
/// Convert an ObjectFieldOperand into an NDC comparison target.
/// Also returns the FieldMapping for the enclosing object type, so
/// that additional mapping data (e.g. operators) can be extracted.
#[allow(clippy::assigning_clones)]
pub(crate) fn to_resolved_column(
metadata: &metadata_resolve::Metadata,
type_mappings: &BTreeMap<Qualified<CustomTypeName>, TypeMapping>,
type_name: &Qualified<CustomTypeName>,
model_object_type: &metadata_resolve::ObjectTypeWithRelationships,
operand: &open_dds::query::ObjectFieldOperand,
) -> datafusion::error::Result<ResolvedColumn> {
let TypeMapping::Object {
ndc_object_type_name: _,
field_mappings,
} = type_mappings.get(type_name).ok_or_else(|| {
DataFusionError::Internal(format!("can't find mapping object for type: {type_name}"))
})?;
// Walk down the tree of the ObjectFieldOperand, and maintain several pieces
// of state as we go:
// Keep track of the field mapping for the current object type:
let mut field_mapping = field_mappings
.get(&operand.target.field_name)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"can't find field {} in mapping for type: {}",
operand.target.field_name, type_name
))
})?;
// The NDC field name of the top-level column
let column_name = field_mapping.column.clone();
// Keep track of the current field path (consisting of NDC names):
let mut field_path = vec![];
// Keep track of the rest of the tree to consider:
let mut nested = operand.nested.clone();
let field_type = model_object_type
.object_type
.fields
.get(&operand.target.field_name)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"can't find object field definition for field {} in type: {}",
operand.target.field_name, type_name
))
})?;
// Keep track of the type of the current field under consideration
// (this will be an object type until we reach the bottom of the tree):
let mut current_type = field_type.field_type.underlying_type.clone();
loop {
match nested {
None => {
// At the bottom of the tree, return the comparison target with
// the field path that we've accumulated:
return Ok(ResolvedColumn {
column_name,
field_path,
field_mapping: field_mapping.clone(),
});
}
Some(operand) => {
let open_dds::query::Operand::Field(field) = operand.as_ref() else {
return Err(DataFusionError::Internal(format!(
"unsupported operand: {operand:?}"
)));
};
assert!(field.target.arguments.is_empty());
let field_name = &field.target.field_name;
let QualifiedBaseType::Named(QualifiedTypeName::Custom(object_type_name)) =
current_type
else {
return Err(DataFusionError::Internal(format!(
"field access on non-named type: {type_name:?}"
)));
};
let Some(object_type) = metadata.object_types.get(&object_type_name) else {
return Err(DataFusionError::Internal(format!(
"field access on non-object type: {type_name:?}"
)));
};
let field_defn =
object_type
.object_type
.fields
.get(field_name)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"can't find object field definition for field {field_name} in type: {type_name}"
))
})?;
let field_type = &field_defn.field_type.underlying_type;
let TypeMapping::Object {
ndc_object_type_name: _,
field_mappings,
} = type_mappings.get(&object_type_name).ok_or_else(|| {
DataFusionError::Internal(format!(
"can't find mapping object for type: {type_name}"
))
})?;
// Get the latest field mapping
field_mapping = field_mappings.get(field_name).ok_or_else(|| {
DataFusionError::Internal(format!(
"can't find field {field_name} in mapping for type: {type_name}"
))
})?;
// Add the NDC name of the object property to the field path
field_path.push(field_mapping.column.clone());
// Move to the next AST node
nested = field.nested.clone();
// Store the type of the current field:
current_type = field_type.clone();
}
}
}
}

View File

@@ -1,11 +1,11 @@
use datafusion::error::{DataFusionError, Result};
use execute::plan::ResolvedFilterExpression;
use metadata_resolve::{
FieldMapping, Qualified, QualifiedBaseType, QualifiedTypeName, TypeMapping,
};
use metadata_resolve::{Qualified, TypeMapping};
use open_dds::{query::BooleanExpression, types::CustomTypeName};
use std::collections::BTreeMap;
use super::common::{to_resolved_column, ResolvedColumn};
pub(crate) fn to_resolved_filter_expr(
metadata: &metadata_resolve::Metadata,
type_mappings: &BTreeMap<Qualified<CustomTypeName>, TypeMapping>,
@@ -46,11 +46,17 @@ pub(crate) fn to_resolved_filter_expr(
to_resolved_filter_expr(metadata, type_mappings, type_name, model_object_type, expr)?,
)),
BooleanExpression::IsNull(open_dds::query::Operand::Field(field)) => {
let (column, _field_mapping) =
to_comparison_target(metadata, type_mappings, type_name, model_object_type, field)?;
let ResolvedColumn {
column_name,
field_path,
field_mapping: _,
} = to_resolved_column(metadata, type_mappings, type_name, model_object_type, field)?;
Ok(ResolvedFilterExpression::LocalFieldComparison(
ir::LocalFieldComparison::UnaryComparison {
column,
column: ir::ComparisonTarget::Column {
name: column_name,
field_path,
},
operator: metadata_resolve::UnaryComparisonOperator::IsNull,
},
))
@@ -60,8 +66,11 @@ pub(crate) fn to_resolved_filter_expr(
operator,
argument,
} => {
let (column, field_mapping) =
to_comparison_target(metadata, type_mappings, type_name, model_object_type, field)?;
let ResolvedColumn {
column_name,
field_path,
field_mapping,
} = to_resolved_column(metadata, type_mappings, type_name, model_object_type, field)?;
let ndc_operator = field_mapping
.equal_operators
@@ -84,7 +93,10 @@ pub(crate) fn to_resolved_filter_expr(
let eq_expr = ResolvedFilterExpression::LocalFieldComparison(
ir::LocalFieldComparison::BinaryComparison {
column,
column: ir::ComparisonTarget::Column {
name: column_name,
field_path,
},
operator: ndc_operator,
value,
},
@@ -105,135 +117,3 @@ pub(crate) fn to_resolved_filter_expr(
))),
}
}
/// Convert an ObjectFieldOperand into an NDC comparison target.
/// Also returns the FieldMapping for the enclosing object type, so
/// that additional mapping data (e.g. operators) can be extracted.
#[allow(clippy::assigning_clones)]
pub(crate) fn to_comparison_target(
metadata: &metadata_resolve::Metadata,
type_mappings: &BTreeMap<Qualified<CustomTypeName>, TypeMapping>,
type_name: &Qualified<CustomTypeName>,
model_object_type: &metadata_resolve::ObjectTypeWithRelationships,
operand: &open_dds::query::ObjectFieldOperand,
) -> Result<(ir::ComparisonTarget, FieldMapping)> {
let TypeMapping::Object {
ndc_object_type_name: _,
field_mappings,
} = type_mappings.get(type_name).ok_or_else(|| {
DataFusionError::Internal(format!("can't find mapping object for type: {type_name}"))
})?;
// Walk down the tree of the ObjectFieldOperand, and maintain several pieces
// of state as we go:
// Keep track of the field mapping for the current object type:
let mut field_mapping = field_mappings
.get(&operand.target.field_name)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"can't find field {} in mapping for type: {}",
operand.target.field_name, type_name
))
})?;
// The NDC field name of the top-level column
let column_name = field_mapping.column.clone();
// Keep track of the current field path (consisting of NDC names):
let mut field_path = vec![];
// Keep track of the rest of the tree to consider:
let mut nested = operand.nested.clone();
let field_type = model_object_type
.object_type
.fields
.get(&operand.target.field_name)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"can't find object field definition for field {} in type: {}",
operand.target.field_name, type_name
))
})?;
// Keep track of the type of the current field under consideration
// (this will be an object type until we reach the bottom of the tree):
let mut current_type = field_type.field_type.underlying_type.clone();
loop {
match nested {
None => {
// At the bottom of the tree, return the comparison target with
// the field path that we've accumulated:
return Ok((
ir::ComparisonTarget::Column {
name: column_name,
field_path,
},
field_mapping.clone(),
));
}
Some(operand) => {
let open_dds::query::Operand::Field(field) = operand.as_ref() else {
return Err(DataFusionError::Internal(format!(
"unsupported operand: {operand:?}"
)));
};
assert!(field.target.arguments.is_empty());
let field_name = &field.target.field_name;
let QualifiedBaseType::Named(QualifiedTypeName::Custom(object_type_name)) =
current_type
else {
return Err(DataFusionError::Internal(format!(
"field access on non-named type: {type_name:?}"
)));
};
let Some(object_type) = metadata.object_types.get(&object_type_name) else {
return Err(DataFusionError::Internal(format!(
"field access on non-object type: {type_name:?}"
)));
};
let field_defn =
object_type
.object_type
.fields
.get(field_name)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"can't find object field definition for field {field_name} in type: {type_name}"
))
})?;
let field_type = &field_defn.field_type.underlying_type;
let TypeMapping::Object {
ndc_object_type_name: _,
field_mappings,
} = type_mappings.get(&object_type_name).ok_or_else(|| {
DataFusionError::Internal(format!(
"can't find mapping object for type: {type_name}"
))
})?;
// Get the latest field mapping
field_mapping = field_mappings.get(field_name).ok_or_else(|| {
DataFusionError::Internal(format!(
"can't find field {field_name} in mapping for type: {type_name}"
))
})?;
// Add the NDC name of the object property to the field path
field_path.push(field_mapping.column.clone());
// Move to the next AST node
nested = field.nested.clone();
// Store the type of the current field:
current_type = field_type.clone();
}
}
}
}

View File

@@ -1,13 +1,14 @@
use core::fmt;
use std::{any::Any, collections::BTreeMap, hash::Hash, sync::Arc};
use crate::catalog::model::common::{to_operand, try_into_column};
use datafusion::{
arrow::{
array::RecordBatch, datatypes::SchemaRef, error::ArrowError, json::reader as arrow_json,
},
common::{internal_err, plan_err, DFSchemaRef},
error::{DataFusionError, Result},
logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore},
logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore},
physical_expr::EquivalenceProperties,
physical_plan::{
stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionMode,
@@ -20,7 +21,8 @@ use indexmap::IndexMap;
use metadata_resolve::{FilterPermission, Qualified, QualifiedTypeReference, TypeMapping};
use open_dds::{
arguments::ArgumentName,
query::Alias,
models::OrderByDirection,
query::{Alias, OrderByElement},
types::{DataConnectorArgumentName, FieldName},
};
use open_dds::{
@@ -42,13 +44,11 @@ use execute::{
},
HttpContext,
};
use ir::{NdcFieldAlias, NdcRelationshipName};
use ir::{NdcFieldAlias, NdcRelationshipName, ResolvedOrderBy};
use open_dds::data_connector::CollectionName;
use crate::catalog::model::filter;
use super::filter::to_resolved_filter_expr;
#[derive(Debug, thiserror::Error)]
pub enum ExecutionPlanError {
#[error("{0}")]
@@ -156,6 +156,62 @@ impl ModelQuery {
};
Ok(model_query_node)
}
pub(crate) fn sort(
&self,
sort_by: &[Expr],
limit: Option<usize>,
) -> datafusion::error::Result<Option<Self>> {
let mut new_order_by_elements: Vec<OrderByElement> = vec![];
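// Translate each DataFusion sort expression into an OpenDD OrderByElement;
// returning Ok(None) anywhere means the sort cannot be pushed down.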
for expr in sort_by {
let Expr::Sort(sort) = expr else {
return Ok(None);
};
let Some((path, column)) = try_into_column(&sort.expr)? else {
return Ok(None);
};
let operand = to_operand(column, path)?;
match operand {
open_dds::query::Operand::Field(_) => {
new_order_by_elements.push(OrderByElement {
direction: if sort.asc {
OrderByDirection::Asc
} else {
OrderByDirection::Desc
},
operand,
});
}
_ => {
return Ok(None);
}
}
}
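// The pushed-down sort keys take precedence over any ordering already on the
// model target.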
let order_by = [
new_order_by_elements,
self.model_selection.target.order_by.clone(),
]
.concat();
let new_query = ModelQuery {
model_selection: ModelSelection {
target: ModelTarget {
order_by,
limit,
offset: None,
..self.model_selection.target.clone()
},
selection: self.model_selection.selection.clone(),
},
schema: self.schema.clone(),
};
Ok(Some(new_query))
}
}
impl ModelQuery {
@@ -329,7 +385,7 @@ impl ModelQuery {
.filter
.as_ref()
.map(|expr| {
to_resolved_filter_expr(
super::filter::to_resolved_filter_expr(
metadata,
&model_source.type_mappings,
&model.model.data_type,
@@ -349,6 +405,26 @@ impl ModelQuery {
}
};
let order_by_elements = model_target
.order_by
.iter()
.map(|element| {
super::order_by::to_resolved_order_by_element(
metadata,
&model_source.type_mappings,
&model.model.data_type,
model_object_type,
element,
)
})
.collect::<datafusion::error::Result<Vec<_>>>()?;
let limit = model_target
.limit
.map(u32::try_from)
.transpose()
.map_err(|_| DataFusionError::Internal("limit out of range".into()))?;
let ndc_pushdown = NDCQueryPushDown::new(
http_context.clone(),
self.schema.inner().clone(),
@@ -356,6 +432,11 @@ impl ModelQuery {
ndc_arguments,
ndc_fields,
filter,
ir::ResolvedOrderBy {
order_by_elements,
relationships: BTreeMap::new(),
},
limit,
relationships,
Arc::new(model_source.data_connector.clone()),
);
@@ -481,6 +562,8 @@ pub(crate) struct NDCQueryPushDown {
arguments: BTreeMap<DataConnectorArgumentName, serde_json::Value>,
fields: IndexMap<NdcFieldAlias, ResolvedField<'static>>,
filter: Option<ResolvedFilterExpression>,
order_by: ResolvedOrderBy<'static>,
limit: Option<u32>,
collection_relationships: BTreeMap<NdcRelationshipName, Relationship>,
data_connector: Arc<metadata_resolve::DataConnectorLink>,
projected_schema: SchemaRef,
@@ -495,6 +578,8 @@ impl NDCQueryPushDown {
arguments: BTreeMap<DataConnectorArgumentName, serde_json::Value>,
fields: IndexMap<NdcFieldAlias, ResolvedField<'static>>,
filter: Option<ResolvedFilterExpression>,
order_by: ResolvedOrderBy<'static>,
limit: Option<u32>,
collection_relationships: BTreeMap<NdcRelationshipName, Relationship>,
data_connector: Arc<metadata_resolve::DataConnectorLink>,
) -> Self {
@@ -505,6 +590,8 @@ impl NDCQueryPushDown {
arguments,
fields,
filter,
order_by,
limit,
collection_relationships,
data_connector,
projected_schema: schema,
@@ -574,9 +661,9 @@ impl ExecutionPlan for NDCQueryPushDown {
.collect(),
),
aggregates: None,
limit: None,
limit: self.limit,
offset: None,
order_by: None,
order_by: Some(self.order_by.order_by_elements.clone()),
predicate: self.filter.clone(),
},
collection: self.collection_name.clone(),

View File

@@ -0,0 +1,53 @@
use std::collections::BTreeMap;
use datafusion::error::{DataFusionError, Result};
use metadata_resolve::{Qualified, TypeMapping};
use open_dds::{query::OrderByElement, types::CustomTypeName};
use super::common::{to_resolved_column, ResolvedColumn};
pub(crate) fn to_resolved_order_by_element(
metadata: &metadata_resolve::Metadata,
type_mappings: &BTreeMap<Qualified<CustomTypeName>, TypeMapping>,
type_name: &Qualified<CustomTypeName>,
model_object_type: &metadata_resolve::ObjectTypeWithRelationships,
element: &OrderByElement,
) -> Result<ir::OrderByElement> {
match &element.operand {
open_dds::query::Operand::Field(operand) => {
let ResolvedColumn {
column_name,
field_path,
field_mapping: _,
} = to_resolved_column(
metadata,
type_mappings,
type_name,
model_object_type,
operand,
)?;
let target = ir::OrderByTarget::Column {
name: column_name,
field_path: if field_path.is_empty() {
None
} else {
Some(field_path)
},
relationship_path: vec![],
};
Ok(ir::OrderByElement {
order_direction: match element.direction {
open_dds::models::OrderByDirection::Asc => schema::ModelOrderByDirection::Asc,
open_dds::models::OrderByDirection::Desc => schema::ModelOrderByDirection::Desc,
},
target,
})
}
_ => Err(DataFusionError::Internal(format!(
"unsupported operand in sort: {:?}",
element.operand
))),
}
}