Experimental SQL interface (#742)

Adds a very experimental SQL interface to v3-engine for GenAI use cases.

---------

Co-authored-by: Abhinav Gupta <127770473+abhinav-hasura@users.noreply.github.com>
Co-authored-by: Gil Mizrahi <gil@gilmi.net>
Co-authored-by: Anon Ray <ecthiender@users.noreply.github.com>
V3_GIT_ORIGIN_REV_ID: 077779ec4e7843abdffdac1ed6aa655210649b93
This commit is contained in:
Vamshi Surabhi 2024-06-26 00:15:49 +05:30 committed by hasura-bot
parent 91ec7bafd1
commit 8af78227a0
30 changed files with 3257 additions and 85 deletions

1301
v3/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -16,6 +16,7 @@ members = [
"crates/open-dds",
"crates/query-usage-analytics",
"crates/schema",
"crates/sql",
"crates/utils/*",
]

View File

@ -27,13 +27,13 @@ lang-graphql = { path = "../lang-graphql" }
open-dds = { path = "../open-dds" }
opendds-derive = { path = "../utils/opendds-derive" }
schema = { path = "../schema" }
sql = { path = "../sql" }
tracing-util = { path = "../utils/tracing-util" }
metadata-resolve = {path = "../metadata-resolve" }
anyhow = { workspace = true }
axum = { workspace = true }
base64 = { workspace = true }
bincode = { workspace = true }
clap = { workspace = true, features = ["derive", "env"] }
json_value_merge = { workspace = true }
reqwest = { workspace = true, features = ["json", "multipart"] }

View File

@ -13,6 +13,7 @@ use axum::{
Extension, Json, Router,
};
use clap::Parser;
use reqwest::header::CONTENT_TYPE;
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
use tracing_util::{
@ -61,6 +62,9 @@ struct ServerOptions {
/// The port on which the server listens.
#[arg(long, value_name = "PORT", env = "PORT", default_value_t = DEFAULT_PORT)]
port: u16,
/// Enables the '/v1/sql' endpoint
#[arg(long, env = "ENABLE_SQL_INTERFACE")]
enable_sql_interface: bool,
/// Enable CORS. Support preflight request and include related headers in responses.
#[arg(long, env = "ENABLE_CORS")]
enable_cors: bool,
@ -88,6 +92,7 @@ struct EngineState {
http_context: HttpContext,
schema: gql::schema::Schema<GDS>,
auth_config: AuthConfig,
sql_context: sql::catalog::Context,
}
#[tokio::main]
@ -156,7 +161,7 @@ async fn shutdown_signal() {
enum StartupError {
#[error("could not read the auth config - {0}")]
ReadAuth(anyhow::Error),
#[error("could not read the schema - {0}")]
#[error("failed to build engine state - {0}")]
ReadSchema(anyhow::Error),
}
@ -174,6 +179,8 @@ struct EngineRouter {
/// The metadata routes for the introspection metadata file.
/// Contains /metadata and /metadata-hash routes.
metadata_routes: Option<Router>,
/// Routes for the SQL interface
sql_routes: Option<Router>,
/// The CORS layer for the engine.
cors_layer: Option<CorsLayer>,
}
@ -233,6 +240,7 @@ impl EngineRouter {
Self {
base_router: base_routes,
metadata_routes: None,
sql_routes: None,
cors_layer: None,
}
}
@ -257,6 +265,25 @@ impl EngineRouter {
Ok(())
}
fn add_sql_route(&mut self, state: Arc<EngineState>) {
let sql_routes = Router::new()
.route("/v1/sql", post(handle_sql_request))
.layer(axum::middleware::from_fn(
hasura_authn_core::resolve_session,
))
.layer(axum::middleware::from_fn_with_state(
state.clone(),
authentication_middleware,
))
.layer(axum::middleware::from_fn(sql_request_tracing_middleware))
// *PLEASE DO NOT ADD ANY MIDDLEWARE
// BEFORE THE `explain_request_tracing_middleware`*
// Refer to it for more details.
.layer(TraceLayer::new_for_http())
.with_state(state);
self.sql_routes = Some(sql_routes);
}
fn add_cors_layer(&mut self, allow_origin: &[String]) {
self.cors_layer = Some(cors::build_cors_layer(allow_origin));
}
@ -264,6 +291,10 @@ impl EngineRouter {
fn into_make_service(self) -> axum::routing::IntoMakeService<Router> {
let mut app = self.base_router;
// Merge the metadata routes if they exist.
if let Some(sql_routes) = self.sql_routes {
app = app.merge(sql_routes);
}
// Merge the metadata routes if they exist.
if let Some(metadata_routes) = self.metadata_routes {
app = app.merge(metadata_routes);
}
@ -279,25 +310,20 @@ impl EngineRouter {
#[allow(clippy::print_stdout)]
async fn start_engine(server: &ServerOptions) -> Result<(), StartupError> {
let auth_config =
read_auth_config(&server.authn_config_path).map_err(StartupError::ReadAuth)?;
let metadata_resolve_flags = resolve_unstable_features(&server.unstable_features);
let schema = read_schema(&server.metadata_path, &metadata_resolve_flags)
let state = build_state(
&server.authn_config_path,
&server.metadata_path,
&metadata_resolve_flags,
)
.map_err(StartupError::ReadSchema)?;
let http_context = HttpContext {
client: reqwest::Client::new(),
ndc_response_size_limit: None,
};
let state = Arc::new(EngineState {
http_context,
schema,
auth_config,
});
let mut engine_router = EngineRouter::new(state.clone());
let mut engine_router = EngineRouter::new(state);
if server.enable_sql_interface {
engine_router.add_sql_route(state.clone());
}
// If `--introspection-metadata` is specified we also serve the file indicated on `/metadata`
// and its hash on `/metadata-hash`.
@ -390,6 +416,33 @@ async fn explain_request_tracing_middleware<B: Send>(
.response
}
/// Middleware to start tracing of the `/v1/sql` request.
/// This middleware must be active for the entire duration
/// of the request i.e. this middleware should be the
/// entry point and the exit point of the SQL request.
async fn sql_request_tracing_middleware<B: Send>(
request: Request<B>,
next: Next<B>,
) -> axum::response::Response {
let tracer = tracing_util::global_tracer();
let path = "/v1/sql";
tracer
.in_span_async_with_parent_context(
path,
path,
SpanVisibility::User,
&request.headers().clone(),
|| {
Box::pin(async move {
let response = next.run(request).await;
TraceableHttpResponse::new(response, path)
})
},
)
.await
.response
}
#[derive(Debug, thiserror::Error)]
enum AuthError {
#[error("JWT auth error: {0}")]
@ -540,16 +593,78 @@ async fn handle_explain_request(
response
}
fn read_schema(
/// Handle a SQL request and execute it.
async fn handle_sql_request(
State(state): State<Arc<EngineState>>,
Extension(session): Extension<Session>,
Json(request): Json<sql::execute::SqlRequest>,
) -> axum::response::Response {
let tracer = tracing_util::global_tracer();
let response = tracer
.in_span_async(
"handle_sql_request",
"Handle SQL Request",
SpanVisibility::User,
|| {
Box::pin(async {
sql::execute::execute_sql(
&state.sql_context,
Arc::new(session),
Arc::new(state.http_context.clone()),
&request,
)
.await
})
},
)
.await;
// Set the span as error if the response contains an error
set_status_on_current_span(&response);
match response {
Ok(r) => {
let mut response = (axum::http::StatusCode::OK, r).into_response();
response.headers_mut().insert(
CONTENT_TYPE,
axum::http::HeaderValue::from_static("application/json"),
);
response
}
Err(e) => (
axum::http::StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
/// Build the engine state - include auth, metadata, and sql context.
fn build_state(
authn_config_path: &PathBuf,
metadata_path: &PathBuf,
metadata_resolve_flags: &metadata_resolve::MetadataResolveFlagsInternal,
) -> Result<gql::schema::Schema<GDS>, anyhow::Error> {
) -> Result<Arc<EngineState>, anyhow::Error> {
let auth_config = read_auth_config(authn_config_path).map_err(StartupError::ReadAuth)?;
let raw_metadata = std::fs::read_to_string(metadata_path)?;
let metadata = open_dds::Metadata::from_json_str(&raw_metadata)?;
Ok(engine::build::build_schema(
metadata,
metadata_resolve_flags,
)?)
let resolved_metadata = metadata_resolve::resolve(metadata, metadata_resolve_flags)?;
let http_context = HttpContext {
client: reqwest::Client::new(),
ndc_response_size_limit: None,
};
let sql_context = sql::catalog::Context::from_metadata(&resolved_metadata);
let schema = schema::GDS {
metadata: resolved_metadata,
}
.build_schema()?;
let state = Arc::new(EngineState {
http_context,
schema,
auth_config,
sql_context,
});
Ok(state)
}
fn read_auth_config(path: &PathBuf) -> Result<AuthConfig, anyhow::Error> {

View File

@ -10,14 +10,15 @@ pub enum BuildError {
InvalidMetadata(#[from] metadata_resolve::Error),
#[error("unable to build schema: {0}")]
UnableToBuildSchema(#[from] schema::Error),
#[error("unable to encode schema: {0}")]
EncodingError(#[from] bincode::Error),
}
pub fn build_schema(
metadata: open_dds::Metadata,
metadata_resolve_flags: &metadata_resolve::MetadataResolveFlagsInternal,
) -> Result<gql_schema::Schema<GDS>, BuildError> {
let gds = schema::GDS::new(metadata, metadata_resolve_flags)?;
let resolved_metadata = metadata_resolve::resolve(metadata, metadata_resolve_flags)?;
let gds = schema::GDS {
metadata: resolved_metadata,
};
Ok(gds.build_schema()?)
}

View File

@ -75,7 +75,7 @@ pub(crate) fn get_argument_presets(
}
}
pub(crate) fn process_model_predicate<'s>(
pub fn process_model_predicate<'s>(
model_predicate: &'s metadata_resolve::ModelPredicate,
session_variables: &SessionVariables,
relationships: &mut BTreeMap<NDCRelationshipName, LocalModelRelationshipInfo<'s>>,

View File

@ -31,7 +31,7 @@ use schema::{Annotation, BooleanExpressionAnnotation, InputAnnotation, ModelInpu
use schema::{CommandRelationshipAnnotation, CommandTargetSource};
#[derive(Debug, Serialize)]
pub(crate) struct LocalModelRelationshipInfo<'s> {
pub struct LocalModelRelationshipInfo<'s> {
pub relationship_name: &'s RelationshipName,
pub relationship_type: &'s RelationshipType,
pub source_type: &'s Qualified<CustomTypeName>,

View File

@ -1,13 +1,14 @@
mod error;
mod explain;
mod global_id;
mod ir;
mod model_tracking;
mod ndc;
pub mod ir;
pub mod model_tracking;
pub mod ndc;
mod plan;
mod process_response;
mod remote_joins;
pub use plan::process_model_relationship_definition;
use plan::ExecuteQueryResult;
use thiserror::Error;
@ -28,9 +29,11 @@ use tracing_util::{
// we explicitly export things used by other crates
pub use explain::execute_explain;
pub use explain::types::{redact_ndc_explain, ExplainResponse};
pub use ndc::fetch_from_data_connector;
pub use plan::{execute_mutation_plan, execute_query_plan, generate_request_plan, RequestPlan};
/// Context for making HTTP requests
#[derive(Debug, Clone)]
pub struct HttpContext {
/// The HTTP client to use for making requests
pub client: reqwest::Client,

View File

@ -56,7 +56,7 @@ pub async fn execute_ndc_query<'n, 's>(
.await
}
pub(crate) async fn fetch_from_data_connector<'s>(
pub async fn fetch_from_data_connector<'s>(
http_context: &HttpContext,
query_request: &ndc_models::QueryRequest,
data_connector: &metadata_resolve::DataConnectorLink,

View File

@ -4,6 +4,8 @@ mod model_selection;
mod relationships;
pub(crate) mod selection_set;
pub use relationships::process_model_relationship_definition;
use gql::normalized_ast;
use gql::schema::NamespacedGetter;
use hasura_authn_core::Role;

View File

@ -73,7 +73,7 @@ pub(crate) fn collect_relationships(
Ok(())
}
pub(crate) fn process_model_relationship_definition(
pub fn process_model_relationship_definition(
relationship_info: &LocalModelRelationshipInfo,
) -> Result<ndc_models::Relationship, error::Error> {
let &LocalModelRelationshipInfo {

View File

@ -57,6 +57,7 @@
"type": "named",
"name": "int4"
},
"column_type_representation": null,
"argument_mappings": {}
},
"author_id": {
@ -65,6 +66,7 @@
"type": "named",
"name": "int4"
},
"column_type_representation": null,
"argument_mappings": {}
},
"title": {
@ -73,6 +75,7 @@
"type": "named",
"name": "varchar"
},
"column_type_representation": null,
"argument_mappings": {}
}
}
@ -177,6 +180,7 @@
"type": "named",
"name": "int8"
},
"column_type_representation": null,
"argument_mappings": {}
},
"first_name": {
@ -185,6 +189,7 @@
"type": "named",
"name": "varchar"
},
"column_type_representation": null,
"argument_mappings": {}
},
"last_name": {
@ -193,6 +198,7 @@
"type": "named",
"name": "varchar"
},
"column_type_representation": null,
"argument_mappings": {}
}
}
@ -297,6 +303,7 @@
"type": "named",
"name": "int4"
},
"column_type_representation": null,
"argument_mappings": {}
},
"author_id": {
@ -305,6 +312,7 @@
"type": "named",
"name": "int4"
},
"column_type_representation": null,
"argument_mappings": {}
},
"title": {
@ -313,6 +321,7 @@
"type": "named",
"name": "varchar"
},
"column_type_representation": null,
"argument_mappings": {}
}
}
@ -656,6 +665,7 @@
"type": "named",
"name": "int8"
},
"column_type_representation": null,
"argument_mappings": {}
},
"first_name": {
@ -664,6 +674,7 @@
"type": "named",
"name": "varchar"
},
"column_type_representation": null,
"argument_mappings": {}
},
"last_name": {
@ -672,6 +683,7 @@
"type": "named",
"name": "varchar"
},
"column_type_representation": null,
"argument_mappings": {}
}
}
@ -711,6 +723,7 @@
"type": "named",
"name": "int4"
},
"column_type_representation": null,
"argument_mappings": {}
},
"author_id": {
@ -719,6 +732,7 @@
"type": "named",
"name": "int4"
},
"column_type_representation": null,
"argument_mappings": {}
},
"title": {
@ -727,6 +741,7 @@
"type": "named",
"name": "varchar"
},
"column_type_representation": null,
"argument_mappings": {}
}
}
@ -803,6 +818,7 @@
"type": "named",
"name": "int4"
},
"column_type_representation": null,
"argument_mappings": {}
},
"author_id": {
@ -811,6 +827,7 @@
"type": "named",
"name": "int4"
},
"column_type_representation": null,
"argument_mappings": {}
},
"title": {
@ -819,6 +836,7 @@
"type": "named",
"name": "varchar"
},
"column_type_representation": null,
"argument_mappings": {}
}
}
@ -858,6 +876,7 @@
"type": "named",
"name": "int8"
},
"column_type_representation": null,
"argument_mappings": {}
},
"first_name": {
@ -866,6 +885,7 @@
"type": "named",
"name": "varchar"
},
"column_type_representation": null,
"argument_mappings": {}
},
"last_name": {
@ -874,6 +894,7 @@
"type": "named",
"name": "varchar"
},
"column_type_representation": null,
"argument_mappings": {}
}
}
@ -950,6 +971,7 @@
"type": "named",
"name": "int4"
},
"column_type_representation": null,
"argument_mappings": {}
},
"author_id": {
@ -958,6 +980,7 @@
"type": "named",
"name": "int4"
},
"column_type_representation": null,
"argument_mappings": {}
},
"title": {
@ -966,6 +989,7 @@
"type": "named",
"name": "varchar"
},
"column_type_representation": null,
"argument_mappings": {}
}
}
@ -1005,6 +1029,7 @@
"type": "named",
"name": "int4"
},
"column_type_representation": null,
"argument_mappings": {}
},
"author_id": {
@ -1013,6 +1038,7 @@
"type": "named",
"name": "int4"
},
"column_type_representation": null,
"argument_mappings": {}
},
"title": {
@ -1021,6 +1047,7 @@
"type": "named",
"name": "varchar"
},
"column_type_representation": null,
"argument_mappings": {}
}
}

View File

@ -31,7 +31,7 @@ pub use stages::commands::Command;
pub use stages::data_connectors;
pub use stages::data_connectors::DataConnectorLink;
pub use stages::model_permissions::{
FilterPermission, ModelPredicate, ModelTargetSource, ModelWithPermissions,
FilterPermission, ModelPredicate, ModelTargetSource, ModelWithPermissions, SelectPermission,
};
pub use stages::models::{ConnectorArgumentName, Model, ModelSource};

View File

@ -9,6 +9,7 @@ pub use types::{
ResolvedApolloFederationObjectKey, ResolvedObjectApolloFederationConfig, TypeMapping,
};
use crate::helpers::ndc_validation::get_underlying_named_type;
use crate::helpers::types::{mk_name, store_new_graphql_type};
use crate::stages::data_connectors;
@ -328,9 +329,17 @@ pub fn resolve_data_connector_type_mapping(
)
};
let source_column = get_column(ndc_object_type, field_name, resolved_field_mapping_column)?;
let underlying_column_type = get_underlying_named_type(&source_column.r#type);
let column_type_representation = data_connector_context
.inner
.schema
.scalar_types
.get(underlying_column_type)
.and_then(|scalar_type| scalar_type.representation.clone());
let resolved_field_mapping = FieldMapping {
column: resolved_field_mapping_column.clone(),
column_type: source_column.r#type.clone(),
column_type_representation,
argument_mappings: resolved_argument_mappings.0,
};

View File

@ -129,6 +129,7 @@ pub struct ResolvedApolloFederationObjectKey {
pub struct FieldMapping {
pub column: DataConnectorColumnName,
pub column_type: ndc_models::Type,
pub column_type_representation: Option<ndc_models::TypeRepresentation>,
pub argument_mappings: BTreeMap<ArgumentName, DataConnectorArgumentName>,
}

View File

@ -989,7 +989,7 @@ pub enum TypeMappingValidationError {
unknown_ndc_field_type_name: String,
},
#[error("ndc validation error: {0}")]
NDCValidationError(NDCValidationError),
NDCValidationError(#[from] NDCValidationError),
}
impl From<AggregateExpressionError> for Error {

24
v3/crates/sql/Cargo.toml Normal file
View File

@ -0,0 +1,24 @@
[package]
name = "sql"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
metadata-resolve = {path = "../metadata-resolve" }
open-dds = { path = "../open-dds" }
schema = { path = "../schema" }
execute = { path = "../execute" }
tracing-util = { path = "../utils/tracing-util" }
hasura-authn-core = { path = "../auth/hasura-authn-core" }
ndc-models = { workspace = true }
indexmap = { workspace = true }
datafusion = { version = "39.0.0", features = ["serde"] }
async-trait = "0.1.80"
futures = "0.3.30"
serde = { workspace = true, features = ["rc"] }
thiserror = { workspace = true }
[lints]
workspace = true

12
v3/crates/sql/readme.md Normal file
View File

@ -0,0 +1,12 @@
# SQL Interface
An experimental SQL interface over OpenDD models. This is mostly targeted at AI
use cases for now - GenAI models are better at generating SQL queries than
GraphQL queries.
This is implemented using the Apache DataFusion Query Engine by deriving the SQL
metadata for datafusion from Open DDS metadata. As the implementation currently
stands, once we get a `LogicalPlan` from datafusion we replace `TableScan`s with
NDC queries to the underlying connector. There is a rudimentary optimizer that
pushes down projections to the ndc query so that we don't fetch all the columns
of a collection.

View File

@ -0,0 +1,259 @@
use std::{any::Any, collections::HashMap, sync::Arc};
use ::datafusion::{
execution::{context::SessionState, runtime_env::RuntimeEnv},
sql::TableReference,
};
use async_trait::async_trait;
use hasura_authn_core::Session;
use indexmap::IndexMap;
use metadata_resolve::{self as resolved};
use open_dds::permissions::Role;
use schema::OpenDDSchemaProvider;
use serde::{Deserialize, Serialize};
mod datafusion {
pub(super) use datafusion::{
catalog::{schema::SchemaProvider, CatalogProvider},
datasource::TableProvider,
error::Result,
prelude::{SessionConfig, SessionContext},
scalar::ScalarValue,
};
}
pub mod introspection;
pub mod schema;
pub mod table;
/// The context in which to compile and execute SQL queries.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct Context {
pub(crate) subgraphs: IndexMap<String, schema::Subgraph>,
pub(crate) type_permissions: HashMap<Role, Arc<table::TypePermissionsOfRole>>,
pub(crate) introspection: introspection::Introspection,
}
impl Context {
/// Derive a SQL Context from resolved Open DDS metadata.
pub fn from_metadata(metadata: &resolved::Metadata) -> Self {
let mut subgraphs = IndexMap::new();
for (model_name, model) in &metadata.models {
let schema_name = &model_name.subgraph;
let table_name = &model_name.name;
let subgraph =
subgraphs
.entry(schema_name.clone())
.or_insert_with(|| schema::Subgraph {
models: IndexMap::new(),
});
subgraph.models.insert(
table_name.to_string(),
table::Model::from_resolved_model(model),
);
}
let mut type_permissions = HashMap::new();
for (type_name, object_type) in &metadata.object_types {
for (role, output_permission) in &object_type.type_output_permissions {
let output_permission = table::TypePermission {
output: output_permission.clone(),
};
let role_permissions =
type_permissions
.entry(role)
.or_insert_with(|| table::TypePermissionsOfRole {
permissions: HashMap::new(),
});
role_permissions
.permissions
.insert(type_name.clone(), output_permission);
}
}
let introspection = introspection::Introspection::from_metadata(metadata, &subgraphs);
Context {
subgraphs,
type_permissions: type_permissions
.into_iter()
.map(|(role, role_permissions)| (role.clone(), Arc::new(role_permissions)))
.collect(),
introspection,
}
}
}
pub struct OpenDDCatalogProvider {
schemas: IndexMap<String, Arc<HasuraSchemaProvider>>,
}
impl OpenDDCatalogProvider {
fn new(
session: &Arc<Session>,
http_context: &Arc<execute::HttpContext>,
context: &Context,
) -> Self {
let type_permissions = context.type_permissions.get(&session.role).cloned();
let mut schemas = IndexMap::new();
for (subgraph_name, subgraph) in &context.subgraphs {
let mut tables = IndexMap::new();
for model in subgraph.models.values() {
let select_permission = model.permissions.get(&session.role).cloned();
let provider = table::OpenDDTableProvider {
session: session.clone(),
http_context: http_context.clone(),
name: model.name.clone(),
data_type: model.data_type.clone(),
source: model.source.clone(),
schema: model.schema.clone(),
select_permission,
type_permissions: type_permissions.clone(),
};
tables.insert(model.name.to_string(), Arc::new(provider));
}
let provider = HasuraSchemaProvider::OpenDD(schema::OpenDDSchemaProvider { tables });
schemas.insert(subgraph_name.clone(), Arc::new(provider));
}
schemas.insert(
introspection::HASURA_METADATA_SCHEMA.to_string(),
Arc::new(HasuraSchemaProvider::Introspection(
introspection::IntrospectionSchemaProvider::new(&context.introspection),
)),
);
OpenDDCatalogProvider { schemas }
}
pub(crate) fn get(
&self,
default_schema: Option<&str>,
table: &TableReference,
) -> Option<&table::OpenDDTableProvider> {
let schema = table.schema().or(default_schema);
let table = table.table();
if let Some(schema) = schema {
if let HasuraSchemaProvider::OpenDD(schema) = self.schemas.get(schema)?.as_ref() {
schema.tables.get(table).map(std::convert::AsRef::as_ref)
} else {
None
}
} else {
None
}
}
}
enum HasuraSchemaProvider {
OpenDD(OpenDDSchemaProvider),
Introspection(introspection::IntrospectionSchemaProvider),
}
#[async_trait]
impl datafusion::SchemaProvider for HasuraSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Vec<String> {
match self {
HasuraSchemaProvider::OpenDD(schema) => schema.table_names(),
HasuraSchemaProvider::Introspection(schema) => schema.table_names(),
}
}
async fn table(
&self,
name: &str,
) -> datafusion::Result<Option<Arc<dyn datafusion::TableProvider>>> {
match self {
HasuraSchemaProvider::OpenDD(schema) => schema.table(name).await,
HasuraSchemaProvider::Introspection(schema) => schema.table(name).await,
}
}
fn table_exist(&self, name: &str) -> bool {
match self {
HasuraSchemaProvider::OpenDD(schema) => schema.table_exist(name),
HasuraSchemaProvider::Introspection(schema) => schema.table_exist(name),
}
}
}
impl datafusion::CatalogProvider for OpenDDCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema_names(&self) -> Vec<String> {
self.schemas.keys().cloned().collect()
}
fn schema(&self, name: &str) -> Option<Arc<dyn datafusion::SchemaProvider>> {
self.schemas
.get(name)
.cloned()
.map(|schema| schema as Arc<dyn datafusion::SchemaProvider>)
}
}
impl Context {
pub fn create_session_context(
&self,
session: &Arc<Session>,
http_context: &Arc<execute::HttpContext>,
) -> datafusion::SessionContext {
let default_schema_name = if self.subgraphs.len() == 1 {
self.subgraphs.get_index(0).map(|v| v.0)
} else {
None
};
let session_config = datafusion::SessionConfig::new()
.set(
"datafusion.catalog.default_catalog",
datafusion::ScalarValue::Utf8(Some("default".to_string())),
)
.set(
"datafusion.catalog.information_schema",
datafusion::ScalarValue::Boolean(Some(true)),
)
.set(
"datafusion.execution.target_partitions",
datafusion::ScalarValue::Int32(Some(1)),
)
.set(
"datafusion.execution.planning_concurrency",
datafusion::ScalarValue::Int32(Some(1)),
)
.set(
"datafusion.sql_parser.enable_ident_normalization",
datafusion::ScalarValue::Boolean(Some(false)),
);
let session_config = if let Some(default_schema_name) = default_schema_name {
session_config.set(
"datafusion.catalog.default_schema",
datafusion::ScalarValue::Utf8(Some(default_schema_name.clone())),
)
} else {
session_config
};
let catalog = Arc::new(OpenDDCatalogProvider::new(session, http_context, self));
let query_planner = Arc::new(super::execute::planner::NDCQueryPlanner {
default_schema: default_schema_name.map(|s| Arc::new(s.clone())),
catalog: catalog.clone(),
});
let session_state =
SessionState::new_with_config_rt(session_config, Arc::new(RuntimeEnv::default()))
.with_analyzer_rules(vec![Arc::new(
super::execute::analyzer::ReplaceTableScan::new(
default_schema_name.map(|s| Arc::new(s.clone())),
catalog.clone(),
),
)])
.with_query_planner(query_planner)
.add_optimizer_rule(Arc::new(
super::execute::optimizer::NDCPushDownProjection {},
));
let session_context = datafusion::SessionContext::new_with_state(session_state);
session_context
.register_catalog("default", catalog as Arc<dyn datafusion::CatalogProvider>);
session_context
}
}

View File

@ -0,0 +1,354 @@
//! Describe and populate the introspection tables used by data fusion.
use std::{any::Any, sync::Arc};
use async_trait::async_trait;
use indexmap::IndexMap;
use metadata_resolve::{self as resolved, ModelRelationshipTarget};
mod df {
pub(super) use datafusion::{
arrow::{
array::RecordBatch,
datatypes::{DataType, Field, Schema, SchemaRef},
},
catalog::schema::SchemaProvider,
common::ScalarValue,
datasource::{TableProvider, TableType},
error::Result,
execution::context::SessionState,
logical_expr::Expr,
physical_plan::{values::ValuesExec, ExecutionPlan},
};
}
use open_dds::relationships::RelationshipType;
use serde::{Deserialize, Serialize};
pub const HASURA_METADATA_SCHEMA: &str = "hasura";
pub const TABLE_METADATA: &str = "table_metadata";
pub const COLUMN_METADATA: &str = "column_metadata";
pub const INFERRED_FOREIGN_KEY_CONSTRAINTS: &str = "inferred_foreign_key_constraints";
/// Describes the database schema structure and metadata.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct Introspection {
pub(crate) table_metadata: TableMetadata,
pub(crate) column_metadata: ColumnMetadata,
pub(crate) inferred_foreign_key_constraints: InferredForeignKeys,
}
impl Introspection {
/// Derive SQL schema from the Open DDS metadata.
pub fn from_metadata(
metadata: &resolved::Metadata,
schemas: &IndexMap<String, crate::catalog::schema::Subgraph>,
) -> Self {
let mut table_metadata_rows = Vec::new();
let mut column_metadata_rows = Vec::new();
let mut foreign_key_constraint_rows = Vec::new();
for (schema_name, schema) in schemas {
for (table_name, table) in &schema.models {
table_metadata_rows.push(TableRow::new(
schema_name.clone(),
table_name.to_string(),
table.description.clone(),
));
for (column_name, column_description) in &table.columns {
column_metadata_rows.push(ColumnRow {
schema_name: schema_name.clone(),
table_name: table_name.clone(),
column_name: column_name.clone(),
description: column_description.clone(),
});
}
// TODO:
// 1. Need to check if the target_model is part of subgraphs
// 2. Need to also check for array relationships in case the corresponding
// object relationship isn't present
if let Some(object_type) = metadata.object_types.get(&table.data_type) {
for relationship in object_type.relationship_fields.values() {
if let metadata_resolve::RelationshipTarget::Model(
ModelRelationshipTarget {
model_name,
relationship_type: RelationshipType::Object,
target_typename: _,
mappings,
},
) = &relationship.target
{
for mapping in mappings {
foreign_key_constraint_rows.push(ForeignKeyRow {
from_schema_name: schema_name.clone(),
from_table_name: table_name.clone(),
from_column_name: mapping.source_field.field_name.to_string(),
to_schema_name: model_name.subgraph.clone(),
to_table_name: model_name.name.to_string(),
to_column_name: mapping.target_field.field_name.to_string(),
});
}
}
}
}
}
}
Introspection {
table_metadata: TableMetadata::new(table_metadata_rows),
column_metadata: ColumnMetadata::new(column_metadata_rows),
inferred_foreign_key_constraints: InferredForeignKeys::new(foreign_key_constraint_rows),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct TableMetadata {
schema: df::SchemaRef,
rows: Vec<TableRow>,
}
impl TableMetadata {
pub(crate) fn new(rows: Vec<TableRow>) -> Self {
let schema_name = df::Field::new("schema_name", df::DataType::Utf8, false);
let table_name = df::Field::new("table_name", df::DataType::Utf8, false);
let description = df::Field::new("description", df::DataType::Utf8, true);
let schema =
df::SchemaRef::new(df::Schema::new(vec![schema_name, table_name, description]));
TableMetadata { schema, rows }
}
}
impl TableMetadata {
fn to_values_table(&self) -> ValuesTable {
ValuesTable {
schema: self.schema.clone(),
rows: self
.rows
.iter()
.map(|row| {
vec![
df::ScalarValue::Utf8(Some(row.schema_name.clone())),
df::ScalarValue::Utf8(Some(row.table_name.clone())),
df::ScalarValue::Utf8(row.description.clone()),
]
})
.collect(),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct TableRow {
schema_name: String,
table_name: String,
description: Option<String>,
}
impl TableRow {
pub(crate) fn new(
schema_name: String,
table_name: String,
description: Option<String>,
) -> Self {
Self {
schema_name,
table_name,
description,
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct ColumnMetadata {
pub(crate) schema: df::SchemaRef,
pub(crate) rows: Vec<ColumnRow>,
}
impl ColumnMetadata {
fn new(rows: Vec<ColumnRow>) -> Self {
let schema_name = df::Field::new("schema_name", df::DataType::Utf8, false);
let table_name = df::Field::new("table_name", df::DataType::Utf8, false);
let column_name = df::Field::new("column_name", df::DataType::Utf8, false);
let description = df::Field::new("description", df::DataType::Utf8, true);
let schema = df::SchemaRef::new(df::Schema::new(vec![
schema_name,
table_name,
column_name,
description,
]));
ColumnMetadata { schema, rows }
}
fn to_values_table(&self) -> ValuesTable {
ValuesTable {
schema: self.schema.clone(),
rows: self
.rows
.iter()
.map(|row| {
vec![
df::ScalarValue::Utf8(Some(row.schema_name.clone())),
df::ScalarValue::Utf8(Some(row.table_name.clone())),
df::ScalarValue::Utf8(Some(row.column_name.clone())),
df::ScalarValue::Utf8(row.description.clone()),
]
})
.collect(),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct ColumnRow {
schema_name: String,
table_name: String,
column_name: String,
description: Option<String>,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct InferredForeignKeys {
schema: df::SchemaRef,
rows: Vec<ForeignKeyRow>,
}
impl InferredForeignKeys {
fn new(rows: Vec<ForeignKeyRow>) -> Self {
let from_schema_name = df::Field::new("from_schema_name", df::DataType::Utf8, false);
let from_table_name = df::Field::new("from_table_name", df::DataType::Utf8, false);
let from_column_name = df::Field::new("from_column_name", df::DataType::Utf8, false);
let to_schema_name = df::Field::new("to_schema_name", df::DataType::Utf8, false);
let to_table_name = df::Field::new("to_table_name", df::DataType::Utf8, false);
let to_column_name = df::Field::new("to_column_name", df::DataType::Utf8, false);
let schema = df::SchemaRef::new(df::Schema::new(vec![
from_schema_name,
from_table_name,
from_column_name,
to_schema_name,
to_table_name,
to_column_name,
]));
InferredForeignKeys { schema, rows }
}
fn to_values_table(&self) -> ValuesTable {
ValuesTable {
schema: self.schema.clone(),
rows: self
.rows
.iter()
.map(|row| {
vec![
df::ScalarValue::Utf8(Some(row.from_schema_name.clone())),
df::ScalarValue::Utf8(Some(row.from_table_name.clone())),
df::ScalarValue::Utf8(Some(row.from_column_name.clone())),
df::ScalarValue::Utf8(Some(row.to_schema_name.clone())),
df::ScalarValue::Utf8(Some(row.to_table_name.clone())),
df::ScalarValue::Utf8(Some(row.to_column_name.clone())),
]
})
.collect(),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
struct ForeignKeyRow {
from_schema_name: String,
from_table_name: String,
from_column_name: String,
to_schema_name: String,
to_table_name: String,
to_column_name: String,
}
pub(crate) struct IntrospectionSchemaProvider {
tables: IndexMap<String, Arc<dyn df::TableProvider>>,
}
impl IntrospectionSchemaProvider {
pub(crate) fn new(introspection: &Introspection) -> Self {
let tables = [
(
TABLE_METADATA,
introspection.table_metadata.to_values_table(),
),
(
COLUMN_METADATA,
introspection.column_metadata.to_values_table(),
),
(
INFERRED_FOREIGN_KEY_CONSTRAINTS,
introspection
.inferred_foreign_key_constraints
.to_values_table(),
),
]
.into_iter()
.map(|(k, table)| (k.to_string(), Arc::new(table) as Arc<dyn df::TableProvider>))
.collect();
IntrospectionSchemaProvider { tables }
}
}
#[async_trait]
impl df::SchemaProvider for IntrospectionSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Vec<String> {
self.tables.keys().cloned().collect::<Vec<_>>()
}
async fn table(
&self,
name: &str,
) -> datafusion::error::Result<Option<Arc<dyn df::TableProvider>>> {
Ok(self.tables.get(name).cloned())
}
fn table_exist(&self, name: &str) -> bool {
self.tables.contains_key(name)
}
}
// A table with static rows
struct ValuesTable {
schema: df::SchemaRef,
rows: Vec<Vec<df::ScalarValue>>,
}
#[async_trait]
impl df::TableProvider for ValuesTable {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> df::SchemaRef {
self.schema.clone()
}
fn table_type(&self) -> df::TableType {
df::TableType::View
}
async fn scan(
&self,
_state: &df::SessionState,
projection: Option<&Vec<usize>>,
// filters and limit can be used here to inject some push-down operations if needed
_filters: &[df::Expr],
_limit: Option<usize>,
) -> datafusion::error::Result<Arc<dyn df::ExecutionPlan>> {
let projected_schema = Arc::new(self.schema.project(projection.unwrap_or(&vec![]))?);
let columnar_projection = projection
.unwrap_or(&vec![])
.iter()
.map(|j| self.rows.iter().map(|row| row[*j].clone()))
.map(df::ScalarValue::iter_to_array)
.collect::<df::Result<Vec<_>>>()?;
Ok(Arc::new(df::ValuesExec::try_new_from_batches(
projected_schema.clone(),
vec![df::RecordBatch::try_new(
projected_schema,
columnar_projection,
)?],
)?))
}
}

View File

@ -0,0 +1,44 @@
use async_trait::async_trait;
use std::{any::Any, sync::Arc};
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
mod df {
pub(super) use datafusion::error::Result;
pub(super) use datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider};
}
use crate::catalog;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct Subgraph {
pub models: IndexMap<String, catalog::table::Model>,
}
pub struct OpenDDSchemaProvider {
pub(crate) tables: IndexMap<String, Arc<catalog::table::OpenDDTableProvider>>,
}
#[async_trait]
impl df::SchemaProvider for OpenDDSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Vec<String> {
self.tables.keys().cloned().collect::<Vec<_>>()
}
async fn table(&self, name: &str) -> df::Result<Option<Arc<dyn df::TableProvider>>> {
Ok(self
.tables
.get(name)
.cloned()
.map(|table| table as Arc<dyn df::TableProvider>))
}
fn table_exist(&self, name: &str) -> bool {
self.tables.contains_key(name)
}
}

View File

@ -0,0 +1,256 @@
//! Describe a model for a SQL table and how to translate datafusion operations on the table
//! to ndc-spec queries.
use std::collections::HashMap;
use std::{any::Any, sync::Arc};
use async_trait::async_trait;
use datafusion::common::internal_err;
use hasura_authn_core::Session;
use indexmap::IndexMap;
use metadata_resolve::{self as resolved, Qualified, SelectPermission};
use open_dds::permissions::Role;
use open_dds::{
models::ModelName,
types::{CustomTypeName, FieldName},
};
use serde::{Deserialize, Serialize};
mod df {
pub(super) use datafusion::arrow::datatypes::Field;
pub(super) use datafusion::{
arrow::datatypes::{DataType, Schema, SchemaBuilder, SchemaRef},
datasource::{TableProvider, TableType},
execution::context::SessionState,
logical_expr::Expr,
physical_plan::ExecutionPlan,
};
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub(crate) struct TypePermission {
pub output: open_dds::permissions::TypeOutputPermission,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct TypePermissionsOfRole {
pub(crate) permissions: HashMap<Qualified<CustomTypeName>, TypePermission>,
}
fn get_type_representation<'a>(
model: &'a resolved::Model,
field: &FieldName,
) -> Option<&'a ndc_models::TypeRepresentation> {
model
.source
.as_ref()
.and_then(|source| {
source
.type_mappings
.get(&model.data_type)
.map(|type_mapping| {
let resolved::TypeMapping::Object { field_mappings, .. } = type_mapping;
field_mappings
.get(field)
.map(|mapping| mapping.column_type_representation.as_ref())
})
})
.flatten()
.flatten()
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub(crate) struct Model {
pub name: ModelName,
pub description: Option<String>,
// Datafusion table schema
pub schema: df::SchemaRef,
// For now, descriptions of fields
pub columns: IndexMap<String, Option<String>>,
// This is the entry point for the type mappings stored
// in ModelSource
pub data_type: Qualified<CustomTypeName>,
// The underlying source to execute ndc queries
pub source: Option<Arc<metadata_resolve::ModelSource>>,
// Permisisons for the model. Note that the type permissions will need to be retrieved from the
// global context
pub permissions: HashMap<Role, Arc<resolved::SelectPermission>>,
}
impl Model {
pub fn from_resolved_model(model: &resolved::ModelWithPermissions) -> Self {
let (schema, columns) = {
let mut columns = IndexMap::new();
let mut builder = df::SchemaBuilder::new();
for (field_name, field_definition) in &model.model.type_fields {
let ndc_type_representation = get_type_representation(&model.model, field_name);
let field_type =
to_arrow_type(&field_definition.field_type, ndc_type_representation);
if let Some(field_type) = field_type {
builder.push(df::Field::new(
field_name.to_string(),
field_type,
field_definition.field_type.nullable,
));
let description = if let Some(ndc_models::TypeRepresentation::Enum { one_of }) =
ndc_type_representation
{
// TODO: Instead of stuffing the possible enum values in description,
// surface them in the metadata tables.
Some(
field_definition
.description
.clone()
.unwrap_or_else(String::new)
+ &format!(" Possible values: {}", one_of.join(", ")),
)
} else {
field_definition.description.clone()
};
columns.insert(field_name.to_string(), description);
}
}
let fields = builder.finish().fields;
(df::SchemaRef::new(df::Schema::new(fields)), columns)
};
let permissions = model
.select_permissions
.iter()
.map(|(role, select_permission)| (role.clone(), Arc::new(select_permission.clone())))
.collect();
Model {
name: model.model.name.name.clone(),
description: model.model.raw.description.clone(),
schema,
data_type: model.model.data_type.clone(),
source: model
.model
.source
.as_ref()
.map(|source| Arc::new(source.clone())),
columns,
permissions,
}
}
}
/// Converts an opendd type to an arrow type.
/// TODO: need to handle complex types
#[allow(clippy::match_same_arms)]
fn to_arrow_type(
ty: &resolved::QualifiedTypeReference,
ndc_type_representation: Option<&ndc_models::TypeRepresentation>,
) -> Option<df::DataType> {
match &ty.underlying_type {
resolved::QualifiedBaseType::Named(resolved::QualifiedTypeName::Inbuilt(inbuilt_type)) => {
let data_type = match inbuilt_type {
open_dds::types::InbuiltType::ID => df::DataType::Utf8,
open_dds::types::InbuiltType::Int => df::DataType::Int32,
open_dds::types::InbuiltType::Float => df::DataType::Float32,
open_dds::types::InbuiltType::Boolean => df::DataType::Boolean,
open_dds::types::InbuiltType::String => df::DataType::Utf8,
};
Some(data_type)
}
resolved::QualifiedBaseType::Named(resolved::QualifiedTypeName::Custom(custom_type)) => {
if let Some(type_representation) = ndc_type_representation {
match type_representation {
ndc_models::TypeRepresentation::Boolean => Some(df::DataType::Boolean),
ndc_models::TypeRepresentation::String => Some(df::DataType::Utf8),
ndc_models::TypeRepresentation::Int8 => Some(df::DataType::Int8),
ndc_models::TypeRepresentation::Int16 => Some(df::DataType::Int16),
ndc_models::TypeRepresentation::Int32 => Some(df::DataType::Int32),
ndc_models::TypeRepresentation::Int64 => Some(df::DataType::Int64),
ndc_models::TypeRepresentation::Float32 => Some(df::DataType::Float32),
ndc_models::TypeRepresentation::Float64 => Some(df::DataType::Float64),
// Can't do anything better for BigInteger, so we just use String.
ndc_models::TypeRepresentation::BigInteger => Some(df::DataType::Utf8),
// BigDecimal128 is not supported by arrow.
ndc_models::TypeRepresentation::BigDecimal => Some(df::DataType::Float64),
ndc_models::TypeRepresentation::UUID => Some(df::DataType::Utf8),
ndc_models::TypeRepresentation::Date => Some(df::DataType::Date32),
ndc_models::TypeRepresentation::Timestamp => Some(df::DataType::Timestamp(
datafusion::arrow::datatypes::TimeUnit::Microsecond,
None,
)),
ndc_models::TypeRepresentation::TimestampTZ => Some(df::DataType::Timestamp(
datafusion::arrow::datatypes::TimeUnit::Microsecond,
None,
)),
ndc_models::TypeRepresentation::Enum { .. } => Some(df::DataType::Utf8),
_ => None,
}
} else {
match custom_type.name.to_string().to_lowercase().as_str() {
"bool" => Some(df::DataType::Boolean),
"int8" => Some(df::DataType::Int8),
"int16" => Some(df::DataType::Int16),
"int32" => Some(df::DataType::Int32),
"int64" => Some(df::DataType::Int64),
"float32" => Some(df::DataType::Float32),
"float64" => Some(df::DataType::Float64),
"varchar" => Some(df::DataType::Utf8),
"text" => Some(df::DataType::Utf8),
"timestamp" => Some(df::DataType::Timestamp(
datafusion::arrow::datatypes::TimeUnit::Microsecond,
None,
)),
"timestamptz" => Some(df::DataType::Timestamp(
datafusion::arrow::datatypes::TimeUnit::Microsecond,
None,
)),
// BigDecimal128 is not supported by arrow.
"bigdecimal" => Some(df::DataType::Float64),
_ => None,
}
}
}
resolved::QualifiedBaseType::List(_) => None,
}
}
#[derive(Debug, Clone)]
pub(crate) struct OpenDDTableProvider {
pub(crate) session: Arc<Session>,
pub(crate) http_context: Arc<execute::HttpContext>,
pub(crate) name: ModelName,
pub(crate) data_type: Qualified<CustomTypeName>,
pub(crate) source: Option<Arc<metadata_resolve::ModelSource>>,
pub(crate) schema: df::SchemaRef,
pub(crate) select_permission: Option<Arc<SelectPermission>>,
pub(crate) type_permissions: Option<Arc<TypePermissionsOfRole>>,
}
#[async_trait]
impl df::TableProvider for OpenDDTableProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> df::SchemaRef {
self.schema.clone()
}
fn table_type(&self) -> df::TableType {
df::TableType::Base
}
async fn scan(
&self,
_state: &df::SessionState,
_projection: Option<&Vec<usize>>,
// filters and limit can be used here to inject some push-down operations if needed
_filters: &[df::Expr],
_limit: Option<usize>,
) -> datafusion::error::Result<Arc<dyn df::ExecutionPlan>> {
internal_err!("scan shouldn't be called")
}
}

View File

@ -0,0 +1,158 @@
use std::sync::Arc;
use datafusion::{
arrow::{array::RecordBatch, error::ArrowError, json::writer::JsonArray, json::WriterBuilder},
dataframe::DataFrame,
error::DataFusionError,
};
use hasura_authn_core::Session;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing_util::{ErrorVisibility, SpanVisibility, Successful, TraceableError};
pub use datafusion::execution::context::SessionContext;
pub(crate) mod analyzer;
pub(crate) mod optimizer;
pub(crate) mod planner;
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SqlRequest {
sql: String,
}
#[derive(Error, Debug, Clone)]
pub enum SqlExecutionError {
#[error("error in data fusion: {0}")]
DataFusion(String),
#[error("error in encoding data: {0}")]
Arrow(String),
}
impl From<DataFusionError> for SqlExecutionError {
fn from(e: DataFusionError) -> Self {
Self::DataFusion(e.to_string())
}
}
impl From<ArrowError> for SqlExecutionError {
fn from(e: ArrowError) -> Self {
Self::Arrow(e.to_string())
}
}
impl TraceableError for SqlExecutionError {
fn visibility(&self) -> ErrorVisibility {
ErrorVisibility::User
}
}
/// Executes an SQL Request using the Apache DataFusion query engine.
pub async fn execute_sql(
context: &crate::catalog::Context,
session: Arc<Session>,
http_context: Arc<execute::HttpContext>,
request: &SqlRequest,
) -> Result<Vec<u8>, SqlExecutionError> {
let tracer = tracing_util::global_tracer();
let session_context = tracer
.in_span(
"create_session_context",
"Create a datafusion SessionContext",
SpanVisibility::Internal,
|| {
let session = context.create_session_context(&session, &http_context);
Successful::new(session)
},
)
.into_inner();
let data_frame = tracer
.in_span_async(
"create_logical_plan",
"Creates a Logical Plan for the given SQL statement",
SpanVisibility::User,
|| {
Box::pin(async {
session_context
.sql(&request.sql)
.await
.map_err(|e| SqlExecutionError::DataFusion(e.to_string()))
})
},
)
.await?;
let batches = tracer
.in_span_async(
"execute_logical_plan",
"Executes the Logical Plan of a query",
SpanVisibility::User,
|| Box::pin(async { execute_logical_plan(data_frame).await }),
)
.await?;
tracer.in_span(
"serialize_record_batch",
"Serializes datafusion's RecordBatch into a JSON array",
SpanVisibility::User,
|| record_batches_to_json_array(&batches),
)
}
async fn execute_logical_plan(frame: DataFrame) -> Result<Vec<RecordBatch>, SqlExecutionError> {
let tracer = tracing_util::global_tracer();
let task_ctx = frame.task_ctx();
let session_config = task_ctx.session_config().clone();
let plan = tracer
.in_span_async(
"create_physical_plan",
"Creates a physical plan from a logical plan",
SpanVisibility::User,
|| {
Box::pin(async {
frame
.create_physical_plan()
.await
.map_err(|e| SqlExecutionError::DataFusion(e.to_string()))
})
},
)
.await?;
let record_batches = tracer
.in_span_async(
"execute_physical_plan",
"Executes a physical plan to collect record batches",
SpanVisibility::User,
|| {
let task_ctx = Arc::new(task_ctx.with_session_config(
session_config.with_extension(Arc::new(tracing_util::Context::current())),
));
Box::pin(async {
datafusion::physical_plan::collect(plan, task_ctx)
.await
.map_err(|e| SqlExecutionError::DataFusion(e.to_string()))
})
},
)
.await?;
Ok(record_batches)
}
fn record_batches_to_json_array(batches: &[RecordBatch]) -> Result<Vec<u8>, SqlExecutionError> {
if batches.is_empty() {
return Ok(vec![b'[', b']']);
}
// Write the record batch out as a JSON array
let buf = Vec::new();
let builder = WriterBuilder::new().with_explicit_nulls(true);
let mut writer = builder.build::<_, JsonArray>(buf);
for batch in batches {
writer.write(batch)?;
}
writer.finish()?;
// Get the underlying buffer back,
Ok(writer.into_inner())
}

View File

@ -0,0 +1,188 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Analyzed rule to replace TableScan references
//! such as DataFrames and Views and inlines the LogicalPlan.
use std::collections::BTreeMap;
use std::sync::Arc;
use datafusion::{
common::{
config::ConfigOptions,
tree_node::{Transformed, TransformedResult, TreeNode},
Result,
},
error::DataFusionError,
logical_expr::{logical_plan::LogicalPlan, Extension, TableScan},
optimizer::AnalyzerRule,
};
use indexmap::IndexMap;
use metadata_resolve::{self as resolved};
use open_dds::identifier::Identifier;
use open_dds::types::FieldName;
use crate::plan::NDCQuery;
/// Analyzed rule that inlines TableScan that provide a [`LogicalPlan`]
/// (DataFrame / ViewTable)
pub struct ReplaceTableScan {
default_schema: Option<Arc<String>>,
catalog: Arc<crate::catalog::OpenDDCatalogProvider>,
}
impl ReplaceTableScan {
pub fn new(
default_schema: Option<Arc<String>>,
catalog: Arc<crate::catalog::OpenDDCatalogProvider>,
) -> Self {
Self {
default_schema,
catalog,
}
}
}
impl AnalyzerRule for ReplaceTableScan {
fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
plan.transform_up(|n| {
analyze_internal(
self.default_schema.as_ref().map(|x| x.as_str()),
&self.catalog,
n,
)
})
.data()
}
fn name(&self) -> &str {
"replace_table_scan_with_ndc_query"
}
}
fn analyze_internal(
default_schema: Option<&str>,
catalog: &crate::catalog::OpenDDCatalogProvider,
plan: LogicalPlan,
) -> Result<Transformed<LogicalPlan>> {
// rewrite any subqueries in the plan first
let transformed_plan = plan.map_subqueries(|plan| {
plan.transform_up(|n| analyze_internal(default_schema, catalog, n))
})?;
let transformed_plan = transformed_plan.transform_data(|plan| match plan {
LogicalPlan::TableScan(TableScan {
table_name,
source: _,
projection: _,
projected_schema,
filters: _,
fetch: _,
}) if table_name.schema() != Some("hasura") => {
let table = catalog.get(default_schema, &table_name).ok_or_else(|| {
DataFusionError::Internal(format!(
"table provider not found for replace_table_scan: {table_name}"
))
})?;
let model_source = table.source.as_ref().ok_or_else(|| {
DataFusionError::Plan(format!(
"model source should be configured for {}",
table.name
))
})?;
let mut ndc_fields = IndexMap::new();
let base_type_fields = {
let base_type_mapping = model_source
.type_mappings
.get(&table.data_type)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"couldn't fetch type_mapping of type {} for model {}",
table.data_type, table.name
))
})?;
match base_type_mapping {
resolved::TypeMapping::Object {
ndc_object_type_name: _,
field_mappings,
} => field_mappings,
}
};
for field in projected_schema.fields() {
let field_name = {
let field_name = Identifier::new(field.name().clone()).map_err(|e| {
DataFusionError::Internal(format!(
"field name conversion failed {}: {}",
field.name(),
e
))
})?;
FieldName(field_name)
};
let ndc_field = {
base_type_fields
.get(&field_name)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"couldn't fetch field mapping of field {} in type {} for model {}",
field_name, table.data_type, table.name
))
})
.map(|field_mapping| field_mapping.column.clone())
}?;
ndc_fields.insert(
field.name().clone(),
ndc_models::Field::Column {
column: ndc_field.to_string(),
fields: None,
arguments: BTreeMap::new(),
},
);
}
let ndc_query = ndc_models::Query {
aggregates: None,
fields: Some(ndc_fields),
limit: None,
offset: None,
order_by: None,
predicate: None,
};
let query_request = ndc_models::QueryRequest {
query: ndc_query,
collection: model_source.collection.clone(),
arguments: BTreeMap::new(),
collection_relationships: BTreeMap::new(),
variables: None,
};
let ndc_query_node = NDCQuery {
table: table_name.clone(),
query: query_request,
data_source_name: Arc::new(model_source.collection.clone()),
schema: projected_schema,
};
Ok(Transformed::yes(LogicalPlan::Extension(Extension {
node: Arc::new(ndc_query_node),
})))
}
_ => Ok(Transformed::no(plan)),
})?;
Ok(transformed_plan)
}

View File

@ -0,0 +1,3 @@
mod projection_pushdown;
pub(crate) use projection_pushdown::NDCPushDownProjection;

View File

@ -0,0 +1,71 @@
use std::sync::Arc;
use datafusion::{
common::{internal_err, tree_node::Transformed},
error::Result,
logical_expr::{Expr, Extension, LogicalPlan},
optimizer::{optimizer::ApplyOrder, OptimizerConfig, OptimizerRule},
};
pub(crate) struct NDCPushDownProjection {}
impl OptimizerRule for NDCPushDownProjection {
fn try_optimize(
&self,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
internal_err!("Should have called NDCPushDownProjection::rewrite")
}
fn name(&self) -> &str {
"ndc_pushdown_projection"
}
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}
fn supports_rewrite(&self) -> bool {
true
}
fn rewrite(
&self,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
if let Some((projections, projected_schema, ndc_query)) = {
match plan {
LogicalPlan::Projection(ref projection) => match projection.input.as_ref() {
LogicalPlan::Extension(Extension { node }) => node
.as_ref()
.as_any()
.downcast_ref::<crate::plan::NDCQuery>()
.map(|ndc_query| (&projection.expr, &projection.schema, ndc_query.clone())),
_ => None,
},
_ => None,
}
} {
let projected_columns = projections_to_columns(projections)?;
let projected_query =
ndc_query.project(projected_schema.clone(), &projected_columns)?;
let plan = LogicalPlan::Extension(Extension {
node: Arc::new(projected_query),
});
Ok(Transformed::yes(plan))
} else {
Ok(Transformed::no(plan))
}
}
}
fn projections_to_columns(projections: &[Expr]) -> Result<Vec<String>> {
projections
.iter()
.map(|expr| match expr {
Expr::Column(column) => Ok(column.name.clone()),
_ => internal_err!("non-column found in projection of ndcscan: {}", expr),
})
.collect()
}

View File

@ -0,0 +1,182 @@
use std::{collections::BTreeMap, sync::Arc};
use datafusion::{
error::{DataFusionError, Result},
execution::context::{QueryPlanner, SessionState},
logical_expr::{LogicalPlan, UserDefinedLogicalNode},
physical_plan::ExecutionPlan,
physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner},
};
use execute::process_model_relationship_definition;
use indexmap::IndexMap;
use metadata_resolve::FilterPermission;
use open_dds::identifier::Identifier;
use open_dds::types::FieldName;
use crate::plan::NDCPushDown;
use async_trait::async_trait;
pub(crate) struct NDCQueryPlanner {
pub(crate) default_schema: Option<Arc<String>>,
pub(crate) catalog: Arc<crate::catalog::OpenDDCatalogProvider>,
}
#[async_trait]
impl QueryPlanner for NDCQueryPlanner {
/// Given a `LogicalPlan` created from above, create an
/// `ExecutionPlan` suitable for execution
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
// Teach the default physical planner how to plan TopK nodes.
let physical_planner =
DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(NDCPushDownPlanner {
default_schema: self.default_schema.clone(),
catalog: self.catalog.clone(),
})]);
// Delegate most work of physical planning to the default physical planner
physical_planner
.create_physical_plan(logical_plan, session_state)
.await
}
}
pub(crate) struct NDCPushDownPlanner {
pub(crate) default_schema: Option<Arc<String>>,
pub(crate) catalog: Arc<crate::catalog::OpenDDCatalogProvider>,
}
#[async_trait]
impl ExtensionPlanner for NDCPushDownPlanner {
/// Create a physical plan for an extension node
async fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
_session_state: &SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
if let Some(ndc_node) = node.as_any().downcast_ref::<crate::plan::NDCQuery>() {
assert_eq!(logical_inputs.len(), 0, "Inconsistent number of inputs");
assert_eq!(physical_inputs.len(), 0, "Inconsistent number of inputs");
let table = self
.catalog
.get(
self.default_schema.as_ref().map(|s| s.as_str()),
&ndc_node.table,
)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"table provider not found for replace_table_scan: {}",
&ndc_node.table
))
})?;
let model_source = table.source.as_ref().ok_or_else(|| {
DataFusionError::Plan(format!(
"model source should be configured for {}",
table.name
))
})?;
let select_permission = table.select_permission.as_ref().ok_or_else(|| {
DataFusionError::Plan(format!(
"role {} does not have select permission for model {}",
table.session.role, table.name
))
})?;
let type_permissions = table.type_permissions.as_ref().ok_or_else(|| {
DataFusionError::Plan(format!(
"role {} does not have permission to select any fields of model {}",
table.session.role, table.name
))
})?;
let base_type_allowed_fields = &type_permissions
.permissions
.get(&table.data_type)
.ok_or_else(|| {
DataFusionError::Plan(format!(
"role {} has permission to select model {} but does not have permission \
to select fields of the model's underlying type {}",
table.session.role, table.name, table.data_type
))
})?
.output
.allowed_fields;
for (field_name, _field) in ndc_node
.query
.query
.fields
.as_ref()
.unwrap_or(&IndexMap::new())
{
let field_name = {
let field_name = Identifier::new(field_name.clone()).map_err(|e| {
DataFusionError::Internal(format!(
"field name conversion failed {field_name}: {e}"
))
})?;
FieldName(field_name)
};
if base_type_allowed_fields.contains(&field_name) {
Ok(())
} else {
Err(DataFusionError::Plan(format!(
"role {} does not have permission to select the field {} from type {} of model {}",
table.session.role, field_name, table.data_type, table.name
)))
}?;
}
let mut usage_counts = execute::model_tracking::UsagesCounts::default();
let mut relationships = BTreeMap::new();
let permission_filter = match &select_permission.filter {
FilterPermission::AllowAll => Ok(ndc_models::Expression::And {
expressions: vec![],
}),
FilterPermission::Filter(filter) => {
execute::ir::permissions::process_model_predicate(
filter,
&table.session.variables,
&mut relationships,
&mut usage_counts,
)
.map_err(|e| {
DataFusionError::Internal(format!(
"error when processing model predicate: {e}"
))
})
}
}?;
let relationships = relationships
.into_values()
.map(|v| {
process_model_relationship_definition(&v)
.map(|r| (v.relationship_name.to_string(), r))
.map_err(|e| {
DataFusionError::Internal(format!(
"error constructing ndc relationship definition: {e}"
))
})
})
.collect::<Result<BTreeMap<String, ndc_models::Relationship>, DataFusionError>>()?;
let mut query = ndc_node.query.clone();
query.query.predicate = Some(permission_filter);
query.collection_relationships = relationships;
let ndc_pushdown = NDCPushDown::new(
table.http_context.clone(),
ndc_node.schema.inner().clone(),
Arc::new(query),
Arc::new(model_source.data_connector.clone()),
);
Ok(Some(Arc::new(ndc_pushdown)))
} else {
Ok(None)
}
}
}

3
v3/crates/sql/src/lib.rs Normal file
View File

@ -0,0 +1,3 @@
pub mod catalog;
pub mod execute;
pub mod plan;

257
v3/crates/sql/src/plan.rs Normal file
View File

@ -0,0 +1,257 @@
use core::fmt;
use std::{any::Any, hash::Hash, sync::Arc};
use datafusion::{
arrow::{
array::RecordBatch, datatypes::SchemaRef, error::ArrowError, json::reader as arrow_json,
},
common::DFSchemaRef,
error::DataFusionError,
logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore},
physical_expr::EquivalenceProperties,
physical_plan::{
stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionMode,
ExecutionPlan, Partitioning, PlanProperties,
},
sql::TableReference,
};
use execute::HttpContext;
use futures::TryFutureExt;
use tracing_util::{FutureExt, SpanVisibility, TraceableError};
use thiserror::Error;
#[derive(Debug, Error)]
pub enum ExecutionPlanError {
#[error("{0}")]
NDCExecutionError(#[from] execute::ndc::client::Error),
#[error("NDC Response not as expected: {0}")]
NDCResponseFormat(String),
#[error("Arrow error: {0}")]
ArrowError(#[from] ArrowError),
#[error("Couldn't construct a RecordBatch: {0}")]
RecordBatchConstruction(String),
#[error("Couldn't fetch otel tracing context")]
TracingContextNotFound,
}
impl TraceableError for ExecutionPlanError {
fn visibility(&self) -> tracing_util::ErrorVisibility {
tracing_util::ErrorVisibility::Internal
}
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct NDCQuery {
pub(crate) table: TableReference,
pub(crate) query: ndc_models::QueryRequest,
pub(crate) data_source_name: Arc<String>,
pub(crate) schema: DFSchemaRef,
}
impl Hash for NDCQuery {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.data_source_name.hash(state);
format!("{:#?}", self.query).hash(state);
self.schema.hash(state);
}
}
impl Eq for NDCQuery {}
impl NDCQuery {
pub(crate) fn project(
mut self,
schema: DFSchemaRef,
projection: &[String],
) -> datafusion::error::Result<Self> {
let mut current_fields = self.query.query.fields.take().ok_or_else(|| {
DataFusionError::Internal("empty fields found in ndcscan for projection".to_string())
})?;
let new_fields = projection
.iter()
.map(|projected_field| {
current_fields
.swap_remove(projected_field)
.map(|field| (projected_field.clone(), field))
.ok_or_else(|| {
DataFusionError::Internal(
"failed to lookup projectd field in ndcscan".to_string(),
)
})
})
.collect::<Result<_, DataFusionError>>()?;
let _ = std::mem::replace(&mut self.query.query.fields, Some(new_fields));
let _ = std::mem::replace(&mut self.schema, schema);
Ok(self)
}
}
impl UserDefinedLogicalNodeCore for NDCQuery {
fn name(&self) -> &str {
"NDCQuery"
}
fn inputs(&self) -> Vec<&LogicalPlan> {
vec![]
}
/// Schema for TopK is the same as the input
fn schema(&self) -> &DFSchemaRef {
&self.schema
}
fn expressions(&self) -> Vec<datafusion::logical_expr::Expr> {
vec![]
}
/// For example: `TopK: k=10`
fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "NDCQuery: query={:#?}", self.query)
}
fn with_exprs_and_inputs(
&self,
_exprs: Vec<datafusion::logical_expr::Expr>,
_inputs: Vec<LogicalPlan>,
) -> datafusion::error::Result<Self> {
Ok(self.clone())
}
}
#[derive(Debug, Clone)]
pub(crate) struct NDCPushDown {
http_context: Arc<execute::HttpContext>,
query: Arc<ndc_models::QueryRequest>,
data_connector: Arc<metadata_resolve::DataConnectorLink>,
projected_schema: SchemaRef,
cache: PlanProperties,
}
impl NDCPushDown {
pub(crate) fn new(
http_context: Arc<HttpContext>,
schema: SchemaRef,
query: Arc<ndc_models::QueryRequest>,
data_connector: Arc<metadata_resolve::DataConnectorLink>,
) -> Self {
let cache = Self::compute_properties(schema.clone());
Self {
http_context,
query,
data_connector,
projected_schema: schema,
cache,
}
}
/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
PlanProperties::new(
eq_properties,
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
)
}
}
impl DisplayAs for NDCPushDown {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
write!(f, "NDCPushDown")
}
}
impl ExecutionPlan for NDCPushDown {
fn name(&self) -> &'static str {
"NDCPushdown"
}
fn as_any(&self) -> &dyn Any {
self
}
fn properties(&self) -> &PlanProperties {
&self.cache
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
Ok(self)
}
fn execute(
&self,
_partition: usize,
context: Arc<datafusion::execution::TaskContext>,
) -> datafusion::error::Result<datafusion::execution::SendableRecordBatchStream> {
let otel_cx = context
.session_config()
.get_extension::<tracing_util::Context>()
.ok_or_else(|| {
DataFusionError::External(Box::new(ExecutionPlanError::TracingContextNotFound))
})?;
let fut = fetch_from_data_connector(
self.projected_schema.clone(),
self.http_context.clone(),
self.query.clone(),
self.data_connector.clone(),
)
.with_context((*otel_cx).clone())
.map_err(|e| DataFusionError::External(Box::new(e)));
let stream = futures::stream::once(fut);
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.projected_schema.clone(),
stream,
)))
}
}
pub async fn fetch_from_data_connector(
schema: SchemaRef,
http_context: Arc<HttpContext>,
query_request: Arc<ndc_models::QueryRequest>,
data_connector: Arc<metadata_resolve::DataConnectorLink>,
) -> Result<RecordBatch, ExecutionPlanError> {
let tracer = tracing_util::global_tracer();
let mut ndc_response =
execute::fetch_from_data_connector(&http_context, &query_request, &data_connector, None)
.await?;
let batch = tracer.in_span(
"ndc_response_to_record_batch",
"Converts NDC Response into datafusion's RecordBatch",
SpanVisibility::Internal,
|| {
let rows = ndc_response
.0
.pop()
.ok_or_else(|| {
ExecutionPlanError::NDCResponseFormat("no row sets found".to_string())
})?
.rows
.ok_or_else(|| {
ExecutionPlanError::NDCResponseFormat(
"no rows found for the row set".to_string(),
)
})?;
let mut decoder = arrow_json::ReaderBuilder::new(schema.clone()).build_decoder()?;
decoder.serialize(&rows)?;
decoder.flush()?.ok_or_else(|| {
ExecutionPlanError::RecordBatchConstruction(
"json to arrow decoder did not return any rows".to_string(),
)
})
},
)?;
Ok(batch)
}

View File

@ -18,6 +18,7 @@ pub use tracer::{
// risking mismatches and multiple globals
pub use opentelemetry::propagation::text_map_propagator::TextMapPropagator;
pub use opentelemetry::trace::get_active_span;
pub use opentelemetry::trace::FutureExt;
pub use opentelemetry::trace::Status;
pub use opentelemetry::Context;
pub use opentelemetry_contrib::trace::propagator::trace_context_response::TraceContextResponsePropagator;