simplify the sql context that powers datafusion (#921)

Prior to this, on every request, a datafusion catalog provider was
created from the stored sql context. This PR reworks it so that
per-request setup is cheap and the code is more maintainable, with fewer
intermediate steps. There is also some work done towards supporting
table-valued functions.
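
As a rough illustration of the new flow, here is a minimal Rust sketch. It
uses the crate-internal names that appear in this diff (sql::catalog::Catalog,
sql::execute::execute_sql, the resolved_metadata / session / http_context /
request values from the engine), so it is an outline of the intended usage
rather than a standalone, compilable program:

    use std::sync::Arc;

    // Startup: the catalog is derived from resolved metadata exactly once
    // and stored in the engine state behind an Arc.
    let catalog = Arc::new(sql::catalog::Catalog::from_metadata(&resolved_metadata));

    // Per request: no catalog provider is rebuilt. The shared catalog is
    // cloned (a cheap Arc clone) and is only wrapped with the request's
    // session when the DataFusion SessionContext is created inside
    // execute_sql (via the WithSession<Catalog> CatalogProvider impl).
    let response = sql::execute::execute_sql(
        catalog.clone(),
        Arc::new(session),
        Arc::new(http_context),
        &request,
    )
    .await?;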

---------

Co-authored-by: Abhinav Gupta <127770473+abhinav-hasura@users.noreply.github.com>
V3_GIT_ORIGIN_REV_ID: 8c30485366969d81d2a35760962e0383ed5e488c
Vamshi Surabhi 2024-08-01 14:04:08 -07:00 committed by hasura-bot
parent 9af7a2b4b7
commit d41170b06a
23 changed files with 838 additions and 588 deletions

v3/Cargo.lock generated
View File

@ -4403,6 +4403,7 @@ dependencies = [
"open-dds",
"schema",
"serde",
"serde_json",
"thiserror",
"tracing-util",
]

View File

@ -116,7 +116,7 @@ struct EngineState {
schema: gql::schema::Schema<GDS>,
auth_config: AuthConfig,
pre_execution_plugins_config: Vec<PrePluginConfig>,
sql_context: sql::catalog::Context,
sql_context: Arc<sql::catalog::Catalog>,
}
#[tokio::main]
@ -695,7 +695,7 @@ async fn handle_sql_request(
|| {
Box::pin(async {
sql::execute::execute_sql(
&state.sql_context,
state.sql_context.clone(),
Arc::new(session),
Arc::new(state.http_context.clone()),
&request,
@ -765,7 +765,7 @@ fn build_state(
client: reqwest::Client::new(),
ndc_response_size_limit: None,
};
let sql_context = sql::catalog::Context::from_metadata(&resolved_metadata);
let sql_context = sql::catalog::Catalog::from_metadata(&resolved_metadata);
let schema = schema::GDS {
metadata: resolved_metadata,
}
@ -776,7 +776,7 @@ fn build_state(
schema,
auth_config,
pre_execution_plugins_config,
sql_context,
sql_context: sql_context.into(),
});
Ok(state)
}

View File

@ -10,6 +10,7 @@ pub(crate) mod query;
mod relationships;
pub(crate) mod selection_set;
pub use arguments::Argument;
pub use field::ResolvedField;
pub use filter::{plan_expression, resolve_expression, ResolvedFilterExpression};
pub use query::{ResolvedQueryExecutionPlan, ResolvedQueryNode};

View File

@ -86,9 +86,9 @@ pub(crate) fn process_model_arguments_presets<'s, 'a>(
type_mappings: &'s BTreeMap<Qualified<CustomTypeName>, TypeMapping>,
argument_presets: &'a ArgumentPresets,
session_variables: &SessionVariables,
model_arguments: &mut BTreeMap<DataConnectorArgumentName, Argument<'s>>,
mut model_arguments: BTreeMap<DataConnectorArgumentName, Argument<'s>>,
usage_counts: &mut UsagesCounts,
) -> Result<(), error::Error>
) -> Result<BTreeMap<DataConnectorArgumentName, Argument<'s>>, error::Error>
where
'a: 's,
{
@ -154,7 +154,7 @@ where
}
}
}
Ok(())
Ok(model_arguments)
}
// fetch input values from annotations and turn them into either JSON or an Expression

View File

@ -159,12 +159,12 @@ pub fn generate_aggregate_model_selection_ir<'s>(
if let Some(model_argument_presets) =
permissions::get_argument_presets(field_call.info.namespaced)?
{
arguments::process_model_arguments_presets(
arguments.model_arguments = arguments::process_model_arguments_presets(
&model_source.data_connector,
&model_source.type_mappings,
model_argument_presets,
session_variables,
&mut arguments.model_arguments,
arguments.model_arguments,
usage_counts,
)?;
}

View File

@ -133,12 +133,12 @@ pub fn select_many_generate_ir<'n, 's>(
permissions::get_argument_presets(field_call_argument.info.namespaced)?
{
// add any preset arguments from model permissions
arguments::process_model_arguments_presets(
model_arguments = arguments::process_model_arguments_presets(
&model_source.data_connector,
&model_source.type_mappings,
argument_presets,
session_variables,
&mut model_arguments,
model_arguments,
&mut usage_counts,
)?;
}

View File

@ -104,12 +104,12 @@ pub fn select_one_generate_ir<'n, 's>(
if let Some(argument_presets) = permissions::get_argument_presets(field_call.info.namespaced)? {
// add any preset arguments from model permissions
arguments::process_model_arguments_presets(
model_arguments = arguments::process_model_arguments_presets(
&model_source.data_connector,
&model_source.type_mappings,
argument_presets,
session_variables,
&mut model_arguments,
model_arguments,
&mut usage_counts,
)?;
}

View File

@ -32,8 +32,8 @@ pub use stages::commands::Command;
pub use stages::data_connectors;
pub use stages::data_connectors::DataConnectorLink;
pub use stages::model_permissions::{
FilterPermission, ModelPredicate, ModelTargetSource, ModelWithPermissions, SelectPermission,
UnaryComparisonOperator,
ArgumentPresets, FilterPermission, ModelPredicate, ModelTargetSource, ModelWithPermissions,
SelectPermission, UnaryComparisonOperator,
};
pub use stages::models::{Model, ModelSource};
pub use stages::scalar_boolean_expressions::ResolvedScalarBooleanExpressionType;

View File

@ -7,7 +7,7 @@ use indexmap::IndexMap;
use open_dds::{data_connector::DataConnectorName, models::ModelName, types::CustomTypeName};
use std::collections::BTreeMap;
pub use types::{
FilterPermission, ModelPredicate, ModelTargetSource, ModelWithPermissions,
ArgumentPresets, FilterPermission, ModelPredicate, ModelTargetSource, ModelWithPermissions,
PredicateRelationshipInfo, SelectPermission, UnaryComparisonOperator,
};
mod model_permission;

View File

@ -107,7 +107,7 @@ fn resolve_model_predicate_with_model(
}
// get the ndc_models::Type for an argument if it is available
fn get_model_source_argument<'a>(
pub fn get_model_source_argument<'a>(
argument_name: &'a ArgumentName,
model: &'a models::Model,
) -> Option<&'a ndc_models::Type> {

View File

@ -31,12 +31,14 @@ pub enum FilterPermission {
Filter(ModelPredicate),
}
pub type ArgumentPresets =
BTreeMap<ArgumentName, (QualifiedTypeReference, ValueExpressionOrPredicate)>;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct SelectPermission {
pub filter: FilterPermission,
// pub allow_aggregations: bool,
pub argument_presets:
BTreeMap<ArgumentName, (QualifiedTypeReference, ValueExpressionOrPredicate)>,
pub argument_presets: ArgumentPresets,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]

View File

@ -19,6 +19,7 @@ datafusion = { version = "40.0.0", features = ["serde"] }
async-trait = "0.1.81"
futures = "0.3.30"
serde = { workspace = true, features = ["rc"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
[lints]

View File

@ -1,40 +1,34 @@
use std::{any::Any, collections::HashMap, sync::Arc};
use ::datafusion::{
execution::{context::SessionState, runtime_env::RuntimeEnv},
sql::TableReference,
};
use async_trait::async_trait;
use ::datafusion::execution::{context::SessionState, runtime_env::RuntimeEnv};
use hasura_authn_core::Session;
use indexmap::IndexMap;
use metadata_resolve::{self as resolved};
use open_dds::permissions::Role;
use schema::OpenDDSchemaProvider;
use serde::{Deserialize, Serialize};
mod datafusion {
pub(super) use datafusion::{
catalog::{schema::SchemaProvider, CatalogProvider},
datasource::TableProvider,
error::Result,
prelude::{SessionConfig, SessionContext},
scalar::ScalarValue,
};
}
pub mod introspection;
pub mod schema;
pub mod table;
pub mod model;
pub mod subgraph;
/// The context in which to compile and execute SQL queries.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct Context {
pub(crate) subgraphs: IndexMap<String, schema::Subgraph>,
pub(crate) type_permissions: HashMap<Role, Arc<table::TypePermissionsOfRole>>,
pub(crate) introspection: introspection::Introspection,
pub struct Catalog {
pub(crate) subgraphs: IndexMap<String, Arc<subgraph::Subgraph>>,
pub(crate) type_permissions: HashMap<Role, Arc<model::TypePermissionsOfRole>>,
pub(crate) introspection: Arc<introspection::IntrospectionSchemaProvider>,
pub(crate) default_schema: Option<String>,
}
impl Context {
impl Catalog {
/// Derive a SQL Context from resolved Open DDS metadata.
pub fn from_metadata(metadata: &resolved::Metadata) -> Self {
let mut subgraphs = IndexMap::new();
@ -44,25 +38,25 @@ impl Context {
let subgraph =
subgraphs
.entry(schema_name.clone())
.or_insert_with(|| schema::Subgraph {
models: IndexMap::new(),
.or_insert_with(|| subgraph::Subgraph {
tables: IndexMap::new(),
});
subgraph.models.insert(
subgraph.tables.insert(
table_name.to_string(),
table::Model::from_resolved_model(model),
Arc::new(model::ModelWithPermissions::from_resolved_model(model)),
);
}
let mut type_permissions = HashMap::new();
for (type_name, object_type) in &metadata.object_types {
for (role, output_permission) in &object_type.type_output_permissions {
let output_permission = table::TypePermission {
let output_permission = model::TypePermission {
output: output_permission.clone(),
};
let role_permissions =
type_permissions
.entry(role)
.or_insert_with(|| table::TypePermissionsOfRole {
.or_insert_with(|| model::TypePermissionsOfRole {
permissions: HashMap::new(),
});
role_permissions
@ -70,140 +64,62 @@ impl Context {
.insert(type_name.clone(), output_permission);
}
}
let introspection = introspection::Introspection::from_metadata(metadata, &subgraphs);
Context {
subgraphs,
let introspection = introspection::IntrospectionSchemaProvider::new(
&introspection::Introspection::from_metadata(metadata, &subgraphs),
);
let default_schema = if subgraphs.len() == 1 {
subgraphs.get_index(0).map(|v| v.0.clone())
} else {
None
};
Catalog {
subgraphs: subgraphs
.into_iter()
.map(|(k, v)| (k, Arc::new(v)))
.collect(),
type_permissions: type_permissions
.into_iter()
.map(|(role, role_permissions)| (role.clone(), Arc::new(role_permissions)))
.collect(),
introspection,
introspection: Arc::new(introspection),
default_schema,
}
}
}
pub struct OpenDDCatalogProvider {
schemas: IndexMap<String, Arc<HasuraSchemaProvider>>,
}
impl OpenDDCatalogProvider {
fn new(
session: &Arc<Session>,
http_context: &Arc<execute::HttpContext>,
context: &Context,
) -> Self {
let type_permissions = context.type_permissions.get(&session.role).cloned();
let mut schemas = IndexMap::new();
for (subgraph_name, subgraph) in &context.subgraphs {
let mut tables = IndexMap::new();
for model in subgraph.models.values() {
let select_permission = model.permissions.get(&session.role).cloned();
let provider = table::OpenDDTableProvider {
session: session.clone(),
http_context: http_context.clone(),
name: model.name.clone(),
data_type: model.data_type.clone(),
source: model.source.clone(),
schema: model.schema.clone(),
select_permission,
type_permissions: type_permissions.clone(),
};
tables.insert(model.name.to_string(), Arc::new(provider));
}
let provider = HasuraSchemaProvider::OpenDD(schema::OpenDDSchemaProvider { tables });
schemas.insert(subgraph_name.clone(), Arc::new(provider));
}
schemas.insert(
introspection::HASURA_METADATA_SCHEMA.to_string(),
Arc::new(HasuraSchemaProvider::Introspection(
introspection::IntrospectionSchemaProvider::new(&context.introspection),
)),
);
OpenDDCatalogProvider { schemas }
}
pub(crate) fn get(
&self,
default_schema: Option<&str>,
table: &TableReference,
) -> Option<&table::OpenDDTableProvider> {
let schema = table.schema().or(default_schema);
let table = table.table();
if let Some(schema) = schema {
if let HasuraSchemaProvider::OpenDD(schema) = self.schemas.get(schema)?.as_ref() {
schema.tables.get(table).map(std::convert::AsRef::as_ref)
} else {
None
}
} else {
None
}
}
}
enum HasuraSchemaProvider {
OpenDD(OpenDDSchemaProvider),
Introspection(introspection::IntrospectionSchemaProvider),
}
#[async_trait]
impl datafusion::SchemaProvider for HasuraSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Vec<String> {
match self {
HasuraSchemaProvider::OpenDD(schema) => schema.table_names(),
HasuraSchemaProvider::Introspection(schema) => schema.table_names(),
}
}
async fn table(
&self,
name: &str,
) -> datafusion::Result<Option<Arc<dyn datafusion::TableProvider>>> {
match self {
HasuraSchemaProvider::OpenDD(schema) => schema.table(name).await,
HasuraSchemaProvider::Introspection(schema) => schema.table(name).await,
}
}
fn table_exist(&self, name: &str) -> bool {
match self {
HasuraSchemaProvider::OpenDD(schema) => schema.table_exist(name),
HasuraSchemaProvider::Introspection(schema) => schema.table_exist(name),
}
}
}
impl datafusion::CatalogProvider for OpenDDCatalogProvider {
impl datafusion::CatalogProvider for model::WithSession<Catalog> {
fn as_any(&self) -> &dyn Any {
self
}
fn schema_names(&self) -> Vec<String> {
self.schemas.keys().cloned().collect()
let mut schema_names: Vec<String> = self.value.subgraphs.keys().cloned().collect();
schema_names.push(introspection::HASURA_METADATA_SCHEMA.to_string());
schema_names
}
fn schema(&self, name: &str) -> Option<Arc<dyn datafusion::SchemaProvider>> {
self.schemas
.get(name)
.cloned()
.map(|schema| schema as Arc<dyn datafusion::SchemaProvider>)
let subgraph_provider = self.value.subgraphs.get(name).cloned().map(|schema| {
Arc::new(model::WithSession {
value: schema,
session: self.session.clone(),
}) as Arc<dyn datafusion::SchemaProvider>
});
if subgraph_provider.is_none() && name == introspection::HASURA_METADATA_SCHEMA {
Some(self.value.introspection.clone() as Arc<dyn datafusion::SchemaProvider>)
} else {
subgraph_provider
}
}
}
impl Context {
impl Catalog {
pub fn create_session_context(
&self,
self: Arc<Self>,
session: &Arc<Session>,
http_context: &Arc<execute::HttpContext>,
) -> datafusion::SessionContext {
let default_schema_name = if self.subgraphs.len() == 1 {
self.subgraphs.get_index(0).map(|v| v.0)
} else {
None
};
let session_config = datafusion::SessionConfig::new()
.set(
"datafusion.catalog.default_catalog",
@ -226,7 +142,7 @@ impl Context {
datafusion::ScalarValue::Boolean(Some(false)),
);
let session_config = if let Some(default_schema_name) = default_schema_name {
let session_config = if let Some(default_schema_name) = &self.default_schema {
session_config.set(
"datafusion.catalog.default_schema",
datafusion::ScalarValue::Utf8(Some(default_schema_name.clone())),
@ -234,26 +150,26 @@ impl Context {
} else {
session_config
};
let catalog = Arc::new(OpenDDCatalogProvider::new(session, http_context, self));
let query_planner = Arc::new(super::execute::planner::NDCQueryPlanner {
default_schema: default_schema_name.map(|s| Arc::new(s.clone())),
catalog: catalog.clone(),
let query_planner = Arc::new(super::execute::planner::OpenDDQueryPlanner {
catalog: self.clone(),
session: session.clone(),
http_context: http_context.clone(),
});
let mut session_state =
let session_state =
SessionState::new_with_config_rt(session_config, Arc::new(RuntimeEnv::default()))
.with_query_planner(query_planner)
.add_optimizer_rule(Arc::new(super::execute::optimizer::ReplaceTableScan {}))
.add_optimizer_rule(Arc::new(
super::execute::optimizer::NDCPushDownProjection {},
));
// add_analyzer_rule takes a mut &self instead of mut self because of which we can't chain
// the creation of session_state
session_state.add_analyzer_rule(Arc::new(super::execute::analyzer::ReplaceTableScan::new(
default_schema_name.map(|s| Arc::new(s.clone())),
catalog.clone(),
)));
let session_context = datafusion::SessionContext::new_with_state(session_state);
session_context
.register_catalog("default", catalog as Arc<dyn datafusion::CatalogProvider>);
session_context.register_catalog(
"default",
Arc::new(model::WithSession {
session: session.clone(),
value: self,
}) as Arc<dyn datafusion::CatalogProvider>,
);
session_context
}
}

View File

@ -5,7 +5,7 @@ use std::{any::Any, sync::Arc};
use async_trait::async_trait;
use indexmap::IndexMap;
use metadata_resolve::{self as resolved, ModelRelationshipTarget};
mod df {
mod datafusion {
pub(super) use datafusion::{
arrow::{
array::RecordBatch,
@ -33,26 +33,26 @@ pub const INFERRED_FOREIGN_KEY_CONSTRAINTS: &str = "inferred_foreign_key_constra
pub(crate) struct Introspection {
pub(crate) table_metadata: TableMetadata,
pub(crate) column_metadata: ColumnMetadata,
pub(crate) inferred_foreign_key_constraints: InferredForeignKeys,
pub(crate) inferred_foreign_key_constraints: InferredatafusionoreignKeys,
}
impl Introspection {
/// Derive SQL schema from the Open DDS metadata.
pub fn from_metadata(
metadata: &resolved::Metadata,
schemas: &IndexMap<String, crate::catalog::schema::Subgraph>,
schemas: &IndexMap<String, crate::catalog::subgraph::Subgraph>,
) -> Self {
let mut table_metadata_rows = Vec::new();
let mut column_metadata_rows = Vec::new();
let mut foreign_key_constraint_rows = Vec::new();
for (schema_name, schema) in schemas {
for (table_name, table) in &schema.models {
for (table_name, table) in &schema.tables {
table_metadata_rows.push(TableRow::new(
schema_name.clone(),
table_name.to_string(),
table.description.clone(),
table.model.description.clone(),
));
for (column_name, column_description) in &table.columns {
for (column_name, column_description) in &table.model.columns {
column_metadata_rows.push(ColumnRow {
schema_name: schema_name.clone(),
table_name: table_name.clone(),
@ -65,7 +65,7 @@ impl Introspection {
// 1. Need to check if the target_model is part of subgraphs
// 2. Need to also check for array relationships in case the corresponding
// object relationship isn't present
if let Some(object_type) = metadata.object_types.get(&table.data_type) {
if let Some(object_type) = metadata.object_types.get(&table.model.data_type) {
for relationship in object_type.relationship_fields.values() {
if let metadata_resolve::RelationshipTarget::Model(
ModelRelationshipTarget {
@ -94,24 +94,29 @@ impl Introspection {
Introspection {
table_metadata: TableMetadata::new(table_metadata_rows),
column_metadata: ColumnMetadata::new(column_metadata_rows),
inferred_foreign_key_constraints: InferredForeignKeys::new(foreign_key_constraint_rows),
inferred_foreign_key_constraints: InferredatafusionoreignKeys::new(
foreign_key_constraint_rows,
),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct TableMetadata {
schema: df::SchemaRef,
schema: datafusion::SchemaRef,
rows: Vec<TableRow>,
}
impl TableMetadata {
pub(crate) fn new(rows: Vec<TableRow>) -> Self {
let schema_name = df::Field::new("schema_name", df::DataType::Utf8, false);
let table_name = df::Field::new("table_name", df::DataType::Utf8, false);
let description = df::Field::new("description", df::DataType::Utf8, true);
let schema =
df::SchemaRef::new(df::Schema::new(vec![schema_name, table_name, description]));
let schema_name = datafusion::Field::new("schema_name", datafusion::DataType::Utf8, false);
let table_name = datafusion::Field::new("table_name", datafusion::DataType::Utf8, false);
let description = datafusion::Field::new("description", datafusion::DataType::Utf8, true);
let schema = datafusion::SchemaRef::new(datafusion::Schema::new(vec![
schema_name,
table_name,
description,
]));
TableMetadata { schema, rows }
}
}
@ -125,9 +130,9 @@ impl TableMetadata {
.iter()
.map(|row| {
vec![
df::ScalarValue::Utf8(Some(row.schema_name.clone())),
df::ScalarValue::Utf8(Some(row.table_name.clone())),
df::ScalarValue::Utf8(row.description.clone()),
ScalarValue::Utf8(Some(row.schema_name.clone())),
ScalarValue::Utf8(Some(row.table_name.clone())),
ScalarValue::Utf8(row.description.clone()),
]
})
.collect(),
@ -158,17 +163,17 @@ impl TableRow {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct ColumnMetadata {
pub(crate) schema: df::SchemaRef,
pub(crate) schema: datafusion::SchemaRef,
pub(crate) rows: Vec<ColumnRow>,
}
impl ColumnMetadata {
fn new(rows: Vec<ColumnRow>) -> Self {
let schema_name = df::Field::new("schema_name", df::DataType::Utf8, false);
let table_name = df::Field::new("table_name", df::DataType::Utf8, false);
let column_name = df::Field::new("column_name", df::DataType::Utf8, false);
let description = df::Field::new("description", df::DataType::Utf8, true);
let schema = df::SchemaRef::new(df::Schema::new(vec![
let schema_name = datafusion::Field::new("schema_name", datafusion::DataType::Utf8, false);
let table_name = datafusion::Field::new("table_name", datafusion::DataType::Utf8, false);
let column_name = datafusion::Field::new("column_name", datafusion::DataType::Utf8, false);
let description = datafusion::Field::new("description", datafusion::DataType::Utf8, true);
let schema = datafusion::SchemaRef::new(datafusion::Schema::new(vec![
schema_name,
table_name,
column_name,
@ -184,10 +189,10 @@ impl ColumnMetadata {
.iter()
.map(|row| {
vec![
df::ScalarValue::Utf8(Some(row.schema_name.clone())),
df::ScalarValue::Utf8(Some(row.table_name.clone())),
df::ScalarValue::Utf8(Some(row.column_name.clone())),
df::ScalarValue::Utf8(row.description.clone()),
ScalarValue::Utf8(Some(row.schema_name.clone())),
ScalarValue::Utf8(Some(row.table_name.clone())),
ScalarValue::Utf8(Some(row.column_name.clone())),
ScalarValue::Utf8(row.description.clone()),
]
})
.collect(),
@ -204,20 +209,26 @@ pub(crate) struct ColumnRow {
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct InferredForeignKeys {
schema: df::SchemaRef,
pub(crate) struct InferredatafusionoreignKeys {
schema: datafusion::SchemaRef,
rows: Vec<ForeignKeyRow>,
}
impl InferredForeignKeys {
impl InferredatafusionoreignKeys {
fn new(rows: Vec<ForeignKeyRow>) -> Self {
let from_schema_name = df::Field::new("from_schema_name", df::DataType::Utf8, false);
let from_table_name = df::Field::new("from_table_name", df::DataType::Utf8, false);
let from_column_name = df::Field::new("from_column_name", df::DataType::Utf8, false);
let to_schema_name = df::Field::new("to_schema_name", df::DataType::Utf8, false);
let to_table_name = df::Field::new("to_table_name", df::DataType::Utf8, false);
let to_column_name = df::Field::new("to_column_name", df::DataType::Utf8, false);
let schema = df::SchemaRef::new(df::Schema::new(vec![
let from_schema_name =
datafusion::Field::new("from_schema_name", datafusion::DataType::Utf8, false);
let from_table_name =
datafusion::Field::new("from_table_name", datafusion::DataType::Utf8, false);
let from_column_name =
datafusion::Field::new("from_column_name", datafusion::DataType::Utf8, false);
let to_schema_name =
datafusion::Field::new("to_schema_name", datafusion::DataType::Utf8, false);
let to_table_name =
datafusion::Field::new("to_table_name", datafusion::DataType::Utf8, false);
let to_column_name =
datafusion::Field::new("to_column_name", datafusion::DataType::Utf8, false);
let schema = datafusion::SchemaRef::new(datafusion::Schema::new(vec![
from_schema_name,
from_table_name,
from_column_name,
@ -225,7 +236,7 @@ impl InferredForeignKeys {
to_table_name,
to_column_name,
]));
InferredForeignKeys { schema, rows }
InferredatafusionoreignKeys { schema, rows }
}
fn to_values_table(&self) -> ValuesTable {
ValuesTable {
@ -235,12 +246,12 @@ impl InferredForeignKeys {
.iter()
.map(|row| {
vec![
df::ScalarValue::Utf8(Some(row.from_schema_name.clone())),
df::ScalarValue::Utf8(Some(row.from_table_name.clone())),
df::ScalarValue::Utf8(Some(row.from_column_name.clone())),
df::ScalarValue::Utf8(Some(row.to_schema_name.clone())),
df::ScalarValue::Utf8(Some(row.to_table_name.clone())),
df::ScalarValue::Utf8(Some(row.to_column_name.clone())),
ScalarValue::Utf8(Some(row.from_schema_name.clone())),
ScalarValue::Utf8(Some(row.from_table_name.clone())),
ScalarValue::Utf8(Some(row.from_column_name.clone())),
ScalarValue::Utf8(Some(row.to_schema_name.clone())),
ScalarValue::Utf8(Some(row.to_table_name.clone())),
ScalarValue::Utf8(Some(row.to_column_name.clone())),
]
})
.collect(),
@ -248,6 +259,19 @@ impl InferredForeignKeys {
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
enum ScalarValue {
Utf8(Option<String>),
}
impl ScalarValue {
fn into_datafusion_scalar_value(self) -> datafusion::ScalarValue {
match self {
ScalarValue::Utf8(value) => datafusion::ScalarValue::Utf8(value),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
struct ForeignKeyRow {
from_schema_name: String,
@ -258,8 +282,9 @@ struct ForeignKeyRow {
to_column_name: String,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct IntrospectionSchemaProvider {
tables: IndexMap<String, Arc<dyn df::TableProvider>>,
tables: IndexMap<String, Arc<ValuesTable>>,
}
impl IntrospectionSchemaProvider {
@ -281,14 +306,14 @@ impl IntrospectionSchemaProvider {
),
]
.into_iter()
.map(|(k, table)| (k.to_string(), Arc::new(table) as Arc<dyn df::TableProvider>))
.map(|(k, table)| (k.to_string(), Arc::new(table)))
.collect();
IntrospectionSchemaProvider { tables }
}
}
#[async_trait]
impl df::SchemaProvider for IntrospectionSchemaProvider {
impl datafusion::SchemaProvider for IntrospectionSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
@ -300,8 +325,12 @@ impl df::SchemaProvider for IntrospectionSchemaProvider {
async fn table(
&self,
name: &str,
) -> datafusion::error::Result<Option<Arc<dyn df::TableProvider>>> {
Ok(self.tables.get(name).cloned())
) -> datafusion::Result<Option<Arc<dyn datafusion::TableProvider>>> {
Ok(self
.tables
.get(name)
.cloned()
.map(|table| table as Arc<dyn datafusion::TableProvider>))
}
fn table_exist(&self, name: &str) -> bool {
@ -310,42 +339,47 @@ impl df::SchemaProvider for IntrospectionSchemaProvider {
}
// A table with static rows
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
struct ValuesTable {
schema: df::SchemaRef,
rows: Vec<Vec<df::ScalarValue>>,
schema: datafusion::SchemaRef,
rows: Vec<Vec<ScalarValue>>,
}
#[async_trait]
impl df::TableProvider for ValuesTable {
impl datafusion::TableProvider for ValuesTable {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> df::SchemaRef {
fn schema(&self) -> datafusion::SchemaRef {
self.schema.clone()
}
fn table_type(&self) -> df::TableType {
df::TableType::View
fn table_type(&self) -> datafusion::TableType {
datafusion::TableType::View
}
async fn scan(
&self,
_state: &df::SessionState,
_state: &datafusion::SessionState,
projection: Option<&Vec<usize>>,
// filters and limit can be used here to inject some push-down operations if needed
_filters: &[df::Expr],
_filters: &[datafusion::Expr],
_limit: Option<usize>,
) -> datafusion::error::Result<Arc<dyn df::ExecutionPlan>> {
) -> datafusion::Result<Arc<dyn datafusion::ExecutionPlan>> {
let projected_schema = Arc::new(self.schema.project(projection.unwrap_or(&vec![]))?);
let columnar_projection = projection
.unwrap_or(&vec![])
.iter()
.map(|j| self.rows.iter().map(|row| row[*j].clone()))
.map(df::ScalarValue::iter_to_array)
.collect::<df::Result<Vec<_>>>()?;
Ok(Arc::new(df::ValuesExec::try_new_from_batches(
.map(|j| {
self.rows
.iter()
.map(|row| row[*j].clone().into_datafusion_scalar_value())
})
.map(datafusion::ScalarValue::iter_to_array)
.collect::<datafusion::Result<Vec<_>>>()?;
Ok(Arc::new(datafusion::ValuesExec::try_new_from_batches(
projected_schema.clone(),
vec![df::RecordBatch::try_new(
vec![datafusion::RecordBatch::try_new(
projected_schema,
columnar_projection,
)?],

View File

@ -0,0 +1,385 @@
//! Describe a model for a SQL table and how to translate datafusion operations on the table
//! to ndc-spec queries.
use std::collections::{BTreeMap, HashMap};
use std::{any::Any, sync::Arc};
use async_trait::async_trait;
use hasura_authn_core::Session;
use indexmap::IndexMap;
use metadata_resolve::{self as resolved, Qualified};
use open_dds::arguments::ArgumentName;
use open_dds::permissions::Role;
use open_dds::{
models::ModelName,
types::{CustomTypeName, FieldName},
};
use serde::{Deserialize, Serialize};
use crate::plan::NDCQuery;
mod datafusion {
pub(super) use datafusion::{
arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef, TimeUnit},
common::{DFSchema, DFSchemaRef},
datasource::function::TableFunctionImpl,
datasource::{TableProvider, TableType},
error::Result,
execution::context::SessionState,
logical_expr::Expr,
logical_expr::Extension,
logical_expr::LogicalPlan,
physical_plan::ExecutionPlan,
};
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub(crate) struct TypePermission {
pub output: open_dds::permissions::TypeOutputPermission,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct TypePermissionsOfRole {
pub(crate) permissions: HashMap<Qualified<CustomTypeName>, TypePermission>,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct ModelWithPermissions {
pub(crate) model: Arc<Model>,
// Permissions for the model. Note that the type permissions will need to be retrieved from the
// global context
pub(crate) permissions: HashMap<Role, Arc<resolved::SelectPermission>>,
// The underlying source to execute ndc queries
pub source: Option<Arc<metadata_resolve::ModelSource>>,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct ArgumentInfo {
pub argument_type: datafusion::DataType,
pub description: Option<String>,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct Model {
pub subgraph: String,
pub name: ModelName,
pub description: Option<String>,
pub arguments: IndexMap<ArgumentName, ArgumentInfo>,
// Datafusion table schema
pub schema: datafusion::SchemaRef,
// For now, descriptions of fields
pub columns: IndexMap<String, Option<String>>,
// This is the entry point for the type mappings stored
// in ModelSource
pub data_type: Qualified<CustomTypeName>,
}
impl ModelWithPermissions {
pub fn from_resolved_model(model: &resolved::ModelWithPermissions) -> Self {
let (schema, columns) = {
let mut columns = IndexMap::new();
let mut builder = datafusion::SchemaBuilder::new();
for (field_name, field_definition) in &model.model.type_fields {
let ndc_type_representation = get_type_representation(&model.model, field_name);
let field_type =
to_arrow_type(&field_definition.field_type, ndc_type_representation);
if let Some(field_type) = field_type {
builder.push(datafusion::Field::new(
field_name.to_string(),
field_type,
field_definition.field_type.nullable,
));
let description = if let Some(ndc_models::TypeRepresentation::Enum { one_of }) =
ndc_type_representation
{
// TODO: Instead of stuffing the possible enum values in description,
// surface them in the metadata tables.
Some(
field_definition
.description
.clone()
.unwrap_or_else(String::new)
+ &format!(" Possible values: {}", one_of.join(", ")),
)
} else {
field_definition.description.clone()
};
columns.insert(field_name.to_string(), description);
}
}
let fields = builder.finish().fields;
(
datafusion::SchemaRef::new(datafusion::Schema::new(fields)),
columns,
)
};
let permissions = model
.select_permissions
.iter()
.map(|(role, select_permission)| (role.clone(), Arc::new(select_permission.clone())))
.collect();
let model_source = model
.model
.source
.as_ref()
.map(|source| Arc::new(source.clone()));
let model = Model {
subgraph: model.model.name.subgraph.clone(),
name: model.model.name.name.clone(),
description: model.model.raw.description.clone(),
arguments: IndexMap::new(),
schema,
data_type: model.model.data_type.clone(),
columns,
};
ModelWithPermissions {
model: model.into(),
source: model_source,
permissions,
}
}
}
pub(crate) struct WithSession<T> {
pub(crate) session: Arc<Session>,
pub(crate) value: Arc<T>,
}
// TODO: this will be removed when table valued functions are fully supported
#[allow(dead_code)]
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct TableValuedFunction {
/// Metadata about the model
pub(crate) model: Arc<Model>,
pub(crate) source: Arc<resolved::ModelSource>,
pub(crate) session: Arc<Session>,
pub(crate) permission: Arc<resolved::SelectPermission>,
}
impl TableValuedFunction {
// TODO: this will be removed when table valued functions are fully supported
#[allow(dead_code)]
pub(crate) fn new(
model: Arc<Model>,
source: Arc<resolved::ModelSource>,
session: Arc<Session>,
permission: Arc<resolved::SelectPermission>,
) -> Self {
TableValuedFunction {
model,
source,
session,
permission,
}
}
}
impl datafusion::TableFunctionImpl for TableValuedFunction {
fn call(
&self,
// TODO: table valued function implementation is not yet complete
_exprs: &[datafusion::Expr],
) -> datafusion::Result<Arc<dyn datafusion::TableProvider>> {
let arguments = BTreeMap::new();
let table = Table::new(
self.model.clone(),
arguments,
self.source.clone(),
self.permission.clone(),
);
Ok(Arc::new(table) as Arc<dyn datafusion::TableProvider>)
}
}
/// A Table represents an OpenDD entity which can provide a set of rows.
/// Currently, this is an instantiation of a model with concrete arguments
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct Table {
/// Metadata about the model
pub(crate) model: Arc<Model>,
/// This will be empty if the model doesn't take any arguments
pub(crate) arguments: BTreeMap<ArgumentName, serde_json::Value>,
pub(crate) source: Arc<resolved::ModelSource>,
pub(crate) permission: Arc<resolved::SelectPermission>,
}
impl Table {
pub(crate) fn new(
model: Arc<Model>,
arguments: BTreeMap<ArgumentName, serde_json::Value>,
source: Arc<resolved::ModelSource>,
permission: Arc<resolved::SelectPermission>,
) -> Self {
Table {
model,
arguments,
source,
permission,
}
}
pub(crate) fn new_no_args(
model: Arc<Model>,
source: Arc<resolved::ModelSource>,
permission: Arc<resolved::SelectPermission>,
) -> Self {
Table {
model,
arguments: BTreeMap::new(),
source,
permission,
}
}
pub(crate) fn to_logical_plan(
&self,
projected_schema: datafusion::DFSchemaRef,
) -> datafusion::Result<datafusion::LogicalPlan> {
let ndc_query_node = NDCQuery::new(
self.model.clone(),
self.source.clone(),
&self.arguments,
self.permission.clone(),
projected_schema,
)?;
let logical_plan = datafusion::LogicalPlan::Extension(datafusion::Extension {
node: Arc::new(ndc_query_node),
});
Ok(logical_plan)
}
}
#[async_trait]
impl datafusion::TableProvider for Table {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> datafusion::SchemaRef {
self.model.schema.clone()
}
fn table_type(&self) -> datafusion::TableType {
datafusion::TableType::Base
}
async fn scan(
&self,
state: &datafusion::SessionState,
projection: Option<&Vec<usize>>,
// filters and limit can be used here to inject some push-down operations if needed
_filters: &[datafusion::Expr],
_limit: Option<usize>,
) -> datafusion::Result<Arc<dyn datafusion::ExecutionPlan>> {
let projected_schema = self.model.schema.project(projection.unwrap_or(&vec![]))?;
let qualified_projected_schema = datafusion::DFSchema::from_unqualified_fields(
projected_schema.fields,
projected_schema.metadata,
)?;
let logical_plan = self.to_logical_plan(Arc::new(qualified_projected_schema))?;
state.create_physical_plan(&logical_plan).await
}
}
/// Converts an opendd type to an arrow type.
/// TODO: need to handle complex types
#[allow(clippy::match_same_arms)]
fn to_arrow_type(
ty: &resolved::QualifiedTypeReference,
ndc_type_representation: Option<&ndc_models::TypeRepresentation>,
) -> Option<datafusion::DataType> {
match &ty.underlying_type {
resolved::QualifiedBaseType::Named(resolved::QualifiedTypeName::Inbuilt(inbuilt_type)) => {
let data_type = match inbuilt_type {
open_dds::types::InbuiltType::ID => datafusion::DataType::Utf8,
open_dds::types::InbuiltType::Int => datafusion::DataType::Int32,
open_dds::types::InbuiltType::Float => datafusion::DataType::Float32,
open_dds::types::InbuiltType::Boolean => datafusion::DataType::Boolean,
open_dds::types::InbuiltType::String => datafusion::DataType::Utf8,
};
Some(data_type)
}
resolved::QualifiedBaseType::Named(resolved::QualifiedTypeName::Custom(custom_type)) => {
if let Some(type_representation) = ndc_type_representation {
match type_representation {
ndc_models::TypeRepresentation::Boolean => Some(datafusion::DataType::Boolean),
ndc_models::TypeRepresentation::String => Some(datafusion::DataType::Utf8),
ndc_models::TypeRepresentation::Int8 => Some(datafusion::DataType::Int8),
ndc_models::TypeRepresentation::Int16 => Some(datafusion::DataType::Int16),
ndc_models::TypeRepresentation::Int32 => Some(datafusion::DataType::Int32),
ndc_models::TypeRepresentation::Int64 => Some(datafusion::DataType::Int64),
ndc_models::TypeRepresentation::Float32 => Some(datafusion::DataType::Float32),
ndc_models::TypeRepresentation::Float64 => Some(datafusion::DataType::Float64),
// Can't do anything better for BigInteger, so we just use String.
ndc_models::TypeRepresentation::BigInteger => Some(datafusion::DataType::Utf8),
// BigDecimal128 is not supported by arrow.
ndc_models::TypeRepresentation::BigDecimal => {
Some(datafusion::DataType::Float64)
}
ndc_models::TypeRepresentation::UUID => Some(datafusion::DataType::Utf8),
ndc_models::TypeRepresentation::Date => Some(datafusion::DataType::Date32),
ndc_models::TypeRepresentation::Timestamp => Some(
datafusion::DataType::Timestamp(datafusion::TimeUnit::Microsecond, None),
),
ndc_models::TypeRepresentation::TimestampTZ => Some(
datafusion::DataType::Timestamp(datafusion::TimeUnit::Microsecond, None),
),
ndc_models::TypeRepresentation::Enum { .. } => Some(datafusion::DataType::Utf8),
_ => None,
}
} else {
match custom_type.name.to_string().to_lowercase().as_str() {
"bool" => Some(datafusion::DataType::Boolean),
"int8" => Some(datafusion::DataType::Int8),
"int16" => Some(datafusion::DataType::Int16),
"int32" => Some(datafusion::DataType::Int32),
"int64" => Some(datafusion::DataType::Int64),
"float32" => Some(datafusion::DataType::Float32),
"float64" => Some(datafusion::DataType::Float64),
"varchar" => Some(datafusion::DataType::Utf8),
"text" => Some(datafusion::DataType::Utf8),
"timestamp" => Some(datafusion::DataType::Timestamp(
datafusion::TimeUnit::Microsecond,
None,
)),
"timestamptz" => Some(datafusion::DataType::Timestamp(
datafusion::TimeUnit::Microsecond,
None,
)),
// BigDecimal128 is not supported by arrow.
"bigdecimal" => Some(datafusion::DataType::Float64),
_ => None,
}
}
}
resolved::QualifiedBaseType::List(_) => None,
}
}
fn get_type_representation<'a>(
model: &'a resolved::Model,
field: &FieldName,
) -> Option<&'a ndc_models::TypeRepresentation> {
model
.source
.as_ref()
.and_then(|source| {
source
.type_mappings
.get(&model.data_type)
.map(|type_mapping| {
let resolved::TypeMapping::Object { field_mappings, .. } = type_mapping;
field_mappings
.get(field)
.map(|mapping| mapping.column_type_representation.as_ref())
})
})
.flatten()
.flatten()
}

View File

@ -1,44 +0,0 @@
use async_trait::async_trait;
use std::{any::Any, sync::Arc};
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
mod df {
pub(super) use datafusion::error::Result;
pub(super) use datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider};
}
use crate::catalog;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct Subgraph {
pub models: IndexMap<String, catalog::table::Model>,
}
pub struct OpenDDSchemaProvider {
pub(crate) tables: IndexMap<String, Arc<catalog::table::OpenDDTableProvider>>,
}
#[async_trait]
impl df::SchemaProvider for OpenDDSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Vec<String> {
self.tables.keys().cloned().collect::<Vec<_>>()
}
async fn table(&self, name: &str) -> df::Result<Option<Arc<dyn df::TableProvider>>> {
Ok(self
.tables
.get(name)
.cloned()
.map(|table| table as Arc<dyn df::TableProvider>))
}
fn table_exist(&self, name: &str) -> bool {
self.tables.contains_key(name)
}
}

View File

@ -0,0 +1,62 @@
use async_trait::async_trait;
use std::{any::Any, sync::Arc};
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
mod datafusion {
pub(super) use datafusion::error::{DataFusionError, Result};
pub(super) use datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider};
}
use crate::catalog;
use super::model;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct Subgraph {
pub tables: IndexMap<String, Arc<catalog::model::ModelWithPermissions>>,
}
#[async_trait]
impl datafusion::SchemaProvider for catalog::model::WithSession<Subgraph> {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Vec<String> {
self.value.tables.keys().cloned().collect::<Vec<_>>()
}
async fn table(
&self,
name: &str,
) -> datafusion::Result<Option<Arc<dyn datafusion::TableProvider>>> {
if let Some(model) = self.value.tables.get(name) {
let permission = model.permissions.get(&self.session.role).ok_or_else(|| {
datafusion::DataFusionError::Plan(format!(
"role {} does not have select permission for model {}",
self.session.role, model.model.name
))
})?;
let model_source = model.source.as_ref().ok_or_else(|| {
datafusion::DataFusionError::Plan(format!(
"model source should be configured for {}",
model.model.name
))
})?;
let table = model::Table::new_no_args(
model.model.clone(),
model_source.clone(),
permission.clone(),
);
Ok(Some(Arc::new(table) as Arc<dyn datafusion::TableProvider>))
} else {
Ok(None)
}
}
fn table_exist(&self, name: &str) -> bool {
self.value.tables.contains_key(name)
}
}

View File

@ -1,256 +0,0 @@
//! Describe a model for a SQL table and how to translate datafusion operations on the table
//! to ndc-spec queries.
use std::collections::HashMap;
use std::{any::Any, sync::Arc};
use async_trait::async_trait;
use datafusion::common::internal_err;
use hasura_authn_core::Session;
use indexmap::IndexMap;
use metadata_resolve::{self as resolved, Qualified, SelectPermission};
use open_dds::permissions::Role;
use open_dds::{
models::ModelName,
types::{CustomTypeName, FieldName},
};
use serde::{Deserialize, Serialize};
mod df {
pub(super) use datafusion::arrow::datatypes::Field;
pub(super) use datafusion::{
arrow::datatypes::{DataType, Schema, SchemaBuilder, SchemaRef},
datasource::{TableProvider, TableType},
execution::context::SessionState,
logical_expr::Expr,
physical_plan::ExecutionPlan,
};
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub(crate) struct TypePermission {
pub output: open_dds::permissions::TypeOutputPermission,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct TypePermissionsOfRole {
pub(crate) permissions: HashMap<Qualified<CustomTypeName>, TypePermission>,
}
fn get_type_representation<'a>(
model: &'a resolved::Model,
field: &FieldName,
) -> Option<&'a ndc_models::TypeRepresentation> {
model
.source
.as_ref()
.and_then(|source| {
source
.type_mappings
.get(&model.data_type)
.map(|type_mapping| {
let resolved::TypeMapping::Object { field_mappings, .. } = type_mapping;
field_mappings
.get(field)
.map(|mapping| mapping.column_type_representation.as_ref())
})
})
.flatten()
.flatten()
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct Model {
pub name: ModelName,
pub description: Option<String>,
// Datafusion table schema
pub schema: df::SchemaRef,
// For now, descriptions of fields
pub columns: IndexMap<String, Option<String>>,
// This is the entry point for the type mappings stored
// in ModelSource
pub data_type: Qualified<CustomTypeName>,
// The underlying source to execute ndc queries
pub source: Option<Arc<metadata_resolve::ModelSource>>,
// Permisisons for the model. Note that the type permissions will need to be retrieved from the
// global context
pub permissions: HashMap<Role, Arc<resolved::SelectPermission>>,
}
impl Model {
pub fn from_resolved_model(model: &resolved::ModelWithPermissions) -> Self {
let (schema, columns) = {
let mut columns = IndexMap::new();
let mut builder = df::SchemaBuilder::new();
for (field_name, field_definition) in &model.model.type_fields {
let ndc_type_representation = get_type_representation(&model.model, field_name);
let field_type =
to_arrow_type(&field_definition.field_type, ndc_type_representation);
if let Some(field_type) = field_type {
builder.push(df::Field::new(
field_name.to_string(),
field_type,
field_definition.field_type.nullable,
));
let description = if let Some(ndc_models::TypeRepresentation::Enum { one_of }) =
ndc_type_representation
{
// TODO: Instead of stuffing the possible enum values in description,
// surface them in the metadata tables.
Some(
field_definition
.description
.clone()
.unwrap_or_else(String::new)
+ &format!(" Possible values: {}", one_of.join(", ")),
)
} else {
field_definition.description.clone()
};
columns.insert(field_name.to_string(), description);
}
}
let fields = builder.finish().fields;
(df::SchemaRef::new(df::Schema::new(fields)), columns)
};
let permissions = model
.select_permissions
.iter()
.map(|(role, select_permission)| (role.clone(), Arc::new(select_permission.clone())))
.collect();
Model {
name: model.model.name.name.clone(),
description: model.model.raw.description.clone(),
schema,
data_type: model.model.data_type.clone(),
source: model
.model
.source
.as_ref()
.map(|source| Arc::new(source.clone())),
columns,
permissions,
}
}
}
/// Converts an opendd type to an arrow type.
/// TODO: need to handle complex types
#[allow(clippy::match_same_arms)]
fn to_arrow_type(
ty: &resolved::QualifiedTypeReference,
ndc_type_representation: Option<&ndc_models::TypeRepresentation>,
) -> Option<df::DataType> {
match &ty.underlying_type {
resolved::QualifiedBaseType::Named(resolved::QualifiedTypeName::Inbuilt(inbuilt_type)) => {
let data_type = match inbuilt_type {
open_dds::types::InbuiltType::ID => df::DataType::Utf8,
open_dds::types::InbuiltType::Int => df::DataType::Int32,
open_dds::types::InbuiltType::Float => df::DataType::Float32,
open_dds::types::InbuiltType::Boolean => df::DataType::Boolean,
open_dds::types::InbuiltType::String => df::DataType::Utf8,
};
Some(data_type)
}
resolved::QualifiedBaseType::Named(resolved::QualifiedTypeName::Custom(custom_type)) => {
if let Some(type_representation) = ndc_type_representation {
match type_representation {
ndc_models::TypeRepresentation::Boolean => Some(df::DataType::Boolean),
ndc_models::TypeRepresentation::String => Some(df::DataType::Utf8),
ndc_models::TypeRepresentation::Int8 => Some(df::DataType::Int8),
ndc_models::TypeRepresentation::Int16 => Some(df::DataType::Int16),
ndc_models::TypeRepresentation::Int32 => Some(df::DataType::Int32),
ndc_models::TypeRepresentation::Int64 => Some(df::DataType::Int64),
ndc_models::TypeRepresentation::Float32 => Some(df::DataType::Float32),
ndc_models::TypeRepresentation::Float64 => Some(df::DataType::Float64),
// Can't do anything better for BigInteger, so we just use String.
ndc_models::TypeRepresentation::BigInteger => Some(df::DataType::Utf8),
// BigDecimal128 is not supported by arrow.
ndc_models::TypeRepresentation::BigDecimal => Some(df::DataType::Float64),
ndc_models::TypeRepresentation::UUID => Some(df::DataType::Utf8),
ndc_models::TypeRepresentation::Date => Some(df::DataType::Date32),
ndc_models::TypeRepresentation::Timestamp => Some(df::DataType::Timestamp(
datafusion::arrow::datatypes::TimeUnit::Microsecond,
None,
)),
ndc_models::TypeRepresentation::TimestampTZ => Some(df::DataType::Timestamp(
datafusion::arrow::datatypes::TimeUnit::Microsecond,
None,
)),
ndc_models::TypeRepresentation::Enum { .. } => Some(df::DataType::Utf8),
_ => None,
}
} else {
match custom_type.name.to_string().to_lowercase().as_str() {
"bool" => Some(df::DataType::Boolean),
"int8" => Some(df::DataType::Int8),
"int16" => Some(df::DataType::Int16),
"int32" => Some(df::DataType::Int32),
"int64" => Some(df::DataType::Int64),
"float32" => Some(df::DataType::Float32),
"float64" => Some(df::DataType::Float64),
"varchar" => Some(df::DataType::Utf8),
"text" => Some(df::DataType::Utf8),
"timestamp" => Some(df::DataType::Timestamp(
datafusion::arrow::datatypes::TimeUnit::Microsecond,
None,
)),
"timestamptz" => Some(df::DataType::Timestamp(
datafusion::arrow::datatypes::TimeUnit::Microsecond,
None,
)),
// BigDecimal128 is not supported by arrow.
"bigdecimal" => Some(df::DataType::Float64),
_ => None,
}
}
}
resolved::QualifiedBaseType::List(_) => None,
}
}
#[derive(Debug, Clone)]
pub(crate) struct OpenDDTableProvider {
pub(crate) session: Arc<Session>,
pub(crate) http_context: Arc<execute::HttpContext>,
pub(crate) name: ModelName,
pub(crate) data_type: Qualified<CustomTypeName>,
pub(crate) source: Option<Arc<metadata_resolve::ModelSource>>,
pub(crate) schema: df::SchemaRef,
pub(crate) select_permission: Option<Arc<SelectPermission>>,
pub(crate) type_permissions: Option<Arc<TypePermissionsOfRole>>,
}
#[async_trait]
impl df::TableProvider for OpenDDTableProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> df::SchemaRef {
self.schema.clone()
}
fn table_type(&self) -> df::TableType {
df::TableType::Base
}
async fn scan(
&self,
_state: &df::SessionState,
_projection: Option<&Vec<usize>>,
// filters and limit can be used here to inject some push-down operations if needed
_filters: &[df::Expr],
_limit: Option<usize>,
) -> datafusion::error::Result<Arc<dyn df::ExecutionPlan>> {
internal_err!("scan shouldn't be called")
}
}

View File

@ -13,7 +13,6 @@ use tracing_util::{ErrorVisibility, SpanVisibility, Successful, TraceableError};
pub use datafusion::execution::context::SessionContext;
pub(crate) mod analyzer;
pub(crate) mod optimizer;
pub(crate) mod planner;
@ -51,7 +50,7 @@ impl TraceableError for SqlExecutionError {
/// Executes an SQL Request using the Apache DataFusion query engine.
pub async fn execute_sql(
context: &crate::catalog::Context,
catalog: Arc<crate::catalog::Catalog>,
session: Arc<Session>,
http_context: Arc<execute::HttpContext>,
request: &SqlRequest,
@ -63,7 +62,7 @@ pub async fn execute_sql(
"Create a datafusion SessionContext",
SpanVisibility::Internal,
|| {
let session = context.create_session_context(&session, &http_context);
let session = catalog.create_session_context(&session, &http_context);
Successful::new(session)
},
)

View File

@ -1,3 +1,5 @@
mod projection_pushdown;
mod replace_table_scan;
pub(crate) use projection_pushdown::NDCPushDownProjection;
pub(crate) use replace_table_scan::ReplaceTableScan;

View File

@ -0,0 +1,61 @@
use datafusion::{
common::{internal_err, tree_node::Transformed},
datasource::source_as_provider,
error::Result,
logical_expr::{LogicalPlan, TableScan},
optimizer::{optimizer::ApplyOrder, OptimizerConfig, OptimizerRule},
};
use crate::catalog::model;
pub struct ReplaceTableScan {}
impl OptimizerRule for ReplaceTableScan {
fn try_optimize(
&self,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
internal_err!("Should have called ReplaceTableScan::rewrite")
}
fn name(&self) -> &str {
"replace_table_scan"
}
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}
fn supports_rewrite(&self) -> bool {
true
}
fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::TableScan(TableScan {
table_name: _,
ref source,
projection: _,
ref projected_schema,
filters: _,
fetch: _,
}) => {
if let Some(opendd_table) = source_as_provider(source)?
.as_ref()
.as_any()
.downcast_ref::<model::Table>()
{
let plan = opendd_table.to_logical_plan(projected_schema.clone())?;
Ok(Transformed::yes(plan))
} else {
Ok(Transformed::no(plan))
}
}
_ => Ok(Transformed::no(plan)),
}
}
}

View File

@ -1,3 +1,4 @@
use hasura_authn_core::Session;
use std::{collections::BTreeMap, sync::Arc};
use datafusion::{
@ -16,13 +17,14 @@ use crate::plan::NDCPushDown;
use async_trait::async_trait;
pub(crate) struct NDCQueryPlanner {
pub(crate) default_schema: Option<Arc<String>>,
pub(crate) catalog: Arc<crate::catalog::OpenDDCatalogProvider>,
pub(crate) struct OpenDDQueryPlanner {
pub(crate) session: Arc<Session>,
pub(crate) http_context: Arc<execute::HttpContext>,
pub(crate) catalog: Arc<crate::catalog::Catalog>,
}
#[async_trait]
impl QueryPlanner for NDCQueryPlanner {
impl QueryPlanner for OpenDDQueryPlanner {
/// Given a `LogicalPlan` created from above, create an
/// `ExecutionPlan` suitable for execution
async fn create_physical_plan(
@ -33,7 +35,8 @@ impl QueryPlanner for NDCQueryPlanner {
// Teach the default physical planner how to plan TopK nodes.
let physical_planner =
DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(NDCPushDownPlanner {
default_schema: self.default_schema.clone(),
session: self.session.clone(),
http_context: self.http_context.clone(),
catalog: self.catalog.clone(),
})]);
// Delegate most work of physical planning to the default physical planner
@ -44,8 +47,9 @@ impl QueryPlanner for NDCQueryPlanner {
}
pub(crate) struct NDCPushDownPlanner {
pub(crate) default_schema: Option<Arc<String>>,
pub(crate) catalog: Arc<crate::catalog::OpenDDCatalogProvider>,
pub(crate) session: Arc<Session>,
pub(crate) http_context: Arc<execute::HttpContext>,
pub(crate) catalog: Arc<crate::catalog::Catalog>,
}
#[async_trait]
@ -62,44 +66,24 @@ impl ExtensionPlanner for NDCPushDownPlanner {
if let Some(ndc_node) = node.as_any().downcast_ref::<crate::plan::NDCQuery>() {
assert_eq!(logical_inputs.len(), 0, "Inconsistent number of inputs");
assert_eq!(physical_inputs.len(), 0, "Inconsistent number of inputs");
let table = self
let type_permissions = self
.catalog
.get(
self.default_schema.as_ref().map(|s| s.as_str()),
&ndc_node.table,
)
.type_permissions
.get(&self.session.role)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"table provider not found for replace_table_scan: {}",
&ndc_node.table
DataFusionError::Plan(format!(
"role {} does not have permission to select any fields of model {}",
self.session.role, ndc_node.model.name
))
})?;
let model_source = table.source.as_ref().ok_or_else(|| {
DataFusionError::Plan(format!(
"model source should be configured for {}",
table.name
))
})?;
let select_permission = table.select_permission.as_ref().ok_or_else(|| {
DataFusionError::Plan(format!(
"role {} does not have select permission for model {}",
table.session.role, table.name
))
})?;
let type_permissions = table.type_permissions.as_ref().ok_or_else(|| {
DataFusionError::Plan(format!(
"role {} does not have permission to select any fields of model {}",
table.session.role, table.name
))
})?;
let base_type_allowed_fields = &type_permissions
.permissions
.get(&table.data_type)
.get(&ndc_node.model.data_type)
.ok_or_else(|| {
DataFusionError::Plan(format!(
"role {} has permission to select model {} but does not have permission \
to select fields of the model's underlying type {}",
table.session.role, table.name, table.data_type
self.session.role, ndc_node.model.name, ndc_node.model.data_type
))
})?
.output
@ -117,23 +101,23 @@ impl ExtensionPlanner for NDCPushDownPlanner {
Ok(())
} else {
Err(DataFusionError::Plan(format!(
"role {} does not have permission to select the field {} from type {} of model {}",
table.session.role, field_name, table.data_type, table.name
)))
"role {} does not have permission to select the field {} from type {} of model {}",
self.session.role, field_name, ndc_node.model.data_type, ndc_node.model.name
)))
}?;
}
let mut usage_counts = ir::UsagesCounts::default();
let mut relationships = BTreeMap::new();
let permission_filter = match &select_permission.filter {
let permission_filter = match &ndc_node.permission.filter {
FilterPermission::AllowAll => Ok::<_, DataFusionError>(None),
FilterPermission::Filter(filter) => {
let filter_ir = ir::process_model_predicate(
&model_source.data_connector,
&model_source.type_mappings,
&ndc_node.model_source.data_connector,
&ndc_node.model_source.type_mappings,
filter,
&table.session.variables,
&self.session.variables,
&mut usage_counts,
)
.map_err(|e| {
@ -150,8 +134,10 @@ impl ExtensionPlanner for NDCPushDownPlanner {
))
},
)?;
// TODO: this thing has to change, need to be pushed into the
// execution plan. We shouldn't be running this in the planning phase
let filter =
execute::plan::resolve_expression(filter_plan, &table.http_context.clone())
execute::plan::resolve_expression(filter_plan, &self.http_context.clone())
.await
.map_err(|e| {
DataFusionError::Internal(format!(
@ -165,13 +151,14 @@ impl ExtensionPlanner for NDCPushDownPlanner {
// ndc_node.filter = permission_filter;
// ndc_node.collection_relationships = relationships;
let ndc_pushdown = NDCPushDown::new(
table.http_context.clone(),
self.http_context.clone(),
ndc_node.schema.inner().clone(),
ndc_node.data_source_name.as_ref().clone(),
ndc_node.model_source.collection.clone(),
ndc_node.arguments.clone(),
ndc_node.fields.clone(),
permission_filter,
relationships,
Arc::new(model_source.data_connector.clone()),
Arc::new(ndc_node.model_source.data_connector.clone()),
);
Ok(Some(Arc::new(ndc_pushdown)))
} else {

View File

@ -1,4 +1,6 @@
use core::fmt;
use std::{any::Any, collections::BTreeMap, hash::Hash, sync::Arc};
use datafusion::{
arrow::{
array::RecordBatch, datatypes::SchemaRef, error::ArrowError, json::reader as arrow_json,
@ -11,17 +13,21 @@ use datafusion::{
stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionMode,
ExecutionPlan, Partitioning, PlanProperties,
},
sql::TableReference,
};
use futures::TryFutureExt;
use indexmap::IndexMap;
use std::{any::Any, collections::BTreeMap, hash::Hash, sync::Arc};
use metadata_resolve::{self as resolved};
use open_dds::identifier::Identifier;
use open_dds::{
arguments::ArgumentName,
types::{DataConnectorArgumentName, FieldName},
};
use tracing_util::{FutureExt, SpanVisibility, TraceableError};
use execute::{
plan::{
self, Relationship, ResolvedField, ResolvedFilterExpression, ResolvedQueryExecutionPlan,
ResolvedQueryNode,
self, Argument, Relationship, ResolvedField, ResolvedFilterExpression,
ResolvedQueryExecutionPlan, ResolvedQueryNode,
},
HttpContext,
};
@ -54,23 +60,92 @@ impl TraceableError for ExecutionPlanError {
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct NDCQuery {
pub(crate) table: TableReference,
pub(crate) model: Arc<crate::catalog::model::Model>,
pub(crate) model_source: Arc<resolved::ModelSource>,
pub(crate) arguments: BTreeMap<DataConnectorArgumentName, serde_json::Value>,
pub(crate) permission: Arc<resolved::SelectPermission>,
pub(crate) fields: IndexMap<NdcFieldAlias, DataConnectorColumnName>,
pub(crate) data_source_name: Arc<CollectionName>,
pub(crate) schema: DFSchemaRef,
}
// TODO: fix this
impl Hash for NDCQuery {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.data_source_name.hash(state);
self.model_source.data_connector.name.hash(state);
self.model_source.collection.hash(state);
format!("{:#?}", self.fields).hash(state);
self.schema.hash(state);
}
}
impl Eq for NDCQuery {}
impl NDCQuery {
pub(crate) fn new(
model: Arc<crate::catalog::model::Model>,
model_source: Arc<resolved::ModelSource>,
// TODO: wip: arguments have to be converted to ndc arguments using argument mapping
_arguments: &BTreeMap<ArgumentName, serde_json::Value>,
permission: Arc<resolved::SelectPermission>,
projected_schema: DFSchemaRef,
) -> datafusion::error::Result<Self> {
let mut ndc_fields = IndexMap::new();
let base_type_fields = {
let base_type_mapping = model_source
.type_mappings
.get(&model.data_type)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"couldn't fetch type_mapping of type {} for model {}",
model.data_type, model.name
))
})?;
match base_type_mapping {
resolved::TypeMapping::Object {
ndc_object_type_name: _,
field_mappings,
} => field_mappings,
}
};
for field in projected_schema.fields() {
let field_name = {
let field_name = Identifier::new(field.name().clone()).map_err(|e| {
DataFusionError::Internal(format!(
"field name conversion failed {}: {}",
field.name(),
e
))
})?;
FieldName::new(field_name)
};
let ndc_field = {
base_type_fields
.get(&field_name)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"couldn't fetch field mapping of field {} in type {} for model {}",
field_name, model.data_type, model.name
))
})
.map(|field_mapping| field_mapping.column.clone())
}?;
ndc_fields.insert(
NdcFieldAlias::from(field.name().as_str()),
DataConnectorColumnName::from(ndc_field.as_str()),
);
}
let ndc_query_node = NDCQuery {
model,
model_source,
// TODO, use source mapping to construct this
arguments: BTreeMap::new(),
fields: ndc_fields,
schema: projected_schema,
permission,
};
Ok(ndc_query_node)
}
pub(crate) fn project(
mut self,
schema: DFSchemaRef,
@ -115,10 +190,20 @@ impl UserDefinedLogicalNodeCore for NDCQuery {
/// For example: `TopK: k=10`
fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
let projection = format!(
", projection=[{}]",
self.fields
.keys()
.map(std::string::ToString::to_string)
.collect::<Vec<_>>()
.join(",")
);
write!(
f,
"NDCQuery: data_source_name={}, fields={:#?}",
self.data_source_name, self.fields
"NDCQuery: collection={}:{}:{}{projection}",
self.model_source.data_connector.name.subgraph,
self.model_source.data_connector.name.name,
self.model_source.collection,
)
}
@ -135,6 +220,7 @@ impl UserDefinedLogicalNodeCore for NDCQuery {
pub(crate) struct NDCPushDown {
http_context: Arc<execute::HttpContext>,
collection_name: CollectionName,
arguments: BTreeMap<DataConnectorArgumentName, serde_json::Value>,
fields: IndexMap<NdcFieldAlias, DataConnectorColumnName>,
filter: Option<ResolvedFilterExpression>,
collection_relationships: BTreeMap<NdcRelationshipName, Relationship>,
@ -148,6 +234,7 @@ impl NDCPushDown {
http_context: Arc<HttpContext>,
schema: SchemaRef,
collection_name: CollectionName,
arguments: BTreeMap<DataConnectorArgumentName, serde_json::Value>,
fields: IndexMap<NdcFieldAlias, DataConnectorColumnName>,
filter: Option<ResolvedFilterExpression>,
collection_relationships: BTreeMap<NdcRelationshipName, Relationship>,
@ -157,6 +244,7 @@ impl NDCPushDown {
Self {
http_context,
collection_name,
arguments,
fields,
filter,
collection_relationships,
@ -243,7 +331,18 @@ impl ExecutionPlan for NDCPushDown {
predicate: self.filter.clone(),
},
collection: self.collection_name.clone(),
arguments: BTreeMap::new(),
arguments: self
.arguments
.iter()
.map(|(argument, value)| {
(
argument.clone(),
Argument::Literal {
value: value.clone(),
},
)
})
.collect(),
collection_relationships: self.collection_relationships.clone(),
variables: None,
data_connector: &self.data_connector,