[PACHA-6] Remove projection pushdown (#967)

<!-- The PR description should answer 2 important questions: -->

### What

Remove the projection pushdown optimization. `datafusion` already
optimizes this to the correct NDC IR.

### How

<!-- How is it trying to accomplish it (what are the implementation
steps)? -->

V3_GIT_ORIGIN_REV_ID: 6e0034e4d920c39b70667f5f521341069a5c53de
This commit is contained in:
Phil Freeman 2024-08-13 11:46:18 -07:00 committed by hasura-bot
parent 1ebafeec1a
commit 714fffad18
4 changed files with 0 additions and 85 deletions

View File

@ -139,9 +139,6 @@ impl Catalog {
.with_config(session_config)
.with_query_planner(query_planner)
.with_optimizer_rule(Arc::new(super::execute::optimizer::ReplaceTableScan {}))
.with_optimizer_rule(Arc::new(
super::execute::optimizer::OpenDdPushDownProjection {},
))
.with_expr_planners(datafusion::SessionStateDefaults::default_expr_planners())
.with_scalar_functions(datafusion::SessionStateDefaults::default_scalar_functions())
.with_aggregate_functions(

View File

@ -1,5 +1,3 @@
mod projection_pushdown;
mod replace_table_scan;
pub(crate) use projection_pushdown::OpenDdPushDownProjection;
pub(crate) use replace_table_scan::ReplaceTableScan;

View File

@ -1,72 +0,0 @@
use std::sync::Arc;
use datafusion::{
common::{internal_err, tree_node::Transformed},
error::Result,
logical_expr::{Expr, Extension, LogicalPlan},
optimizer::{optimizer::ApplyOrder, OptimizerConfig, OptimizerRule},
};
pub(crate) struct OpenDdPushDownProjection {}
impl OptimizerRule for OpenDdPushDownProjection {
fn try_optimize(
&self,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
internal_err!("Should have called OpenDdPushDownProjection::rewrite")
}
fn name(&self) -> &str {
"open_dd_pushdown_projection"
}
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}
fn supports_rewrite(&self) -> bool {
true
}
fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
if let Some((projections, projected_schema, model_query)) = {
match plan {
LogicalPlan::Projection(ref projection) => match projection.input.as_ref() {
LogicalPlan::Extension(Extension { node }) => node
.as_ref()
.as_any()
.downcast_ref::<crate::plan::ModelQuery>()
.map(|model_query| {
(&projection.expr, &projection.schema, model_query.clone())
}),
_ => None,
},
_ => None,
}
} {
let projected_columns = projections_to_columns(projections)?;
let projected_query = model_query.project(projected_schema.clone(), &projected_columns);
let plan = LogicalPlan::Extension(Extension {
node: Arc::new(projected_query),
});
Ok(Transformed::yes(plan))
} else {
Ok(Transformed::no(plan))
}
}
}
fn projections_to_columns(projections: &[Expr]) -> Result<Vec<String>> {
projections
.iter()
.map(|expr| match expr {
Expr::Column(column) => Ok(column.name.clone()),
_ => internal_err!("non-column found in projection of OpenDD query: {}", expr),
})
.collect()
}

View File

@ -138,14 +138,6 @@ impl ModelQuery {
};
Ok(model_query_node)
}
pub(crate) fn project(mut self, schema: DFSchemaRef, projection: &[String]) -> Self {
self.model_selection
.selection
.retain(|k, _v| projection.iter().any(|column| column == k.as_str()));
self.schema = schema;
self
}
}
impl UserDefinedLogicalNodeCore for ModelQuery {