Handle multiple ndc-model versions during query planning and execution (#790)

The PR adds code that handles multiple versions of the ndc-models during
the execution pipeline. Depending on whether a connector supports `v01`
or `v02` models, a different set of types is used to send and receive
the HTTP requests.

However, the engine internally still uses the latest (v02) models inside
its IR. Unfortunately, it was going to be quite traumatic to prevent the
engine from using ndc models inside the IR and during response
processing and remote joins. This means that the engine generates v02
requests, and these are downgraded into v01 requests. v01 responses are
upgraded to v02 responses before being processed by the engine.

The ndc client (`execute::ndc::client`) now only takes new wrapper enum
types (`execute::ndc::types`) that split between v01 or v02
requests/responses. Every place that carries an ndc request now carries
this type instead, which allows it to carry either a v01 or a v02
request.

When ndc requests are created during planning, all creation goes via the
new `execute::plan::ndc_request` module. This inspects the connector's
supported version, creates the necessary request, and if needed,
downgrades it to v01.

When ndc responses are read during planning or during remote joins, they
are upgraded to v02 via helper functions defined on the types in
`execute::ndc::types`.

The upgrade/downgrade code is located in `execute::ndc::migration`. Keep
in mind the "v02" types are currently the same as the "v01" types so the
migration code is not doing much. This will change as the v02 types are
modified.

However, this approach has its drawbacks. One is that it prevents
changes to the ndc types [like
this](https://github.com/hasura/ndc-spec/pull/158) without a fair bit of
pain (see
[comment](https://github.com/hasura/ndc-spec/pull/158#issuecomment-2202127094)).
Another is that the downgrade code can fail at runtime: developers who
adopt new v02 features get no compile-time warning that those features
will fail against v01 connectors, because the (error-raising) mapping to
v01 has already been written. Another is that we're paying some (small,
probably?) performance cost by upgrading/downgrading types because we
need to rebuild data structures.

Also:
* `execute::ndc::response` has been merged into `execute::ndc::client`,
since it was inextricably linked.
V3_GIT_ORIGIN_REV_ID: f3f36736b52058323d789c378fed06af566f39a3
This commit is contained in:
Daniel Chambers 2024-07-03 18:31:42 +10:00 committed by hasura-bot
parent c1b9592f6b
commit e380876823
39 changed files with 1070 additions and 226 deletions

1
v3/Cargo.lock generated
View File

@ -1800,6 +1800,7 @@ dependencies = [
"lang-graphql",
"metadata-resolve",
"mockito",
"ndc-models 0.1.4 (git+https://github.com/hasura/ndc-spec.git?tag=v0.1.4)",
"ndc-models 0.1.4 (git+https://github.com/hasura/ndc-spec.git?rev=002923d5c337930b9c9007f05c768fe9a0575612)",
"nonempty",
"open-dds",

View File

@ -6,6 +6,7 @@
"ndcRequest": {
"type": "query",
"value": {
"version": "v0.1.x",
"collection": "author",
"query": {
"fields": {
@ -51,6 +52,7 @@
"ndcExplain": {
"type": "response",
"value": {
"version": "v0.1.x",
"details": {
"explain": "<redacted>"
}

View File

@ -9,6 +9,7 @@
"ndcRequest": {
"type": "query",
"value": {
"version": "v0.1.x",
"collection": "Album",
"query": {
"fields": {
@ -31,6 +32,7 @@
"ndcExplain": {
"type": "response",
"value": {
"version": "v0.1.x",
"details": {
"explain": "<redacted>"
}
@ -50,6 +52,7 @@
"ndcRequest": {
"type": "query",
"value": {
"version": "v0.1.x",
"collection": "Artist",
"query": {
"fields": {
@ -86,6 +89,7 @@
"ndcExplain": {
"type": "response",
"value": {
"version": "v0.1.x",
"details": {
"explain": "<redacted>"
}
@ -103,6 +107,7 @@
"ndcRequest": {
"type": "query",
"value": {
"version": "v0.1.x",
"collection": "Track",
"query": {
"fields": {
@ -134,6 +139,7 @@
"ndcExplain": {
"type": "response",
"value": {
"version": "v0.1.x",
"details": {
"explain": "<redacted>"
}

View File

@ -9,6 +9,7 @@
"ndcRequest": {
"type": "query",
"value": {
"version": "v0.1.x",
"collection": "Album",
"query": {
"fields": {
@ -59,6 +60,7 @@
"ndcExplain": {
"type": "response",
"value": {
"version": "v0.1.x",
"details": {
"explain": "<redacted>"
}
@ -78,6 +80,7 @@
"ndcRequest": {
"type": "query",
"value": {
"version": "v0.1.x",
"collection": "Artist",
"query": {
"fields": {
@ -109,6 +112,7 @@
"ndcExplain": {
"type": "response",
"value": {
"version": "v0.1.x",
"details": {
"explain": "<redacted>"
}
@ -129,6 +133,7 @@
"ndcRequest": {
"type": "query",
"value": {
"version": "v0.1.x",
"collection": "Album",
"query": {
"fields": {
@ -160,6 +165,7 @@
"ndcExplain": {
"type": "response",
"value": {
"version": "v0.1.x",
"details": {
"explain": "<redacted>"
}

View File

@ -9,6 +9,7 @@
"ndcRequest": {
"type": "query",
"value": {
"version": "v0.1.x",
"collection": "Album",
"query": {
"fields": {
@ -31,6 +32,7 @@
"ndcExplain": {
"type": "response",
"value": {
"version": "v0.1.x",
"details": {
"explain": "<redacted>"
}
@ -53,6 +55,7 @@
"ndcRequest": {
"type": "query",
"value": {
"version": "v0.1.x",
"collection": "Track",
"query": {
"fields": {
@ -89,6 +92,7 @@
"ndcExplain": {
"type": "response",
"value": {
"version": "v0.1.x",
"details": {
"explain": "<redacted>"
}
@ -106,6 +110,7 @@
"ndcRequest": {
"type": "query",
"value": {
"version": "v0.1.x",
"collection": "Album",
"query": {
"fields": {
@ -137,6 +142,7 @@
"ndcExplain": {
"type": "response",
"value": {
"version": "v0.1.x",
"details": {
"explain": "<redacted>"
}
@ -159,6 +165,7 @@
"ndcRequest": {
"type": "query",
"value": {
"version": "v0.1.x",
"collection": "Artist",
"query": {
"fields": {
@ -190,6 +197,7 @@
"ndcExplain": {
"type": "response",
"value": {
"version": "v0.1.x",
"details": {
"explain": "<redacted>"
}

View File

@ -9,6 +9,7 @@
"ndcRequest": {
"type": "query",
"value": {
"version": "v0.1.x",
"collection": "author",
"query": {
"fields": {
@ -31,6 +32,7 @@
"ndcExplain": {
"type": "response",
"value": {
"version": "v0.1.x",
"details": {
"explain": "<redacted>"
}
@ -47,6 +49,7 @@
"ndcRequest": {
"type": "query",
"value": {
"version": "v0.1.x",
"collection": "article",
"query": {
"fields": {
@ -78,6 +81,7 @@
"ndcExplain": {
"type": "response",
"value": {
"version": "v0.1.x",
"details": {
"explain": "<redacted>"
}

View File

@ -9,6 +9,7 @@
"ndcRequest": {
"type": "query",
"value": {
"version": "v0.1.x",
"collection": "get_actor_by_id",
"query": {
"fields": {
@ -49,6 +50,7 @@
"ndcRequest": {
"type": "query",
"value": {
"version": "v0.1.x",
"collection": "author",
"query": {
"fields": {
@ -83,6 +85,7 @@
"ndcExplain": {
"type": "response",
"value": {
"version": "v0.1.x",
"details": {
"explain": "<redacted>"
}
@ -97,6 +100,7 @@
"ndcRequest": {
"type": "query",
"value": {
"version": "v0.1.x",
"collection": "movies",
"query": {
"fields": {

View File

@ -30,6 +30,7 @@ derive_more = { workspace = true }
futures-util = { workspace = true }
indexmap = { workspace = true, features = ["serde"] }
ndc-models = { workspace = true }
ndc-models-v01 = { workspace = true }
nonempty = { workspace = true }
reqwest = { workspace = true, features = ["json", "multipart"] }
schemars = { workspace = true, features = ["smol_str"] }

View File

@ -76,7 +76,7 @@ impl TraceableError for RequestError {
#[transitive(from(gql::normalized_ast::Error, FieldInternalError))]
#[transitive(from(gql::introspection::Error, FieldInternalError))]
pub enum FieldError {
#[error("error from data source: {}", connector_error.error_response.message)]
#[error("error from data source: {}", connector_error.error_response.message())]
NDCExpected {
connector_error: ndc_client::ConnectorError,
},
@ -92,7 +92,7 @@ impl FieldError {
fn get_details(&self) -> Option<serde_json::Value> {
match self {
Self::NDCExpected { connector_error } => {
Some(connector_error.error_response.details.clone())
Some(connector_error.error_response.details().clone())
}
Self::InternalError(internal) => internal.get_details(),
Self::FieldNotFoundInService { .. } => None,
@ -192,7 +192,7 @@ impl NDCUnexpectedError {
pub fn get_details(&self) -> Option<serde_json::Value> {
match self {
Self::NDCClientError(ndc_client::Error::Connector(ce)) => {
Some(ce.error_response.details.clone())
Some(ce.error_response.details().clone())
}
_ => None,
}

View File

@ -228,7 +228,7 @@ async fn get_join_steps(
let mut sequence_steps = vec![];
if let JoinNode::Remote((remote_join, _join_id)) = location.join_node {
let mut query_request = remote_join.target_ndc_ir;
query_request.variables = Some(vec![]);
query_request.set_variables(Some(vec![]));
let ndc_request = types::NDCRequest::Query(query_request);
let data_connector_explain = fetch_explain_from_data_connector(
expose_internal_errors,

View File

@ -1,13 +1,15 @@
use std::collections::BTreeMap;
use ndc_models;
use nonempty::NonEmpty;
use serde::Serialize;
use lang_graphql::http::GraphQLError;
use ndc_models as ndc_models_v02;
use ndc_models_v01;
use tracing_util::Traceable;
use crate::error;
use crate::ndc;
use crate::GraphQLErrors;
#[derive(Debug, Serialize)]
@ -94,7 +96,7 @@ pub(crate) enum ForEachStep {
#[serde(tag = "type", content = "value")]
pub(crate) enum NDCExplainResponse {
NotSupported,
Response(ndc_models::ExplainResponse),
Response(ndc::NdcExplainResponse),
Error(GraphQLError),
}
@ -102,8 +104,8 @@ pub(crate) enum NDCExplainResponse {
#[serde(rename_all = "camelCase")]
#[serde(tag = "type", content = "value")]
pub(crate) enum NDCRequest {
Query(ndc_models::QueryRequest),
Mutation(ndc_models::MutationRequest),
Query(ndc::NdcQueryRequest),
Mutation(ndc::NdcMutationRequest),
}
impl NDCExplainResponse {
@ -113,7 +115,7 @@ impl NDCExplainResponse {
) -> Self {
Self::Error(error.to_graphql_error(expose_internal_errors, None))
}
pub(crate) fn success(response: ndc_models::ExplainResponse) -> Self {
pub(crate) fn success(response: ndc::NdcExplainResponse) -> Self {
Self::Response(response)
}
pub(crate) fn not_supported() -> Self {
@ -175,12 +177,22 @@ fn redact_ndc_explain_response(ndc_explain: NDCExplainResponse) -> NDCExplainRes
match ndc_explain {
NDCExplainResponse::NotSupported => NDCExplainResponse::NotSupported,
NDCExplainResponse::Error(error) => NDCExplainResponse::Error(error),
NDCExplainResponse::Response(_response) => {
NDCExplainResponse::Response(response) => {
let mut redacted_details = BTreeMap::new();
redacted_details.insert("explain".to_string(), "<redacted>".to_string());
NDCExplainResponse::Response(ndc_models::ExplainResponse {
let ndc_response = match response {
ndc::NdcExplainResponse::V01(_) => {
ndc::NdcExplainResponse::V01(ndc_models_v01::ExplainResponse {
details: redacted_details,
})
}
ndc::NdcExplainResponse::V02(_) => {
ndc::NdcExplainResponse::V02(ndc_models_v02::ExplainResponse {
details: redacted_details,
})
}
};
NDCExplainResponse::Response(ndc_response)
}
}
}

View File

@ -1,4 +1,7 @@
pub mod response;
pub mod client;
pub mod migration;
pub mod types;
pub use types::*;
use std::borrow::Cow;
@ -7,10 +10,8 @@ use axum::http::HeaderMap;
use lang_graphql::ast::common as ast;
use tracing_util::{set_attribute_on_active_span, AttributeVisibility, SpanVisibility};
use super::error;
use super::{HttpContext, ProjectId};
pub mod client;
use crate::error;
use crate::{HttpContext, ProjectId};
/// The column name used by NDC query response of functions
/// <https://github.com/hasura/ndc-spec/blob/main/specification/src/specification/queries/functions.md?plain=1#L3>
@ -19,12 +20,12 @@ pub const FUNCTION_IR_VALUE_COLUMN_NAME: &str = "__value";
/// Executes a NDC operation
pub async fn execute_ndc_query<'n, 's>(
http_context: &HttpContext,
query: &ndc_models::QueryRequest,
query: &NdcQueryRequest,
data_connector: &metadata_resolve::DataConnectorLink,
execution_span_attribute: &'static str,
field_span_attribute: String,
project_id: Option<&ProjectId>,
) -> Result<Vec<ndc_models::RowSet>, error::FieldError> {
) -> Result<NdcQueryResponse, error::FieldError> {
let tracer = tracing_util::global_tracer();
tracer
.in_span_async(
@ -49,7 +50,7 @@ pub async fn execute_ndc_query<'n, 's>(
let connector_response =
fetch_from_data_connector(http_context, query, data_connector, project_id)
.await?;
Ok(connector_response.0)
Ok(connector_response)
})
},
)
@ -58,10 +59,10 @@ pub async fn execute_ndc_query<'n, 's>(
pub async fn fetch_from_data_connector<'s>(
http_context: &HttpContext,
query_request: &ndc_models::QueryRequest,
query_request: &NdcQueryRequest,
data_connector: &metadata_resolve::DataConnectorLink,
project_id: Option<&ProjectId>,
) -> Result<ndc_models::QueryResponse, client::Error> {
) -> Result<NdcQueryResponse, client::Error> {
let tracer = tracing_util::global_tracer();
tracer
.in_span_async(
@ -108,12 +109,12 @@ pub fn append_project_id_to_headers<'a>(
/// Executes a NDC mutation
pub(crate) async fn execute_ndc_mutation<'n, 's, 'ir>(
http_context: &HttpContext,
query: &ndc_models::MutationRequest,
query: &NdcMutationRequest,
data_connector: &metadata_resolve::DataConnectorLink,
execution_span_attribute: String,
field_span_attribute: String,
project_id: Option<&ProjectId>,
) -> Result<ndc_models::MutationResponse, error::FieldError> {
) -> Result<NdcMutationResponse, error::FieldError> {
let tracer = tracing_util::global_tracer();
tracer
.in_span_async(
@ -151,10 +152,10 @@ pub(crate) async fn execute_ndc_mutation<'n, 's, 'ir>(
pub(crate) async fn fetch_from_data_connector_mutation<'s>(
http_context: &HttpContext,
query_request: &ndc_models::MutationRequest,
query_request: &NdcMutationRequest,
data_connector: &metadata_resolve::DataConnectorLink,
project_id: Option<&ProjectId>,
) -> Result<ndc_models::MutationResponse, client::Error> {
) -> Result<NdcMutationResponse, client::Error> {
let tracer = tracing_util::global_tracer();
tracer
.in_span_async(

View File

@ -1,12 +1,15 @@
use std::borrow::Cow;
use reqwest::header::{HeaderMap, HeaderValue};
use serde::{de::DeserializeOwned, Deserialize};
use serde::de::DeserializeOwned;
use thiserror::Error;
use tracing_util::{SpanVisibility, Successful};
use crate::ndc::response::handle_response_with_size_limit;
use super::{
NdcErrorResponse, NdcExplainResponse, NdcMutationRequest, NdcMutationResponse, NdcQueryRequest,
NdcQueryResponse,
};
/// Error type for the NDC API client interactions
#[derive(Debug, Error)]
@ -46,10 +49,10 @@ impl tracing_util::TraceableError for Error {
}
#[derive(Debug, Clone, Error)]
#[error("connector returned status code {status} with message: {}", error_response.message)]
#[error("connector returned status code {status} with message: {}", error_response.message())]
pub struct ConnectorError {
pub status: reqwest::StatusCode,
pub error_response: ndc_models::ErrorResponse,
pub error_response: NdcErrorResponse,
}
#[derive(Debug, Clone, Error)]
@ -74,8 +77,8 @@ pub struct Configuration<'s> {
/// <https://hasura.github.io/ndc-spec/specification/explain.html?highlight=%2Fexplain#request>
pub async fn explain_query_post(
configuration: Configuration<'_>,
query_request: &ndc_models::QueryRequest,
) -> Result<ndc_models::ExplainResponse, Error> {
query_request: &NdcQueryRequest,
) -> Result<NdcExplainResponse, Error> {
let tracer = tracing_util::global_tracer();
tracer
.in_span_async(
@ -86,11 +89,35 @@ pub async fn explain_query_post(
Box::pin(async {
let url = append_path(configuration.base_path, &["query", "explain"])?;
let response_size_limit = configuration.response_size_limit;
match query_request {
NdcQueryRequest::V01(req) => {
let request =
construct_request(configuration, reqwest::Method::POST, url, |r| {
r.json(query_request)
r.json(req)
});
execute_request(request, response_size_limit).await
let response = execute_request(
request,
response_size_limit,
NdcErrorResponse::V01,
)
.await?;
Ok(NdcExplainResponse::V01(response))
}
NdcQueryRequest::V02(req) => {
let request =
construct_request(configuration, reqwest::Method::POST, url, |r| {
r.json(req)
});
let response = execute_request(
request,
response_size_limit,
NdcErrorResponse::V02,
)
.await?;
Ok(NdcExplainResponse::V02(response))
}
}
})
},
)
@ -102,8 +129,8 @@ pub async fn explain_query_post(
/// <https://hasura.github.io/ndc-spec/specification/explain.html?highlight=%2Fexplain#request-1>
pub async fn explain_mutation_post(
configuration: Configuration<'_>,
mutation_request: &ndc_models::MutationRequest,
) -> Result<ndc_models::ExplainResponse, Error> {
mutation_request: &NdcMutationRequest,
) -> Result<NdcExplainResponse, Error> {
let tracer = tracing_util::global_tracer();
tracer
.in_span_async(
@ -114,11 +141,35 @@ pub async fn explain_mutation_post(
Box::pin(async {
let url = append_path(configuration.base_path, &["mutation", "explain"])?;
let response_size_limit = configuration.response_size_limit;
match mutation_request {
NdcMutationRequest::V01(req) => {
let request =
construct_request(configuration, reqwest::Method::POST, url, |r| {
r.json(mutation_request)
r.json(req)
});
execute_request(request, response_size_limit).await
let response = execute_request(
request,
response_size_limit,
NdcErrorResponse::V01,
)
.await?;
Ok(NdcExplainResponse::V01(response))
}
NdcMutationRequest::V02(req) => {
let request =
construct_request(configuration, reqwest::Method::POST, url, |r| {
r.json(req)
});
let response = execute_request(
request,
response_size_limit,
NdcErrorResponse::V02,
)
.await?;
Ok(NdcExplainResponse::V02(response))
}
}
})
},
)
@ -130,8 +181,8 @@ pub async fn explain_mutation_post(
/// <https://hasura.github.io/ndc-spec/specification/mutations/index.html>
pub async fn mutation_post(
configuration: Configuration<'_>,
mutation_request: &ndc_models::MutationRequest,
) -> Result<ndc_models::MutationResponse, Error> {
mutation_request: &NdcMutationRequest,
) -> Result<NdcMutationResponse, Error> {
let tracer = tracing_util::global_tracer();
tracer
.in_span_async(
@ -142,11 +193,35 @@ pub async fn mutation_post(
Box::pin(async {
let url = append_path(configuration.base_path, &["mutation"])?;
let response_size_limit = configuration.response_size_limit;
match mutation_request {
NdcMutationRequest::V01(req) => {
let request =
construct_request(configuration, reqwest::Method::POST, url, |r| {
r.json(mutation_request)
r.json(req)
});
execute_request(request, response_size_limit).await
let response = execute_request(
request,
response_size_limit,
NdcErrorResponse::V01,
)
.await?;
Ok(NdcMutationResponse::V01(response))
}
NdcMutationRequest::V02(req) => {
let request =
construct_request(configuration, reqwest::Method::POST, url, |r| {
r.json(req)
});
let response = execute_request(
request,
response_size_limit,
NdcErrorResponse::V02,
)
.await?;
Ok(NdcMutationResponse::V02(response))
}
}
})
},
)
@ -158,8 +233,8 @@ pub async fn mutation_post(
/// <https://hasura.github.io/ndc-spec/specification/queries/index.html>
pub async fn query_post(
configuration: Configuration<'_>,
query_request: &ndc_models::QueryRequest,
) -> Result<ndc_models::QueryResponse, Error> {
query_request: &NdcQueryRequest,
) -> Result<NdcQueryResponse, Error> {
let tracer = tracing_util::global_tracer();
tracer
.in_span_async(
@ -170,11 +245,35 @@ pub async fn query_post(
Box::pin(async {
let url = append_path(configuration.base_path, &["query"])?;
let response_size_limit = configuration.response_size_limit;
match query_request {
NdcQueryRequest::V01(req) => {
let request =
construct_request(configuration, reqwest::Method::POST, url, |r| {
r.json(query_request)
r.json(req)
});
execute_request(request, response_size_limit).await
let response = execute_request(
request,
response_size_limit,
NdcErrorResponse::V01,
)
.await?;
Ok(NdcQueryResponse::V01(response))
}
NdcQueryRequest::V02(req) => {
let request =
construct_request(configuration, reqwest::Method::POST, url, |r| {
r.json(req)
});
let response = execute_request(
request,
response_size_limit,
NdcErrorResponse::V02,
)
.await?;
Ok(NdcQueryResponse::V02(response))
}
}
})
},
)
@ -219,10 +318,16 @@ fn construct_request(
}
/// Execute a request and deserialize the JSON response
async fn execute_request<T: DeserializeOwned>(
async fn execute_request<TResponse, TResponseError, F>(
request: reqwest::RequestBuilder,
response_size_limit: Option<usize>,
) -> Result<T, Error> {
to_error: F,
) -> Result<TResponse, Error>
where
TResponse: DeserializeOwned,
TResponseError: DeserializeOwned,
F: FnOnce(TResponseError) -> NdcErrorResponse + Send,
{
let tracer = tracing_util::global_tracer();
let response = tracer
@ -260,7 +365,7 @@ async fn execute_request<T: DeserializeOwned>(
};
Ok(result)
} else {
Err(construct_error(response).await)
Err(construct_error(response, to_error).await)
}
})
},
@ -269,14 +374,18 @@ async fn execute_request<T: DeserializeOwned>(
}
/// Build an error from the response status and content
async fn construct_error(response: reqwest::Response) -> Error {
async fn construct_error<TResponseError, F>(response: reqwest::Response, to_error: F) -> Error
where
TResponseError: DeserializeOwned,
F: FnOnce(TResponseError) -> NdcErrorResponse,
{
let status = response.status();
match response.json().await {
Ok(body) => match ndc_models::ErrorResponse::deserialize(&body) {
Ok(body) => match TResponseError::deserialize(&body) {
Ok(error_response) => {
let connector_error = ConnectorError {
status,
error_response,
error_response: to_error(error_response),
};
Error::Connector(connector_error)
}
@ -294,6 +403,49 @@ async fn construct_error(response: reqwest::Response) -> Error {
}
}
/// Handle response return from an NDC request by applying the size limit and
/// deserializing into a JSON value
///
/// # Errors
/// Returns `Error::ResponseTooLarge` when the response exceeds `size_limit`
/// bytes, or a deserialization/transport error from `reqwest`.
async fn handle_response_with_size_limit<T: for<'de> serde::Deserialize<'de>>(
    response: reqwest::Response,
    size_limit: usize,
) -> Result<T, Error> {
    if let Some(content_length) = &response.content_length() {
        // Check with content length
        // Fast path: the server declared a Content-Length header, so we can
        // reject oversized responses before reading the body at all.
        if *content_length > size_limit as u64 {
            Err(Error::ResponseTooLarge(format!(
                "Received content length {content_length} exceeds the limit {size_limit}"
            )))
        } else {
            Ok(response.json().await?)
        }
    } else {
        // If no content length found, then check chunk-by-chunk
        handle_response_by_chunks_with_size_limit(response, size_limit).await
    }
}
/// Handle response by chunks. For each chunk consumed, check if the total size exceeds the limit.
///
/// This logic is separated in a function to allow testing.
///
/// # Errors
/// Returns `Error::ResponseTooLarge` as soon as the accumulated body size
/// exceeds `size_limit`; transport and JSON-parse errors are propagated.
async fn handle_response_by_chunks_with_size_limit<T: for<'de> serde::Deserialize<'de>>(
    response: reqwest::Response,
    size_limit: usize,
) -> Result<T, Error> {
    // Running total of bytes consumed so far; checked against the limit on
    // every chunk so we stop reading as early as possible.
    let mut size = 0;
    let mut buf = bytes::BytesMut::new();
    let mut response = response;
    while let Some(chunk) = response.chunk().await? {
        size += chunk.len();
        if size > size_limit {
            return Err(Error::ResponseTooLarge(format!(
                "Size exceeds the limit {size_limit}"
            )));
        }
        buf.extend_from_slice(&chunk);
    }
    // Only deserialize once the whole (bounded) body has been buffered.
    Ok(serde_json::from_slice(&buf)?)
}
#[cfg(test)]
mod tests {
use super::append_path;
@ -337,4 +489,96 @@ mod tests {
let result = append_path(&url, &paths).unwrap();
assert_eq!(result.as_str(), "http://hasura.io/ndc/query/explain");
}
use pretty_assertions::assert_eq;
#[tokio::test]
async fn test_content_length() {
let mut server = mockito::Server::new_async().await;
let test_api = server
.mock("GET", "/test")
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"message": "hello"}"#)
.create();
let response = reqwest::get(server.url() + "/test").await.unwrap();
test_api.assert();
let err = super::handle_response_with_size_limit::<serde_json::Value>(response, 10)
.await
.unwrap_err();
assert_eq!(
err.to_string(),
"response received from connector is too large: Received content length 20 exceeds the limit 10"
);
}
#[tokio::test]
async fn test_chunk_by_chunk() {
let mut server = mockito::Server::new_async().await;
let test_api = server
.mock("GET", "/test")
.with_status(200)
.with_header("content-type", "application/json")
.with_body(r#"{"message": "hello"}"#)
.create();
let response = reqwest::get(server.url() + "/test").await.unwrap();
test_api.assert();
let err =
super::handle_response_by_chunks_with_size_limit::<serde_json::Value>(response, 5)
.await
.unwrap_err();
assert_eq!(
err.to_string(),
"response received from connector is too large: Size exceeds the limit 5"
);
}
#[tokio::test]
async fn test_success() {
let json = serde_json::json!(
[
{"name": "Alice"},
{"name": "Bob"},
{"name": "Charlie"}
]
);
let mut server = mockito::Server::new_async().await;
let test_api = server
.mock("GET", "/test")
.with_status(200)
.with_header("content-type", "application/json")
.with_body(serde_json::to_vec(&json).unwrap())
.create();
let response = reqwest::get(server.url() + "/test").await.unwrap();
test_api.assert();
let res = super::handle_response_with_size_limit::<serde_json::Value>(response, 100)
.await
.unwrap();
assert_eq!(json, res);
}
#[tokio::test]
async fn test_success_by_chunks() {
let json = serde_json::json!(
[
{"name": "Alice"},
{"name": "Bob"},
{"name": "Charlie"}
]
);
let mut server = mockito::Server::new_async().await;
let test_api = server
.mock("GET", "/test")
.with_status(200)
.with_header("content-type", "application/json")
.with_body(serde_json::to_vec(&json).unwrap())
.create();
let response = reqwest::get(server.url() + "/test").await.unwrap();
test_api.assert();
let res =
super::handle_response_by_chunks_with_size_limit::<serde_json::Value>(response, 100)
.await
.unwrap();
assert_eq!(json, res);
}
}

View File

@ -0,0 +1,6 @@
use thiserror::Error;

pub mod v01;

/// Errors raised when downgrading v02 ndc-models requests to v01.
///
/// Currently uninhabited because the v01 and v02 model types are identical,
/// so every downgrade succeeds; variants will appear as the v02 types
/// diverge from v01.
#[derive(Error, Debug)]
pub enum NdcDowngradeError {}

View File

@ -0,0 +1,440 @@
use ndc_models;
use ndc_models_v01;
use super::NdcDowngradeError;
/// Convert a v01 `RowSet` into its v02 equivalent.
///
/// Aggregates carry over unchanged; each row field value is unwrapped from
/// the v01 newtype and re-wrapped in the v02 one.
pub fn upgrade_rowset_to_v02(rowset: ndc_models_v01::RowSet) -> ndc_models::RowSet {
    let ndc_models_v01::RowSet { aggregates, rows } = rowset;
    let upgraded_rows = rows.map(|rows| {
        rows.into_iter()
            .map(|row| {
                row.into_iter()
                    .map(|(name, value)| {
                        let ndc_models_v01::RowFieldValue(inner) = value;
                        (name, ndc_models::RowFieldValue(inner))
                    })
                    .collect()
            })
            .collect()
    });
    ndc_models::RowSet {
        aggregates,
        rows: upgraded_rows,
    }
}
/// Convert a v01 `MutationResponse` into its v02 equivalent by upgrading
/// each operation result in turn.
pub fn upgrade_mutation_response_to_v02(
    mutation_response: ndc_models_v01::MutationResponse,
) -> ndc_models::MutationResponse {
    let operation_results = mutation_response
        .operation_results
        .into_iter()
        .map(upgrade_mutation_operation_result_to_latest)
        .collect();
    ndc_models::MutationResponse { operation_results }
}
/// Convert a single v01 mutation operation result to the latest (v02) models.
///
/// NOTE(review): named `_to_latest` while the sibling helpers use a `_to_v02`
/// suffix — consider renaming for consistency.
fn upgrade_mutation_operation_result_to_latest(
    mutation_operation_result: ndc_models_v01::MutationOperationResults,
) -> ndc_models::MutationOperationResults {
    match mutation_operation_result {
        // `Procedure` is the only variant; its JSON result passes through
        // unchanged.
        ndc_models_v01::MutationOperationResults::Procedure { result } => {
            ndc_models::MutationOperationResults::Procedure { result }
        }
    }
}
/// Downgrade a v02 `QueryRequest` to a v01 request.
///
/// Infallible today (the two model versions are identical), but returns a
/// `Result` so failures can surface once the v02 types diverge.
pub fn downgrade_v02_query_request(
    query_request: ndc_models::QueryRequest,
) -> Result<ndc_models_v01::QueryRequest, NdcDowngradeError> {
    let ndc_models::QueryRequest {
        arguments,
        collection,
        query,
        collection_relationships,
        variables,
    } = query_request;
    Ok(ndc_models_v01::QueryRequest {
        collection,
        query: downgrade_v02_query(query),
        arguments: arguments
            .into_iter()
            .map(|(name, arg)| (name, downgrade_v02_argument(arg)))
            .collect(),
        collection_relationships: collection_relationships
            .into_iter()
            .map(|(name, rel)| (name, downgrade_v02_relationship(rel)))
            .collect(),
        variables,
    })
}
/// Downgrade a single v02 query argument (literal or variable reference)
/// to v01.
fn downgrade_v02_argument(argument: ndc_models::Argument) -> ndc_models_v01::Argument {
    match argument {
        ndc_models::Argument::Literal { value } => ndc_models_v01::Argument::Literal { value },
        ndc_models::Argument::Variable { name } => ndc_models_v01::Argument::Variable { name },
    }
}
/// Downgrade a v02 `Query` (including its nested fields, aggregates,
/// ordering and predicate) to v01.
fn downgrade_v02_query(query: ndc_models::Query) -> ndc_models_v01::Query {
    let ndc_models::Query {
        aggregates,
        fields,
        limit,
        offset,
        order_by,
        predicate,
    } = query;
    let downgraded_aggregates = aggregates.map(|aggs| {
        aggs.into_iter()
            .map(|(name, agg)| (name, downgrade_v02_aggregate(agg)))
            .collect()
    });
    let downgraded_fields = fields.map(|fields| {
        fields
            .into_iter()
            .map(|(name, field)| (name, downgrade_v02_field(field)))
            .collect()
    });
    ndc_models_v01::Query {
        aggregates: downgraded_aggregates,
        fields: downgraded_fields,
        limit,
        offset,
        order_by: order_by.map(downgrade_v02_order_by),
        predicate: predicate.map(downgrade_v02_predicate),
    }
}
/// Downgrade a v02 aggregate expression to v01; all three variants map
/// field-for-field.
fn downgrade_v02_aggregate(aggregate: ndc_models::Aggregate) -> ndc_models_v01::Aggregate {
    match aggregate {
        ndc_models::Aggregate::StarCount {} => ndc_models_v01::Aggregate::StarCount {},
        ndc_models::Aggregate::SingleColumn {
            column,
            field_path,
            function,
        } => ndc_models_v01::Aggregate::SingleColumn {
            column,
            field_path,
            function,
        },
        ndc_models::Aggregate::ColumnCount {
            column,
            field_path,
            distinct,
        } => ndc_models_v01::Aggregate::ColumnCount {
            column,
            field_path,
            distinct,
        },
    }
}
/// Downgrade a v02 field selection to v01, recursing into nested column
/// selectors and relationship subqueries.
fn downgrade_v02_field(field: ndc_models::Field) -> ndc_models_v01::Field {
    match field {
        ndc_models::Field::Relationship {
            query,
            relationship,
            arguments,
        } => ndc_models_v01::Field::Relationship {
            relationship,
            query: Box::new(downgrade_v02_query(*query)),
            arguments: arguments
                .into_iter()
                .map(|(name, arg)| (name, downgrade_v02_relationship_argument(arg)))
                .collect(),
        },
        ndc_models::Field::Column {
            column,
            fields,
            arguments,
        } => ndc_models_v01::Field::Column {
            column,
            fields: fields.map(downgrade_v02_nested_field),
            arguments: arguments
                .into_iter()
                .map(|(name, arg)| (name, downgrade_v02_argument(arg)))
                .collect(),
        },
    }
}
fn downgrade_v02_nested_field(
nested_field: ndc_models::NestedField,
) -> ndc_models_v01::NestedField {
match nested_field {
ndc_models::NestedField::Object(nested_object) => {
ndc_models_v01::NestedField::Object(downgrade_v02_nested_object(nested_object))
}
ndc_models::NestedField::Array(nested_array) => {
ndc_models_v01::NestedField::Array(downgrade_v02_nested_array(nested_array))
}
}
}
/// Downgrade a v02 nested object selection to v01, field by field.
fn downgrade_v02_nested_object(
    nested_object: ndc_models::NestedObject,
) -> ndc_models_v01::NestedObject {
    let fields = nested_object
        .fields
        .into_iter()
        .map(|(name, field)| (name, downgrade_v02_field(field)))
        .collect();
    ndc_models_v01::NestedObject { fields }
}
/// Downgrade a v02 nested array selection to v01, recursing into the
/// per-element field selector.
fn downgrade_v02_nested_array(
    nested_array: ndc_models::NestedArray,
) -> ndc_models_v01::NestedArray {
    let inner = downgrade_v02_nested_field(*nested_array.fields);
    ndc_models_v01::NestedArray {
        fields: Box::new(inner),
    }
}
/// Downgrade a v02 relationship argument to v01; all three variants map
/// one-to-one.
fn downgrade_v02_relationship_argument(
    relationship_argument: ndc_models::RelationshipArgument,
) -> ndc_models_v01::RelationshipArgument {
    match relationship_argument {
        ndc_models::RelationshipArgument::Column { name } => {
            ndc_models_v01::RelationshipArgument::Column { name }
        }
        ndc_models::RelationshipArgument::Literal { value } => {
            ndc_models_v01::RelationshipArgument::Literal { value }
        }
        ndc_models::RelationshipArgument::Variable { name } => {
            ndc_models_v01::RelationshipArgument::Variable { name }
        }
    }
}
/// Downgrade a v02 `OrderBy` clause to v01, element by element.
fn downgrade_v02_order_by(order_by: ndc_models::OrderBy) -> ndc_models_v01::OrderBy {
    let elements = order_by
        .elements
        .into_iter()
        .map(downgrade_v02_order_by_element)
        .collect();
    ndc_models_v01::OrderBy { elements }
}
/// Downgrade a single v02 order-by element (direction + target) to v01.
fn downgrade_v02_order_by_element(
    order_by_element: ndc_models::OrderByElement,
) -> ndc_models_v01::OrderByElement {
    let ndc_models::OrderByElement {
        order_direction,
        target,
    } = order_by_element;
    ndc_models_v01::OrderByElement {
        target: downgrade_v02_order_by_target(target),
        order_direction: downgrade_v02_order_direction(order_direction),
    }
}
/// Downgrade a v02 sort direction to v01.
fn downgrade_v02_order_direction(
    order_direction: ndc_models::OrderDirection,
) -> ndc_models_v01::OrderDirection {
    match order_direction {
        ndc_models::OrderDirection::Desc => ndc_models_v01::OrderDirection::Desc,
        ndc_models::OrderDirection::Asc => ndc_models_v01::OrderDirection::Asc,
    }
}
/// Downgrade a v02 order-by target to v01, converting any relationship path
/// elements along the way.
fn downgrade_v02_order_by_target(
    target: ndc_models::OrderByTarget,
) -> ndc_models_v01::OrderByTarget {
    // Shared conversion for the relationship path carried by every variant.
    fn downgrade_path(
        path: Vec<ndc_models::PathElement>,
    ) -> Vec<ndc_models_v01::PathElement> {
        path.into_iter().map(downgrade_v02_path_element).collect()
    }
    match target {
        ndc_models::OrderByTarget::StarCountAggregate { path } => {
            ndc_models_v01::OrderByTarget::StarCountAggregate {
                path: downgrade_path(path),
            }
        }
        ndc_models::OrderByTarget::SingleColumnAggregate {
            column,
            field_path,
            function,
            path,
        } => ndc_models_v01::OrderByTarget::SingleColumnAggregate {
            column,
            field_path,
            function,
            path: downgrade_path(path),
        },
        ndc_models::OrderByTarget::Column {
            name,
            field_path,
            path,
        } => ndc_models_v01::OrderByTarget::Column {
            name,
            field_path,
            path: downgrade_path(path),
        },
    }
}
/// Downgrades a v0.2 `PathElement` to v0.1: the relationship name carries
/// over unchanged; the arguments and the optional predicate are downgraded.
fn downgrade_v02_path_element(
    path_element: ndc_models::PathElement,
) -> ndc_models_v01::PathElement {
    let arguments = path_element
        .arguments
        .into_iter()
        .map(|(name, argument)| (name, downgrade_v02_relationship_argument(argument)))
        .collect();
    let predicate = path_element
        .predicate
        .map(|predicate| Box::new(downgrade_v02_predicate(*predicate)));
    ndc_models_v01::PathElement {
        relationship: path_element.relationship,
        arguments,
        predicate,
    }
}
/// Recursively downgrades a v0.2 boolean `Expression` tree to v0.1.
fn downgrade_v02_predicate(predicate: ndc_models::Expression) -> ndc_models_v01::Expression {
    // Downgrade every expression in a list (shared by the And/Or variants).
    fn downgrade_all(
        expressions: Vec<ndc_models::Expression>,
    ) -> Vec<ndc_models_v01::Expression> {
        expressions.into_iter().map(downgrade_v02_predicate).collect()
    }
    match predicate {
        ndc_models::Expression::And { expressions } => ndc_models_v01::Expression::And {
            expressions: downgrade_all(expressions),
        },
        ndc_models::Expression::Or { expressions } => ndc_models_v01::Expression::Or {
            expressions: downgrade_all(expressions),
        },
        ndc_models::Expression::Not { expression } => ndc_models_v01::Expression::Not {
            expression: Box::new(downgrade_v02_predicate(*expression)),
        },
        ndc_models::Expression::UnaryComparisonOperator { column, operator } => {
            ndc_models_v01::Expression::UnaryComparisonOperator {
                column: downgrade_v02_comparison_target(column),
                operator: downgrade_v02_unary_comparison_operator(operator),
            }
        }
        ndc_models::Expression::BinaryComparisonOperator {
            column,
            operator,
            value,
        } => ndc_models_v01::Expression::BinaryComparisonOperator {
            column: downgrade_v02_comparison_target(column),
            // The binary operator name passes through unchanged.
            operator,
            value: downgrade_v02_binary_comparison_value(value),
        },
        ndc_models::Expression::Exists {
            in_collection,
            predicate,
        } => ndc_models_v01::Expression::Exists {
            in_collection: downgrade_v02_exists_in_collection(in_collection),
            predicate: predicate.map(|p| Box::new(downgrade_v02_predicate(*p))),
        },
    }
}
/// Downgrades a v0.2 `ComparisonTarget` to v0.1.
fn downgrade_v02_comparison_target(
    column: ndc_models::ComparisonTarget,
) -> ndc_models_v01::ComparisonTarget {
    match column {
        ndc_models::ComparisonTarget::Column {
            name,
            field_path,
            path,
        } => {
            // The relationship path leading to the column must itself be
            // downgraded element-by-element.
            let path = path.into_iter().map(downgrade_v02_path_element).collect();
            ndc_models_v01::ComparisonTarget::Column {
                name,
                field_path,
                path,
            }
        }
        ndc_models::ComparisonTarget::RootCollectionColumn { name, field_path } => {
            ndc_models_v01::ComparisonTarget::RootCollectionColumn { name, field_path }
        }
    }
}
/// Downgrades a v0.2 `UnaryComparisonOperator` to v0.1.
///
/// The exhaustive single-arm match shows `IsNull` is currently the only v0.2
/// unary operator, so the mapping is trivial.
fn downgrade_v02_unary_comparison_operator(
    operator: ndc_models::UnaryComparisonOperator,
) -> ndc_models_v01::UnaryComparisonOperator {
    match operator {
        ndc_models::UnaryComparisonOperator::IsNull => {
            ndc_models_v01::UnaryComparisonOperator::IsNull
        }
    }
}
/// Downgrades a v0.2 `ComparisonValue` to v0.1.
///
/// Column comparisons need their comparison target downgraded; scalar and
/// variable values carry over unchanged.
fn downgrade_v02_binary_comparison_value(
    value: ndc_models::ComparisonValue,
) -> ndc_models_v01::ComparisonValue {
    match value {
        ndc_models::ComparisonValue::Column { column } => ndc_models_v01::ComparisonValue::Column {
            column: downgrade_v02_comparison_target(column),
        },
        ndc_models::ComparisonValue::Scalar { value } => {
            ndc_models_v01::ComparisonValue::Scalar { value }
        }
        ndc_models::ComparisonValue::Variable { name } => {
            ndc_models_v01::ComparisonValue::Variable { name }
        }
    }
}
/// Downgrades a v0.2 `ExistsInCollection` to v0.1, downgrading the
/// relationship arguments carried by both variants.
fn downgrade_v02_exists_in_collection(
    exists_in_collection: ndc_models::ExistsInCollection,
) -> ndc_models_v01::ExistsInCollection {
    match exists_in_collection {
        ndc_models::ExistsInCollection::Related {
            relationship,
            arguments,
        } => {
            let arguments = arguments
                .into_iter()
                .map(|(name, argument)| (name, downgrade_v02_relationship_argument(argument)))
                .collect();
            ndc_models_v01::ExistsInCollection::Related {
                relationship,
                arguments,
            }
        }
        ndc_models::ExistsInCollection::Unrelated {
            collection,
            arguments,
        } => {
            let arguments = arguments
                .into_iter()
                .map(|(name, argument)| (name, downgrade_v02_relationship_argument(argument)))
                .collect();
            ndc_models_v01::ExistsInCollection::Unrelated {
                collection,
                arguments,
            }
        }
    }
}
/// Downgrades a v0.2 `Relationship` definition to v0.1: the column mapping
/// and target collection carry over; the relationship type and arguments are
/// downgraded field by field.
fn downgrade_v02_relationship(
    relationship: ndc_models::Relationship,
) -> ndc_models_v01::Relationship {
    let arguments = relationship
        .arguments
        .into_iter()
        .map(|(name, argument)| (name, downgrade_v02_relationship_argument(argument)))
        .collect();
    ndc_models_v01::Relationship {
        column_mapping: relationship.column_mapping,
        relationship_type: downgrade_v02_relationship_type(relationship.relationship_type),
        target_collection: relationship.target_collection,
        arguments,
    }
}
/// Downgrades a v0.2 `RelationshipType` to v0.1.
///
/// The two versions currently share the same variants, so this is a direct
/// one-to-one mapping.
fn downgrade_v02_relationship_type(
    relationship_type: ndc_models::RelationshipType,
) -> ndc_models_v01::RelationshipType {
    match relationship_type {
        ndc_models::RelationshipType::Object => ndc_models_v01::RelationshipType::Object,
        ndc_models::RelationshipType::Array => ndc_models_v01::RelationshipType::Array,
    }
}
/// Downgrades a v0.2 `MutationRequest` to v0.1.
///
/// This currently cannot fail (the body only ever returns `Ok`), but the
/// `Result` keeps the signature stable for when v0.2-only features appear
/// that cannot be represented in v0.1.
pub fn downgrade_v02_mutation_request(
    mutation_request: ndc_models::MutationRequest,
) -> Result<ndc_models_v01::MutationRequest, NdcDowngradeError> {
    let operations = mutation_request
        .operations
        .into_iter()
        .map(downgrade_v02_mutation_operation)
        .collect();
    let collection_relationships = mutation_request
        .collection_relationships
        .into_iter()
        .map(|(name, relationship)| (name, downgrade_v02_relationship(relationship)))
        .collect();
    Ok(ndc_models_v01::MutationRequest {
        operations,
        collection_relationships,
    })
}
/// Downgrades a v0.2 `MutationOperation` to v0.1.
///
/// `Procedure` is currently the only operation kind (the match is exhaustive
/// with one arm): its name and arguments pass through unchanged and any
/// nested field selection is downgraded.
fn downgrade_v02_mutation_operation(
    mutation_operation: ndc_models::MutationOperation,
) -> ndc_models_v01::MutationOperation {
    match mutation_operation {
        ndc_models::MutationOperation::Procedure {
            name,
            arguments,
            fields,
        } => ndc_models_v01::MutationOperation::Procedure {
            name,
            arguments,
            fields: fields.map(downgrade_v02_nested_field),
        },
    }
}

View File

@ -1,139 +0,0 @@
use super::client as ndc_client;
/// Handle response return from an NDC request by applying the size limit and
/// deserializing into a JSON value.
///
/// If the server declared a `Content-Length`, oversized responses are
/// rejected before the body is read at all; otherwise the body is consumed
/// chunk-by-chunk with the limit enforced incrementally.
///
/// # Errors
/// Returns `Error::ResponseTooLarge` when the limit is exceeded, or a
/// transport/deserialization error from the underlying client.
pub(crate) async fn handle_response_with_size_limit<T: for<'de> serde::Deserialize<'de>>(
    response: reqwest::Response,
    size_limit: usize,
) -> Result<T, ndc_client::Error> {
    // `content_length()` returns an owned `Option<u64>` (Copy inner value);
    // bind it by value rather than borrowing the temporary.
    if let Some(content_length) = response.content_length() {
        // Fast path: reject using the declared length, without reading the body.
        if content_length > size_limit as u64 {
            Err(ndc_client::Error::ResponseTooLarge(format!(
                "Received content length {content_length} exceeds the limit {size_limit}"
            )))
        } else {
            Ok(response.json().await?)
        }
    } else {
        // No content length header: fall back to checking chunk-by-chunk.
        handle_response_by_chunks_with_size_limit(response, size_limit).await
    }
}
/// Handle response by chunks. For each chunk consumed, check if the total size exceeds the limit.
///
/// This logic is separated in a function to allow testing.
/// Handle response by chunks. For each chunk consumed, check if the total
/// size exceeds the limit.
///
/// This logic is separated in a function to allow testing.
///
/// # Errors
/// Returns `Error::ResponseTooLarge` as soon as the accumulated size exceeds
/// `size_limit`; otherwise propagates transport or JSON deserialization errors.
async fn handle_response_by_chunks_with_size_limit<T: for<'de> serde::Deserialize<'de>>(
    // Take the parameter as `mut` directly instead of rebinding it in the body.
    mut response: reqwest::Response,
    size_limit: usize,
) -> Result<T, ndc_client::Error> {
    let mut size = 0;
    let mut buf = bytes::BytesMut::new();
    while let Some(chunk) = response.chunk().await? {
        size += chunk.len();
        // Abort early: never buffer more than the limit.
        if size > size_limit {
            return Err(ndc_client::Error::ResponseTooLarge(format!(
                "Size exceeds the limit {size_limit}"
            )));
        }
        buf.extend_from_slice(&chunk);
    }
    Ok(serde_json::from_slice(&buf)?)
}
#[cfg(test)]
mod test {
    use pretty_assertions::assert_eq;

    // Fast path: a response with a `Content-Length` header larger than the
    // limit is rejected without reading the body.
    #[tokio::test]
    async fn test_content_length() {
        let mut server = mockito::Server::new_async().await;
        let test_api = server
            .mock("GET", "/test")
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(r#"{"message": "hello"}"#)
            .create();
        let response = reqwest::get(server.url() + "/test").await.unwrap();
        test_api.assert();
        let err = super::handle_response_with_size_limit::<serde_json::Value>(response, 10)
            .await
            .unwrap_err();
        assert_eq!(
            err.to_string(),
            "response received from connector is too large: Received content length 20 exceeds the limit 10"
        );
    }

    // Fallback path: chunk-by-chunk reading aborts once the accumulated size
    // exceeds the limit.
    #[tokio::test]
    async fn test_chunk_by_chunk() {
        let mut server = mockito::Server::new_async().await;
        let test_api = server
            .mock("GET", "/test")
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(r#"{"message": "hello"}"#)
            .create();
        let response = reqwest::get(server.url() + "/test").await.unwrap();
        test_api.assert();
        let err =
            super::handle_response_by_chunks_with_size_limit::<serde_json::Value>(response, 5)
                .await
                .unwrap_err();
        assert_eq!(
            err.to_string(),
            "response received from connector is too large: Size exceeds the limit 5"
        );
    }

    // Within-limit response deserializes successfully via the content-length path.
    #[tokio::test]
    async fn test_success() {
        let json = serde_json::json!(
            [
                {"name": "Alice"},
                {"name": "Bob"},
                {"name": "Charlie"}
            ]
        );
        let mut server = mockito::Server::new_async().await;
        let test_api = server
            .mock("GET", "/test")
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(serde_json::to_vec(&json).unwrap())
            .create();
        let response = reqwest::get(server.url() + "/test").await.unwrap();
        test_api.assert();
        let res = super::handle_response_with_size_limit::<serde_json::Value>(response, 100)
            .await
            .unwrap();
        assert_eq!(json, res);
    }

    // Within-limit response deserializes successfully via the chunked path.
    #[tokio::test]
    async fn test_success_by_chunks() {
        let json = serde_json::json!(
            [
                {"name": "Alice"},
                {"name": "Bob"},
                {"name": "Charlie"}
            ]
        );
        let mut server = mockito::Server::new_async().await;
        let test_api = server
            .mock("GET", "/test")
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(serde_json::to_vec(&json).unwrap())
            .create();
        let response = reqwest::get(server.url() + "/test").await.unwrap();
        test_api.assert();
        let res =
            super::handle_response_by_chunks_with_size_limit::<serde_json::Value>(response, 100)
                .await
                .unwrap();
        assert_eq!(json, res);
    }
}

View File

@ -0,0 +1,112 @@
use std::collections::BTreeMap;
use serde::Serialize;
use serde_json;
use ndc_models as ndc_models_v02;
use ndc_models_v01;
use super::migration;
/// An NDC query request, carrying either a v0.1 or a v0.2 `QueryRequest`
/// depending on which ndc-models version the target connector supports.
/// Serialization includes a `version` tag distinguishing the variants.
#[derive(Serialize, Debug, Clone, PartialEq)]
#[serde(tag = "version")]
pub enum NdcQueryRequest {
    #[serde(rename = "v0.1.x")]
    V01(ndc_models_v01::QueryRequest),
    #[serde(rename = "v0.2.x")]
    V02(ndc_models_v02::QueryRequest),
}
impl NdcQueryRequest {
    /// Sets the query variables on the underlying versioned request.
    ///
    /// NOTE(review): both arms assign the same `variables` value, which works
    /// because the v0.1 and v0.2 `variables` fields currently share a type —
    /// this will need per-version handling if the v0.2 models diverge.
    pub fn set_variables(&mut self, variables: Option<Vec<BTreeMap<String, serde_json::Value>>>) {
        match self {
            NdcQueryRequest::V01(request) => request.variables = variables,
            NdcQueryRequest::V02(request) => request.variables = variables,
        }
    }
}
/// An NDC query response, carrying either a v0.1 or a v0.2 `QueryResponse`
/// depending on which ndc-models version the connector responded with.
#[derive(Serialize, Debug, Clone, PartialEq)]
#[serde(tag = "version")]
pub enum NdcQueryResponse {
    #[serde(rename = "v0.1.x")]
    V01(ndc_models_v01::QueryResponse),
    #[serde(rename = "v0.2.x")]
    V02(ndc_models_v02::QueryResponse),
}
impl NdcQueryResponse {
    /// Converts the response into the latest (v0.2) row sets, upgrading each
    /// v0.1 row set via the migration module when necessary. The engine's
    /// internal processing always works over the latest models.
    pub fn as_latest_rowsets(self) -> Vec<ndc_models_v02::RowSet> {
        match self {
            NdcQueryResponse::V01(response) => response
                .0
                .into_iter()
                .map(migration::v01::upgrade_rowset_to_v02)
                .collect(),
            NdcQueryResponse::V02(response) => response.0,
        }
    }
}
/// An NDC explain response, versioned by the ndc-models version the
/// connector responded with.
#[derive(Serialize, Debug, Clone, PartialEq)]
#[serde(tag = "version")]
pub enum NdcExplainResponse {
    #[serde(rename = "v0.1.x")]
    V01(ndc_models_v01::ExplainResponse),
    #[serde(rename = "v0.2.x")]
    V02(ndc_models_v02::ExplainResponse),
}
/// An NDC mutation request, carrying either a v0.1 or a v0.2
/// `MutationRequest` depending on the target connector's supported version.
#[derive(Serialize, Debug, Clone, PartialEq)]
#[serde(tag = "version")]
pub enum NdcMutationRequest {
    #[serde(rename = "v0.1.x")]
    V01(ndc_models_v01::MutationRequest),
    #[serde(rename = "v0.2.x")]
    V02(ndc_models_v02::MutationRequest),
}
/// An NDC mutation response, versioned by the ndc-models version the
/// connector responded with.
#[derive(Serialize, Debug, Clone, PartialEq)]
#[serde(tag = "version")]
pub enum NdcMutationResponse {
    #[serde(rename = "v0.1.x")]
    V01(ndc_models_v01::MutationResponse),
    #[serde(rename = "v0.2.x")]
    V02(ndc_models_v02::MutationResponse),
}
impl NdcMutationResponse {
    /// Converts the response into the latest (v0.2) model, upgrading a v0.1
    /// response via the migration module when necessary.
    pub fn as_latest(self) -> ndc_models_v02::MutationResponse {
        match self {
            NdcMutationResponse::V01(response) => {
                migration::v01::upgrade_mutation_response_to_v02(response)
            }
            NdcMutationResponse::V02(response) => response,
        }
    }
}
/// An NDC error response, versioned by the ndc-models version the connector
/// responded with.
#[derive(Serialize, Debug, Clone, PartialEq)]
#[serde(tag = "version")]
pub enum NdcErrorResponse {
    #[serde(rename = "v0.1.x")]
    V01(ndc_models_v01::ErrorResponse),
    #[serde(rename = "v0.2.x")]
    V02(ndc_models_v02::ErrorResponse),
}
impl NdcErrorResponse {
    /// The human-readable error message, regardless of models version.
    pub fn message(&self) -> &str {
        match self {
            NdcErrorResponse::V01(err) => err.message.as_str(),
            NdcErrorResponse::V02(err) => err.message.as_str(),
        }
    }
    /// Any extra structured error details, regardless of models version.
    pub fn details(&self) -> &serde_json::Value {
        match self {
            NdcErrorResponse::V01(err) => &err.details,
            NdcErrorResponse::V02(err) => &err.details,
        }
    }
}

View File

@ -1,6 +1,7 @@
mod commands;
pub(crate) mod error;
mod model_selection;
mod ndc_request;
mod relationships;
pub(crate) mod selection_set;
@ -103,7 +104,7 @@ pub enum ApolloFederationSelect<'n, 's, 'ir> {
#[derive(Debug)]
pub struct NDCMutationExecution<'n, 's, 'ir> {
pub query: ndc_models::MutationRequest,
pub query: ndc::NdcMutationRequest,
pub join_locations: JoinLocations<(RemoteJoin<'s, 'ir>, JoinId)>,
pub data_connector: &'s metadata_resolve::DataConnectorLink,
pub execution_span_attribute: String,
@ -120,7 +121,7 @@ pub struct ExecutionTree<'s, 'ir> {
#[derive(Debug)]
pub struct ExecutionNode<'s> {
pub query: ndc_models::QueryRequest,
pub query: ndc::NdcQueryRequest,
pub data_connector: &'s metadata_resolve::DataConnectorLink,
}
@ -200,7 +201,7 @@ fn plan_mutation<'n, 's, 'ir>(
) -> Result<NDCMutationExecution<'n, 's, 'ir>, error::Error> {
let mut join_id_counter = MonotonicCounter::new();
let (ndc_ir, join_locations) =
commands::ndc_mutation_ir(ir.procedure_name, ir, &mut join_id_counter)?;
ndc_request::ndc_procedure_mutation_request(ir.procedure_name, ir, &mut join_id_counter)?;
let join_locations_ids = assign_with_join_ids(join_locations)?;
Ok(NDCMutationExecution {
query: ndc_ir,
@ -295,7 +296,8 @@ fn plan_query<'n, 's, 'ir>(
None => NodeQueryPlan::RelayNodeSelect(None),
},
root_field::QueryRootField::FunctionBasedCommand { ir, selection_set } => {
let (ndc_ir, join_locations) = commands::ndc_query_ir(ir, &mut counter)?;
let (ndc_ir, join_locations) =
ndc_request::make_ndc_function_query_request(ir, &mut counter)?;
let join_locations_ids = assign_with_join_ids(join_locations)?;
let execution_tree = ExecutionTree {
root_node: ExecutionNode {
@ -357,7 +359,7 @@ fn generate_execution_tree<'s, 'ir>(
ir: &'ir ModelSelection<'s>,
) -> Result<ExecutionTree<'s, 'ir>, error::Error> {
let mut counter = MonotonicCounter::new();
let (ndc_ir, join_locations) = model_selection::ndc_ir(ir, &mut counter)?;
let (ndc_ir, join_locations) = ndc_request::make_ndc_model_query_request(ir, &mut counter)?;
let join_locations_with_ids = assign_with_join_ids(join_locations)?;
Ok(ExecutionTree {
root_node: ExecutionNode {
@ -890,7 +892,7 @@ async fn resolve_ndc_query_execution(
process_response_as,
} = ndc_query;
let mut response = ndc::execute_ndc_query(
let response = ndc::execute_ndc_query(
http_context,
&execution_tree.root_node.query,
execution_tree.root_node.data_connector,
@ -900,19 +902,21 @@ async fn resolve_ndc_query_execution(
)
.await?;
let mut response_rowsets = response.as_latest_rowsets();
// TODO: Failures in remote joins should result in partial response
// https://github.com/hasura/v3-engine/issues/229
execute_join_locations(
http_context,
execution_span_attribute,
&mut response,
&mut response_rowsets,
process_response_as,
&execution_tree.remote_executions,
project_id,
)
.await?;
process_response(selection_set, response, process_response_as)
process_response(selection_set, response_rowsets, process_response_as)
}
async fn resolve_ndc_mutation_execution(
@ -939,7 +943,8 @@ async fn resolve_ndc_mutation_execution(
field_span_attribute,
project_id,
)
.await?;
.await?
.as_latest();
process_mutation_response(selection_set, response, &process_response_as)
}

View File

@ -2,6 +2,8 @@ use open_dds::{relationships::RelationshipName, types::FieldName};
use thiserror::Error;
use tracing_util::TraceableError;
use crate::ndc;
#[derive(Error, Debug)]
pub enum Error {
#[error("internal: {0}")]
@ -29,4 +31,7 @@ pub enum InternalError {
#[error("generic error: {description}")]
InternalGeneric { description: String },
#[error("error when downgrading ndc request: {0}")]
NdcRequestDowngradeError(ndc::migration::NdcDowngradeError),
}

View File

@ -4,11 +4,11 @@ use std::collections::BTreeMap;
use indexmap::IndexMap;
use super::error;
use super::relationships;
use super::selection_set;
use crate::ir::aggregates::{AggregateFieldSelection, AggregateSelectionSet};
use crate::ir::model_selection::ModelSelection;
use crate::plan::error;
use crate::remote_joins::types::{JoinLocations, MonotonicCounter, RemoteJoin};
/// Create an NDC `Query` based on the internal IR `ModelSelection` settings

View File

@ -0,0 +1,79 @@
use metadata_resolve::data_connectors::NdcVersion;
use open_dds::commands::ProcedureName;
use crate::ir::commands::FunctionBasedCommand;
use crate::ir::commands::ProcedureBasedCommand;
use crate::ir::model_selection::ModelSelection;
use crate::ndc;
use crate::remote_joins::types::{JoinLocations, MonotonicCounter, RemoteJoin};
use super::commands;
use super::error;
use super::model_selection;
/// Builds the versioned NDC query request for a model selection.
///
/// The IR always produces a latest (v0.2) request; if the target connector
/// only supports ndc-models v0.1 the request is downgraded.
///
/// # Errors
/// Fails if building the request from the IR fails, or if the downgrade to
/// v0.1 cannot represent a v0.2-only feature used by the request.
pub(crate) fn make_ndc_model_query_request<'s, 'ir>(
    ir: &'ir ModelSelection<'s>,
    join_id_counter: &mut MonotonicCounter,
) -> Result<(ndc::NdcQueryRequest, JoinLocations<RemoteJoin<'s, 'ir>>), error::Error> {
    // Build the v0.2 request once, then wrap/downgrade it; this avoids
    // duplicating the IR-to-request translation in each match arm.
    let (request, join_locations) = model_selection::ndc_ir(ir, join_id_counter)?;
    let versioned_request = match ir.data_connector.capabilities.supported_ndc_version {
        NdcVersion::V01 => {
            let v01_request = ndc::migration::v01::downgrade_v02_query_request(request)
                .map_err(error::InternalError::NdcRequestDowngradeError)?;
            ndc::NdcQueryRequest::V01(v01_request)
        }
        NdcVersion::V02 => ndc::NdcQueryRequest::V02(request),
    };
    Ok((versioned_request, join_locations))
}
/// Builds the versioned NDC query request for a function-based command.
///
/// The IR always produces a latest (v0.2) request; if the target connector
/// only supports ndc-models v0.1 the request is downgraded.
///
/// # Errors
/// Fails if building the request from the IR fails, or if the downgrade to
/// v0.1 cannot represent a v0.2-only feature used by the request.
pub(crate) fn make_ndc_function_query_request<'s, 'ir>(
    ir: &'ir FunctionBasedCommand<'s>,
    join_id_counter: &mut MonotonicCounter,
) -> Result<(ndc::NdcQueryRequest, JoinLocations<RemoteJoin<'s, 'ir>>), error::Error> {
    // Build the v0.2 request once, then wrap/downgrade it; this avoids
    // duplicating the IR-to-request translation in each match arm.
    let (request, join_locations) = commands::ndc_query_ir(ir, join_id_counter)?;
    let versioned_request = match ir
        .command_info
        .data_connector
        .capabilities
        .supported_ndc_version
    {
        NdcVersion::V01 => {
            let v01_request = ndc::migration::v01::downgrade_v02_query_request(request)
                .map_err(error::InternalError::NdcRequestDowngradeError)?;
            ndc::NdcQueryRequest::V01(v01_request)
        }
        NdcVersion::V02 => ndc::NdcQueryRequest::V02(request),
    };
    Ok((versioned_request, join_locations))
}
/// Builds the versioned NDC mutation request for a procedure-based command.
///
/// The IR always produces a latest (v0.2) request; if the target connector
/// only supports ndc-models v0.1 the request is downgraded.
///
/// # Errors
/// Fails if building the request from the IR fails, or if the downgrade to
/// v0.1 cannot represent a v0.2-only feature used by the request.
pub(crate) fn ndc_procedure_mutation_request<'s, 'ir>(
    procedure_name: &ProcedureName,
    ir: &'ir ProcedureBasedCommand<'s>,
    join_id_counter: &mut MonotonicCounter,
) -> Result<(ndc::NdcMutationRequest, JoinLocations<RemoteJoin<'s, 'ir>>), error::Error> {
    // Build the v0.2 request once, then wrap/downgrade it; this avoids
    // duplicating the IR-to-request translation in each match arm.
    let (request, join_locations) =
        commands::ndc_mutation_ir(procedure_name, ir, join_id_counter)?;
    let versioned_request = match ir
        .command_info
        .data_connector
        .capabilities
        .supported_ndc_version
    {
        NdcVersion::V01 => {
            let v01_request = ndc::migration::v01::downgrade_v02_mutation_request(request)
                .map_err(error::InternalError::NdcRequestDowngradeError)?;
            ndc::NdcMutationRequest::V01(v01_request)
        }
        NdcVersion::V02 => ndc::NdcMutationRequest::V02(request),
    };
    Ok((versioned_request, join_locations))
}

View File

@ -3,11 +3,11 @@
use open_dds::relationships::RelationshipType;
use std::collections::BTreeMap;
use super::error;
use super::selection_set;
use crate::ir::model_selection::ModelSelection;
use crate::ir::relationship::{self, LocalCommandRelationshipInfo, LocalModelRelationshipInfo};
use crate::ir::selection_set::FieldSelection;
use crate::plan::error;
/// collect relationships recursively from IR components containing relationships,
/// and create NDC relationship definitions which will be added to the `relationships`

View File

@ -1,9 +1,10 @@
use super::commands;
use super::error;
use super::model_selection;
use super::relationships;
use super::ProcessResponseAs;
use crate::ir::selection_set::{FieldSelection, NestedSelection, ResultSelectionSet};
use crate::plan::error;
use crate::plan::ndc_request;
use crate::plan::ProcessResponseAs;
use crate::remote_joins::types::SourceFieldAlias;
use crate::remote_joins::types::{
JoinLocations, JoinNode, LocationKind, MonotonicCounter, RemoteJoinType, TargetField,
@ -173,7 +174,8 @@ pub(crate) fn process_selection_set_ir<'s, 'ir>(
);
}
// Construct the `JoinLocations` tree
let (ndc_ir, sub_join_locations) = model_selection::ndc_ir(ir, join_id_counter)?;
let (ndc_ir, sub_join_locations) =
ndc_request::make_ndc_model_query_request(ir, join_id_counter)?;
let rj_info = RemoteJoin {
target_ndc_ir: ndc_ir,
target_data_connector: ir.data_connector,
@ -211,7 +213,8 @@ pub(crate) fn process_selection_set_ir<'s, 'ir>(
);
}
// Construct the `JoinLocations` tree
let (ndc_ir, sub_join_locations) = commands::ndc_query_ir(ir, join_id_counter)?;
let (ndc_ir, sub_join_locations) =
ndc_request::make_ndc_function_query_request(ir, join_id_counter)?;
let rj_info = RemoteJoin {
target_ndc_ir: ndc_ir,
target_data_connector: ir.command_info.data_connector,

View File

@ -148,7 +148,9 @@ where
.collect()
})
.collect();
join_node.target_ndc_ir.variables = Some(foreach_variables);
join_node
.target_ndc_ir
.set_variables(Some(foreach_variables));
// execute the remote query
let mut target_response = tracer
@ -167,7 +169,8 @@ where
))
},
)
.await?;
.await?
.as_latest_rowsets();
// if the sub-tree is not empty, recursively process the sub-tree; which
// will modify the `target_response` with all joins down the tree

View File

@ -7,6 +7,7 @@ use json_ext::ValueExt;
use open_dds::arguments::ArgumentName;
use open_dds::types::FieldName;
use crate::ndc;
use crate::plan::ProcessResponseAs;
/// This tree structure captures all the locations (in the selection set IR) where
@ -108,7 +109,7 @@ pub struct RemoteJoin<'s, 'ir> {
/// target data connector to execute query on
pub target_data_connector: &'s metadata_resolve::DataConnectorLink,
/// NDC IR to execute on a data connector
pub target_ndc_ir: ndc_models::QueryRequest,
pub target_ndc_ir: ndc::NdcQueryRequest,
/// Mapping of the fields in source to fields in target.
/// The HashMap has the following info -
/// - key: is the field name in the source

View File

@ -145,6 +145,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,

View File

@ -114,6 +114,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,

View File

@ -100,6 +100,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,

View File

@ -38,6 +38,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,
@ -162,6 +163,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,
@ -286,6 +288,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,
@ -511,6 +514,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,
@ -536,6 +540,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,
@ -568,6 +573,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,
@ -600,6 +606,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,
@ -655,6 +662,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,
@ -711,6 +719,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,
@ -809,6 +818,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,
@ -865,6 +875,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,
@ -963,6 +974,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,
@ -1019,6 +1031,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,

View File

@ -114,6 +114,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,

View File

@ -100,6 +100,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,
@ -300,6 +301,7 @@
"hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\""
},
"capabilities": {
"supported_ndc_version": "V01",
"supports_explaining_queries": true,
"supports_explaining_mutations": false,
"supports_nested_object_filtering": true,

View File

@ -6,7 +6,7 @@ mod types;
use std::collections::BTreeMap;
pub use types::{
ArgumentPreset, CommandsResponseConfig, DataConnectorCapabilities, DataConnectorContext,
DataConnectorLink, DataConnectorSchema, DataConnectors,
DataConnectorLink, DataConnectorSchema, DataConnectors, NdcVersion,
};
/// Resolve data connectors.

View File

@ -20,7 +20,7 @@ use open_dds::{
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
#[derive(Debug, Eq, PartialEq, Clone, Copy, Serialize, Deserialize)]
pub enum NdcVersion {
V01,
V02,
@ -224,6 +224,7 @@ impl DataConnectorLink {
},
})?;
let capabilities = DataConnectorCapabilities {
supported_ndc_version: context.supported_ndc_version,
supports_explaining_queries: context.capabilities.query.explain.is_some(),
supports_explaining_mutations: context.capabilities.mutation.explain.is_some(),
supports_nested_object_filtering: context
@ -392,6 +393,7 @@ impl CommandsResponseConfig {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[allow(clippy::struct_excessive_bools)]
pub struct DataConnectorCapabilities {
pub supported_ndc_version: NdcVersion,
pub supports_explaining_queries: bool,
pub supports_explaining_mutations: bool,
pub supports_nested_object_filtering: bool,

View File

@ -1190,6 +1190,7 @@ Metadata {
argument_presets: [],
response_config: None,
capabilities: DataConnectorCapabilities {
supported_ndc_version: V01,
supports_explaining_queries: true,
supports_explaining_mutations: true,
supports_nested_object_filtering: false,
@ -1834,6 +1835,7 @@ Metadata {
argument_presets: [],
response_config: None,
capabilities: DataConnectorCapabilities {
supported_ndc_version: V01,
supports_explaining_queries: true,
supports_explaining_mutations: true,
supports_nested_object_filtering: false,

View File

@ -772,6 +772,7 @@ Metadata {
argument_presets: [],
response_config: None,
capabilities: DataConnectorCapabilities {
supported_ndc_version: V01,
supports_explaining_queries: true,
supports_explaining_mutations: true,
supports_nested_object_filtering: false,

View File

@ -790,6 +790,7 @@ Metadata {
argument_presets: [],
response_config: None,
capabilities: DataConnectorCapabilities {
supported_ndc_version: V01,
supports_explaining_queries: true,
supports_explaining_mutations: false,
supports_nested_object_filtering: true,

View File

@ -401,6 +401,7 @@ Metadata {
argument_presets: [],
response_config: None,
capabilities: DataConnectorCapabilities {
supported_ndc_version: V01,
supports_explaining_queries: true,
supports_explaining_mutations: false,
supports_nested_object_filtering: true,

View File

@ -334,6 +334,7 @@ Metadata {
argument_presets: [],
response_config: None,
capabilities: DataConnectorCapabilities {
supported_ndc_version: V01,
supports_explaining_queries: true,
supports_explaining_mutations: false,
supports_nested_object_filtering: true,

View File

@ -23,6 +23,9 @@ use thiserror::Error;
#[derive(Debug, Error)]
pub enum ExecutionPlanError {
#[error("{0}")]
NDCDowngradeError(#[from] execute::ndc::migration::NdcDowngradeError),
#[error("{0}")]
NDCExecutionError(#[from] execute::ndc::client::Error),
@ -224,7 +227,17 @@ pub async fn fetch_from_data_connector(
data_connector: Arc<metadata_resolve::DataConnectorLink>,
) -> Result<RecordBatch, ExecutionPlanError> {
let tracer = tracing_util::global_tracer();
let mut ndc_response =
let query_request = match data_connector.capabilities.supported_ndc_version {
metadata_resolve::data_connectors::NdcVersion::V01 => execute::ndc::NdcQueryRequest::V01(
execute::ndc::migration::v01::downgrade_v02_query_request(
query_request.as_ref().clone(),
)?,
),
metadata_resolve::data_connectors::NdcVersion::V02 => {
execute::ndc::NdcQueryRequest::V02(query_request.as_ref().clone())
}
};
let ndc_response =
execute::fetch_from_data_connector(&http_context, &query_request, &data_connector, None)
.await?;
let batch = tracer.in_span(
@ -233,7 +246,7 @@ pub async fn fetch_from_data_connector(
SpanVisibility::Internal,
|| {
let rows = ndc_response
.0
.as_latest_rowsets()
.pop()
.ok_or_else(|| {
ExecutionPlanError::NDCResponseFormat("no row sets found".to_string())