From e3808768239ad576f992e6a91984308079bbf9ee Mon Sep 17 00:00:00 2001
From: Daniel Chambers
Date: Wed, 3 Jul 2024 18:31:42 +1000
Subject: [PATCH] Handle multiple ndc-model versions during query planning and execution (#790)

This PR adds code that handles multiple versions of ndc-models during the
execution pipeline. Depending on whether a connector supports `v01` or `v02`
models, a different set of types is used to send and receive the HTTP
requests. However, the engine internally still uses the latest (v02) models
inside its IR.

Unfortunately, it would have been quite traumatic to stop the engine from
using ndc-models inside the IR and during response processing and remote
joins. Instead, the engine generates v02 requests, and these are downgraded
into v01 requests where necessary; v01 responses are upgraded to v02
responses before being processed by the engine.

The ndc client (`execute::ndc::client`) now only takes the new wrapper enum
types (`execute::ndc::types`) that split between v01 and v02
requests/responses. Every place that carries an ndc request now carries this
type instead, which allows it to carry either a v01 or a v02 request.

When ndc requests are created during planning, all creation goes via the new
`execute::plan::ndc_request` module. This inspects the connector's supported
version, creates the necessary request and, if needed, downgrades it to v01.
When ndc responses are read during planning or during remote joins, they are
upgraded to v02 via helper functions defined on the types in
`execute::ndc::types`.

The upgrade/downgrade code is located in `execute::ndc::migration`. Keep in
mind that the "v02" types are currently the same as the "v01" types, so the
migration code is not doing much yet. This will change as the v02 types are
modified.

However, this approach has its drawbacks. One is that it prevents changes to
the ndc types [like this](https://github.com/hasura/ndc-spec/pull/158)
without a fair bit of pain (see
[comment](https://github.com/hasura/ndc-spec/pull/158#issuecomment-2202127094)).
Another is that the downgrade code can fail at runtime, and it is not
immediately obvious to developers who start using new, previously unused, v02
features that their feature will fail on v01 connectors, because the mapping
to v01 has already been written. Another is that we are paying some (small,
probably?) performance cost for upgrading/downgrading types, because we need
to rebuild data structures.

Also:

* `execute::ndc::response` has been merged into `execute::ndc::client`, since
  the two were inextricably linked.
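For illustration, the overall shape of the pattern is roughly the following.
This is a minimal, self-contained sketch: the `v01`/`v02` request structs and
the error type are simplified stand-ins rather than the real `ndc_models` /
`ndc_models_v01` types, and the names only loosely mirror
`execute::ndc::types`, `execute::plan::ndc_request` and
`execute::ndc::migration::v01`.

```rust
// Hypothetical, simplified stand-ins for the versioned ndc-models types.
mod v01 {
    #[derive(Debug)]
    pub struct QueryRequest {
        pub collection: String,
    }
}

mod v02 {
    #[derive(Debug)]
    pub struct QueryRequest {
        pub collection: String,
    }
}

/// Wrapper enum carrying either a v0.1.x or a v0.2.x request, in the spirit
/// of the enums added in `execute::ndc::types`.
#[derive(Debug)]
pub enum NdcQueryRequest {
    V01(v01::QueryRequest),
    V02(v02::QueryRequest),
}

/// The connector's supported protocol version, as recorded in its capabilities.
pub enum NdcVersion {
    V01,
    V02,
}

/// Downgrades a planned (latest-version) request to v01. Real downgrades can
/// fail at runtime when the request uses v02-only features.
fn downgrade_to_v01(req: v02::QueryRequest) -> Result<v01::QueryRequest, String> {
    Ok(v01::QueryRequest {
        collection: req.collection,
    })
}

/// The engine always plans against the latest (v02) models and only picks the
/// wire format at the edge, based on the connector's supported version.
pub fn make_request(
    planned: v02::QueryRequest,
    version: NdcVersion,
) -> Result<NdcQueryRequest, String> {
    match version {
        NdcVersion::V01 => Ok(NdcQueryRequest::V01(downgrade_to_v01(planned)?)),
        NdcVersion::V02 => Ok(NdcQueryRequest::V02(planned)),
    }
}

fn main() {
    let planned = v02::QueryRequest {
        collection: "author".to_string(),
    };
    let request = make_request(planned, NdcVersion::V01).expect("downgrade failed");
    println!("{request:?}");
}
```

The client then matches on the wrapper to serialise the appropriate versioned
request, and wraps the connector's response in a corresponding versioned
response enum that is upgraded to v02 before the engine processes it.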
V3_GIT_ORIGIN_REV_ID: f3f36736b52058323d789c378fed06af566f39a3 --- v3/Cargo.lock | 1 + .../expected.json | 2 + .../expected.json | 6 + .../expected.json | 6 + .../expected.json | 8 + .../expected.json | 4 + .../multi_root_field_queries/expected.json | 4 + v3/crates/execute/Cargo.toml | 1 + v3/crates/execute/src/error.rs | 6 +- v3/crates/execute/src/explain.rs | 2 +- v3/crates/execute/src/explain/types.rs | 30 +- v3/crates/execute/src/ndc.rs | 29 +- v3/crates/execute/src/ndc/client.rs | 320 +++++++++++-- v3/crates/execute/src/ndc/migration/mod.rs | 6 + v3/crates/execute/src/ndc/migration/v01.rs | 440 ++++++++++++++++++ v3/crates/execute/src/ndc/response.rs | 139 ------ v3/crates/execute/src/ndc/types.rs | 112 +++++ v3/crates/execute/src/plan.rs | 23 +- v3/crates/execute/src/plan/error.rs | 5 + v3/crates/execute/src/plan/model_selection.rs | 2 +- v3/crates/execute/src/plan/ndc_request.rs | 79 ++++ v3/crates/execute/src/plan/relationships.rs | 2 +- v3/crates/execute/src/plan/selection_set.rs | 11 +- v3/crates/execute/src/remote_joins.rs | 7 +- v3/crates/execute/src/remote_joins/types.rs | 3 +- .../generate_ir/field_argument/expected.json | 1 + .../tests/generate_ir/get_by_id/expected.json | 1 + .../tests/generate_ir/get_many/expected.json | 1 + .../get_many_model_count/expected.json | 13 + .../generate_ir/get_many_user_2/expected.json | 1 + .../generate_ir/get_many_where/expected.json | 2 + .../src/stages/data_connectors/mod.rs | 2 +- .../src/stages/data_connectors/types.rs | 4 +- .../relationship/resolved.snap | 2 + .../root_field/resolved.snap | 1 + .../nested_object/resolved.snap | 1 + .../range/resolved.snap | 1 + .../resolved.snap | 1 + v3/crates/sql/src/plan.rs | 17 +- 39 files changed, 1070 insertions(+), 226 deletions(-) create mode 100644 v3/crates/execute/src/ndc/migration/mod.rs create mode 100644 v3/crates/execute/src/ndc/migration/v01.rs delete mode 100644 v3/crates/execute/src/ndc/response.rs create mode 100644 v3/crates/execute/src/ndc/types.rs create mode 100644 v3/crates/execute/src/plan/ndc_request.rs diff --git a/v3/Cargo.lock b/v3/Cargo.lock index 3f9e1e4b0fd..a633a365652 100644 --- a/v3/Cargo.lock +++ b/v3/Cargo.lock @@ -1800,6 +1800,7 @@ dependencies = [ "lang-graphql", "metadata-resolve", "mockito", + "ndc-models 0.1.4 (git+https://github.com/hasura/ndc-spec.git?tag=v0.1.4)", "ndc-models 0.1.4 (git+https://github.com/hasura/ndc-spec.git?rev=002923d5c337930b9c9007f05c768fe9a0575612)", "nonempty", "open-dds", diff --git a/v3/crates/engine/tests/explain/field_with_local_relationship/expected.json b/v3/crates/engine/tests/explain/field_with_local_relationship/expected.json index 038427df81a..0f8f4d92b27 100644 --- a/v3/crates/engine/tests/explain/field_with_local_relationship/expected.json +++ b/v3/crates/engine/tests/explain/field_with_local_relationship/expected.json @@ -6,6 +6,7 @@ "ndcRequest": { "type": "query", "value": { + "version": "v0.1.x", "collection": "author", "query": { "fields": { @@ -51,6 +52,7 @@ "ndcExplain": { "type": "response", "value": { + "version": "v0.1.x", "details": { "explain": "" } diff --git a/v3/crates/engine/tests/explain/field_with_multi_remote_relationship_subfields/expected.json b/v3/crates/engine/tests/explain/field_with_multi_remote_relationship_subfields/expected.json index 93036580f43..4f17f903c36 100644 --- a/v3/crates/engine/tests/explain/field_with_multi_remote_relationship_subfields/expected.json +++ b/v3/crates/engine/tests/explain/field_with_multi_remote_relationship_subfields/expected.json @@ -9,6 +9,7 @@ "ndcRequest": { 
"type": "query", "value": { + "version": "v0.1.x", "collection": "Album", "query": { "fields": { @@ -31,6 +32,7 @@ "ndcExplain": { "type": "response", "value": { + "version": "v0.1.x", "details": { "explain": "" } @@ -50,6 +52,7 @@ "ndcRequest": { "type": "query", "value": { + "version": "v0.1.x", "collection": "Artist", "query": { "fields": { @@ -86,6 +89,7 @@ "ndcExplain": { "type": "response", "value": { + "version": "v0.1.x", "details": { "explain": "" } @@ -103,6 +107,7 @@ "ndcRequest": { "type": "query", "value": { + "version": "v0.1.x", "collection": "Track", "query": { "fields": { @@ -134,6 +139,7 @@ "ndcExplain": { "type": "response", "value": { + "version": "v0.1.x", "details": { "explain": "" } diff --git a/v3/crates/engine/tests/explain/field_with_nested_remote_relationship_1/expected.json b/v3/crates/engine/tests/explain/field_with_nested_remote_relationship_1/expected.json index 749b768f804..35c73baf60e 100644 --- a/v3/crates/engine/tests/explain/field_with_nested_remote_relationship_1/expected.json +++ b/v3/crates/engine/tests/explain/field_with_nested_remote_relationship_1/expected.json @@ -9,6 +9,7 @@ "ndcRequest": { "type": "query", "value": { + "version": "v0.1.x", "collection": "Album", "query": { "fields": { @@ -59,6 +60,7 @@ "ndcExplain": { "type": "response", "value": { + "version": "v0.1.x", "details": { "explain": "" } @@ -78,6 +80,7 @@ "ndcRequest": { "type": "query", "value": { + "version": "v0.1.x", "collection": "Artist", "query": { "fields": { @@ -109,6 +112,7 @@ "ndcExplain": { "type": "response", "value": { + "version": "v0.1.x", "details": { "explain": "" } @@ -129,6 +133,7 @@ "ndcRequest": { "type": "query", "value": { + "version": "v0.1.x", "collection": "Album", "query": { "fields": { @@ -160,6 +165,7 @@ "ndcExplain": { "type": "response", "value": { + "version": "v0.1.x", "details": { "explain": "" } diff --git a/v3/crates/engine/tests/explain/field_with_nested_remote_relationship_2/expected.json b/v3/crates/engine/tests/explain/field_with_nested_remote_relationship_2/expected.json index 0c6e3c86b2d..f0293b93dd6 100644 --- a/v3/crates/engine/tests/explain/field_with_nested_remote_relationship_2/expected.json +++ b/v3/crates/engine/tests/explain/field_with_nested_remote_relationship_2/expected.json @@ -9,6 +9,7 @@ "ndcRequest": { "type": "query", "value": { + "version": "v0.1.x", "collection": "Album", "query": { "fields": { @@ -31,6 +32,7 @@ "ndcExplain": { "type": "response", "value": { + "version": "v0.1.x", "details": { "explain": "" } @@ -53,6 +55,7 @@ "ndcRequest": { "type": "query", "value": { + "version": "v0.1.x", "collection": "Track", "query": { "fields": { @@ -89,6 +92,7 @@ "ndcExplain": { "type": "response", "value": { + "version": "v0.1.x", "details": { "explain": "" } @@ -106,6 +110,7 @@ "ndcRequest": { "type": "query", "value": { + "version": "v0.1.x", "collection": "Album", "query": { "fields": { @@ -137,6 +142,7 @@ "ndcExplain": { "type": "response", "value": { + "version": "v0.1.x", "details": { "explain": "" } @@ -159,6 +165,7 @@ "ndcRequest": { "type": "query", "value": { + "version": "v0.1.x", "collection": "Artist", "query": { "fields": { @@ -190,6 +197,7 @@ "ndcExplain": { "type": "response", "value": { + "version": "v0.1.x", "details": { "explain": "" } diff --git a/v3/crates/engine/tests/explain/field_with_remote_relationship/expected.json b/v3/crates/engine/tests/explain/field_with_remote_relationship/expected.json index 7afd6b41eb4..6d6a73ed7ec 100644 --- 
a/v3/crates/engine/tests/explain/field_with_remote_relationship/expected.json +++ b/v3/crates/engine/tests/explain/field_with_remote_relationship/expected.json @@ -9,6 +9,7 @@ "ndcRequest": { "type": "query", "value": { + "version": "v0.1.x", "collection": "author", "query": { "fields": { @@ -31,6 +32,7 @@ "ndcExplain": { "type": "response", "value": { + "version": "v0.1.x", "details": { "explain": "" } @@ -47,6 +49,7 @@ "ndcRequest": { "type": "query", "value": { + "version": "v0.1.x", "collection": "article", "query": { "fields": { @@ -78,6 +81,7 @@ "ndcExplain": { "type": "response", "value": { + "version": "v0.1.x", "details": { "explain": "" } diff --git a/v3/crates/engine/tests/explain/multi_root_field_queries/expected.json b/v3/crates/engine/tests/explain/multi_root_field_queries/expected.json index fd707581656..592cd5f0eb0 100644 --- a/v3/crates/engine/tests/explain/multi_root_field_queries/expected.json +++ b/v3/crates/engine/tests/explain/multi_root_field_queries/expected.json @@ -9,6 +9,7 @@ "ndcRequest": { "type": "query", "value": { + "version": "v0.1.x", "collection": "get_actor_by_id", "query": { "fields": { @@ -49,6 +50,7 @@ "ndcRequest": { "type": "query", "value": { + "version": "v0.1.x", "collection": "author", "query": { "fields": { @@ -83,6 +85,7 @@ "ndcExplain": { "type": "response", "value": { + "version": "v0.1.x", "details": { "explain": "" } @@ -97,6 +100,7 @@ "ndcRequest": { "type": "query", "value": { + "version": "v0.1.x", "collection": "movies", "query": { "fields": { diff --git a/v3/crates/execute/Cargo.toml b/v3/crates/execute/Cargo.toml index dd9f93200cd..8b25639095b 100644 --- a/v3/crates/execute/Cargo.toml +++ b/v3/crates/execute/Cargo.toml @@ -30,6 +30,7 @@ derive_more = { workspace = true } futures-util = { workspace = true } indexmap = { workspace = true, features = ["serde"] } ndc-models = { workspace = true } +ndc-models-v01 = { workspace = true } nonempty = { workspace = true } reqwest = { workspace = true, features = ["json", "multipart"] } schemars = { workspace = true, features = ["smol_str"] } diff --git a/v3/crates/execute/src/error.rs b/v3/crates/execute/src/error.rs index fc989b06427..aa8ed5a8daf 100644 --- a/v3/crates/execute/src/error.rs +++ b/v3/crates/execute/src/error.rs @@ -76,7 +76,7 @@ impl TraceableError for RequestError { #[transitive(from(gql::normalized_ast::Error, FieldInternalError))] #[transitive(from(gql::introspection::Error, FieldInternalError))] pub enum FieldError { - #[error("error from data source: {}", connector_error.error_response.message)] + #[error("error from data source: {}", connector_error.error_response.message())] NDCExpected { connector_error: ndc_client::ConnectorError, }, @@ -92,7 +92,7 @@ impl FieldError { fn get_details(&self) -> Option { match self { Self::NDCExpected { connector_error } => { - Some(connector_error.error_response.details.clone()) + Some(connector_error.error_response.details().clone()) } Self::InternalError(internal) => internal.get_details(), Self::FieldNotFoundInService { .. 
} => None, @@ -192,7 +192,7 @@ impl NDCUnexpectedError { pub fn get_details(&self) -> Option { match self { Self::NDCClientError(ndc_client::Error::Connector(ce)) => { - Some(ce.error_response.details.clone()) + Some(ce.error_response.details().clone()) } _ => None, } diff --git a/v3/crates/execute/src/explain.rs b/v3/crates/execute/src/explain.rs index 116465726c4..de64444edc8 100644 --- a/v3/crates/execute/src/explain.rs +++ b/v3/crates/execute/src/explain.rs @@ -228,7 +228,7 @@ async fn get_join_steps( let mut sequence_steps = vec![]; if let JoinNode::Remote((remote_join, _join_id)) = location.join_node { let mut query_request = remote_join.target_ndc_ir; - query_request.variables = Some(vec![]); + query_request.set_variables(Some(vec![])); let ndc_request = types::NDCRequest::Query(query_request); let data_connector_explain = fetch_explain_from_data_connector( expose_internal_errors, diff --git a/v3/crates/execute/src/explain/types.rs b/v3/crates/execute/src/explain/types.rs index 520f01b319e..a6787587d44 100644 --- a/v3/crates/execute/src/explain/types.rs +++ b/v3/crates/execute/src/explain/types.rs @@ -1,13 +1,15 @@ use std::collections::BTreeMap; -use ndc_models; use nonempty::NonEmpty; use serde::Serialize; use lang_graphql::http::GraphQLError; +use ndc_models as ndc_models_v02; +use ndc_models_v01; use tracing_util::Traceable; use crate::error; +use crate::ndc; use crate::GraphQLErrors; #[derive(Debug, Serialize)] @@ -94,7 +96,7 @@ pub(crate) enum ForEachStep { #[serde(tag = "type", content = "value")] pub(crate) enum NDCExplainResponse { NotSupported, - Response(ndc_models::ExplainResponse), + Response(ndc::NdcExplainResponse), Error(GraphQLError), } @@ -102,8 +104,8 @@ pub(crate) enum NDCExplainResponse { #[serde(rename_all = "camelCase")] #[serde(tag = "type", content = "value")] pub(crate) enum NDCRequest { - Query(ndc_models::QueryRequest), - Mutation(ndc_models::MutationRequest), + Query(ndc::NdcQueryRequest), + Mutation(ndc::NdcMutationRequest), } impl NDCExplainResponse { @@ -113,7 +115,7 @@ impl NDCExplainResponse { ) -> Self { Self::Error(error.to_graphql_error(expose_internal_errors, None)) } - pub(crate) fn success(response: ndc_models::ExplainResponse) -> Self { + pub(crate) fn success(response: ndc::NdcExplainResponse) -> Self { Self::Response(response) } pub(crate) fn not_supported() -> Self { @@ -175,12 +177,22 @@ fn redact_ndc_explain_response(ndc_explain: NDCExplainResponse) -> NDCExplainRes match ndc_explain { NDCExplainResponse::NotSupported => NDCExplainResponse::NotSupported, NDCExplainResponse::Error(error) => NDCExplainResponse::Error(error), - NDCExplainResponse::Response(_response) => { + NDCExplainResponse::Response(response) => { let mut redacted_details = BTreeMap::new(); redacted_details.insert("explain".to_string(), "".to_string()); - NDCExplainResponse::Response(ndc_models::ExplainResponse { - details: redacted_details, - }) + let ndc_response = match response { + ndc::NdcExplainResponse::V01(_) => { + ndc::NdcExplainResponse::V01(ndc_models_v01::ExplainResponse { + details: redacted_details, + }) + } + ndc::NdcExplainResponse::V02(_) => { + ndc::NdcExplainResponse::V02(ndc_models_v02::ExplainResponse { + details: redacted_details, + }) + } + }; + NDCExplainResponse::Response(ndc_response) } } } diff --git a/v3/crates/execute/src/ndc.rs b/v3/crates/execute/src/ndc.rs index daf7364ab43..10d912177c3 100644 --- a/v3/crates/execute/src/ndc.rs +++ b/v3/crates/execute/src/ndc.rs @@ -1,4 +1,7 @@ -pub mod response; +pub mod client; +pub mod migration; +pub 
mod types; +pub use types::*; use std::borrow::Cow; @@ -7,10 +10,8 @@ use axum::http::HeaderMap; use lang_graphql::ast::common as ast; use tracing_util::{set_attribute_on_active_span, AttributeVisibility, SpanVisibility}; -use super::error; -use super::{HttpContext, ProjectId}; - -pub mod client; +use crate::error; +use crate::{HttpContext, ProjectId}; /// The column name used by NDC query response of functions /// @@ -19,12 +20,12 @@ pub const FUNCTION_IR_VALUE_COLUMN_NAME: &str = "__value"; /// Executes a NDC operation pub async fn execute_ndc_query<'n, 's>( http_context: &HttpContext, - query: &ndc_models::QueryRequest, + query: &NdcQueryRequest, data_connector: &metadata_resolve::DataConnectorLink, execution_span_attribute: &'static str, field_span_attribute: String, project_id: Option<&ProjectId>, -) -> Result, error::FieldError> { +) -> Result { let tracer = tracing_util::global_tracer(); tracer .in_span_async( @@ -49,7 +50,7 @@ pub async fn execute_ndc_query<'n, 's>( let connector_response = fetch_from_data_connector(http_context, query, data_connector, project_id) .await?; - Ok(connector_response.0) + Ok(connector_response) }) }, ) @@ -58,10 +59,10 @@ pub async fn execute_ndc_query<'n, 's>( pub async fn fetch_from_data_connector<'s>( http_context: &HttpContext, - query_request: &ndc_models::QueryRequest, + query_request: &NdcQueryRequest, data_connector: &metadata_resolve::DataConnectorLink, project_id: Option<&ProjectId>, -) -> Result { +) -> Result { let tracer = tracing_util::global_tracer(); tracer .in_span_async( @@ -108,12 +109,12 @@ pub fn append_project_id_to_headers<'a>( /// Executes a NDC mutation pub(crate) async fn execute_ndc_mutation<'n, 's, 'ir>( http_context: &HttpContext, - query: &ndc_models::MutationRequest, + query: &NdcMutationRequest, data_connector: &metadata_resolve::DataConnectorLink, execution_span_attribute: String, field_span_attribute: String, project_id: Option<&ProjectId>, -) -> Result { +) -> Result { let tracer = tracing_util::global_tracer(); tracer .in_span_async( @@ -151,10 +152,10 @@ pub(crate) async fn execute_ndc_mutation<'n, 's, 'ir>( pub(crate) async fn fetch_from_data_connector_mutation<'s>( http_context: &HttpContext, - query_request: &ndc_models::MutationRequest, + query_request: &NdcMutationRequest, data_connector: &metadata_resolve::DataConnectorLink, project_id: Option<&ProjectId>, -) -> Result { +) -> Result { let tracer = tracing_util::global_tracer(); tracer .in_span_async( diff --git a/v3/crates/execute/src/ndc/client.rs b/v3/crates/execute/src/ndc/client.rs index 20aa06de631..e628529af7e 100644 --- a/v3/crates/execute/src/ndc/client.rs +++ b/v3/crates/execute/src/ndc/client.rs @@ -1,12 +1,15 @@ use std::borrow::Cow; use reqwest::header::{HeaderMap, HeaderValue}; -use serde::{de::DeserializeOwned, Deserialize}; +use serde::de::DeserializeOwned; use thiserror::Error; use tracing_util::{SpanVisibility, Successful}; -use crate::ndc::response::handle_response_with_size_limit; +use super::{ + NdcErrorResponse, NdcExplainResponse, NdcMutationRequest, NdcMutationResponse, NdcQueryRequest, + NdcQueryResponse, +}; /// Error type for the NDC API client interactions #[derive(Debug, Error)] @@ -46,10 +49,10 @@ impl tracing_util::TraceableError for Error { } #[derive(Debug, Clone, Error)] -#[error("connector returned status code {status} with message: {}", error_response.message)] +#[error("connector returned status code {status} with message: {}", error_response.message())] pub struct ConnectorError { pub status: reqwest::StatusCode, - pub 
error_response: ndc_models::ErrorResponse, + pub error_response: NdcErrorResponse, } #[derive(Debug, Clone, Error)] @@ -74,8 +77,8 @@ pub struct Configuration<'s> { /// pub async fn explain_query_post( configuration: Configuration<'_>, - query_request: &ndc_models::QueryRequest, -) -> Result { + query_request: &NdcQueryRequest, +) -> Result { let tracer = tracing_util::global_tracer(); tracer .in_span_async( @@ -86,11 +89,35 @@ pub async fn explain_query_post( Box::pin(async { let url = append_path(configuration.base_path, &["query", "explain"])?; let response_size_limit = configuration.response_size_limit; - let request = - construct_request(configuration, reqwest::Method::POST, url, |r| { - r.json(query_request) - }); - execute_request(request, response_size_limit).await + + match query_request { + NdcQueryRequest::V01(req) => { + let request = + construct_request(configuration, reqwest::Method::POST, url, |r| { + r.json(req) + }); + let response = execute_request( + request, + response_size_limit, + NdcErrorResponse::V01, + ) + .await?; + Ok(NdcExplainResponse::V01(response)) + } + NdcQueryRequest::V02(req) => { + let request = + construct_request(configuration, reqwest::Method::POST, url, |r| { + r.json(req) + }); + let response = execute_request( + request, + response_size_limit, + NdcErrorResponse::V02, + ) + .await?; + Ok(NdcExplainResponse::V02(response)) + } + } }) }, ) @@ -102,8 +129,8 @@ pub async fn explain_query_post( /// pub async fn explain_mutation_post( configuration: Configuration<'_>, - mutation_request: &ndc_models::MutationRequest, -) -> Result { + mutation_request: &NdcMutationRequest, +) -> Result { let tracer = tracing_util::global_tracer(); tracer .in_span_async( @@ -114,11 +141,35 @@ pub async fn explain_mutation_post( Box::pin(async { let url = append_path(configuration.base_path, &["mutation", "explain"])?; let response_size_limit = configuration.response_size_limit; - let request = - construct_request(configuration, reqwest::Method::POST, url, |r| { - r.json(mutation_request) - }); - execute_request(request, response_size_limit).await + + match mutation_request { + NdcMutationRequest::V01(req) => { + let request = + construct_request(configuration, reqwest::Method::POST, url, |r| { + r.json(req) + }); + let response = execute_request( + request, + response_size_limit, + NdcErrorResponse::V01, + ) + .await?; + Ok(NdcExplainResponse::V01(response)) + } + NdcMutationRequest::V02(req) => { + let request = + construct_request(configuration, reqwest::Method::POST, url, |r| { + r.json(req) + }); + let response = execute_request( + request, + response_size_limit, + NdcErrorResponse::V02, + ) + .await?; + Ok(NdcExplainResponse::V02(response)) + } + } }) }, ) @@ -130,8 +181,8 @@ pub async fn explain_mutation_post( /// pub async fn mutation_post( configuration: Configuration<'_>, - mutation_request: &ndc_models::MutationRequest, -) -> Result { + mutation_request: &NdcMutationRequest, +) -> Result { let tracer = tracing_util::global_tracer(); tracer .in_span_async( @@ -142,11 +193,35 @@ pub async fn mutation_post( Box::pin(async { let url = append_path(configuration.base_path, &["mutation"])?; let response_size_limit = configuration.response_size_limit; - let request = - construct_request(configuration, reqwest::Method::POST, url, |r| { - r.json(mutation_request) - }); - execute_request(request, response_size_limit).await + + match mutation_request { + NdcMutationRequest::V01(req) => { + let request = + construct_request(configuration, reqwest::Method::POST, url, |r| { + 
r.json(req) + }); + let response = execute_request( + request, + response_size_limit, + NdcErrorResponse::V01, + ) + .await?; + Ok(NdcMutationResponse::V01(response)) + } + NdcMutationRequest::V02(req) => { + let request = + construct_request(configuration, reqwest::Method::POST, url, |r| { + r.json(req) + }); + let response = execute_request( + request, + response_size_limit, + NdcErrorResponse::V02, + ) + .await?; + Ok(NdcMutationResponse::V02(response)) + } + } }) }, ) @@ -158,8 +233,8 @@ pub async fn mutation_post( /// pub async fn query_post( configuration: Configuration<'_>, - query_request: &ndc_models::QueryRequest, -) -> Result { + query_request: &NdcQueryRequest, +) -> Result { let tracer = tracing_util::global_tracer(); tracer .in_span_async( @@ -170,11 +245,35 @@ pub async fn query_post( Box::pin(async { let url = append_path(configuration.base_path, &["query"])?; let response_size_limit = configuration.response_size_limit; - let request = - construct_request(configuration, reqwest::Method::POST, url, |r| { - r.json(query_request) - }); - execute_request(request, response_size_limit).await + + match query_request { + NdcQueryRequest::V01(req) => { + let request = + construct_request(configuration, reqwest::Method::POST, url, |r| { + r.json(req) + }); + let response = execute_request( + request, + response_size_limit, + NdcErrorResponse::V01, + ) + .await?; + Ok(NdcQueryResponse::V01(response)) + } + NdcQueryRequest::V02(req) => { + let request = + construct_request(configuration, reqwest::Method::POST, url, |r| { + r.json(req) + }); + let response = execute_request( + request, + response_size_limit, + NdcErrorResponse::V02, + ) + .await?; + Ok(NdcQueryResponse::V02(response)) + } + } }) }, ) @@ -219,10 +318,16 @@ fn construct_request( } /// Execute a request and deserialize the JSON response -async fn execute_request( +async fn execute_request( request: reqwest::RequestBuilder, response_size_limit: Option, -) -> Result { + to_error: F, +) -> Result +where + TResponse: DeserializeOwned, + TResponseError: DeserializeOwned, + F: FnOnce(TResponseError) -> NdcErrorResponse + Send, +{ let tracer = tracing_util::global_tracer(); let response = tracer @@ -260,7 +365,7 @@ async fn execute_request( }; Ok(result) } else { - Err(construct_error(response).await) + Err(construct_error(response, to_error).await) } }) }, @@ -269,14 +374,18 @@ async fn execute_request( } /// Build an error from the response status and content -async fn construct_error(response: reqwest::Response) -> Error { +async fn construct_error(response: reqwest::Response, to_error: F) -> Error +where + TResponseError: DeserializeOwned, + F: FnOnce(TResponseError) -> NdcErrorResponse, +{ let status = response.status(); match response.json().await { - Ok(body) => match ndc_models::ErrorResponse::deserialize(&body) { + Ok(body) => match TResponseError::deserialize(&body) { Ok(error_response) => { let connector_error = ConnectorError { status, - error_response, + error_response: to_error(error_response), }; Error::Connector(connector_error) } @@ -294,6 +403,49 @@ async fn construct_error(response: reqwest::Response) -> Error { } } +/// Handle response return from an NDC request by applying the size limit and +/// deserializing into a JSON value +async fn handle_response_with_size_limit serde::Deserialize<'de>>( + response: reqwest::Response, + size_limit: usize, +) -> Result { + if let Some(content_length) = &response.content_length() { + // Check with content length + if *content_length > size_limit as u64 { + 
Err(Error::ResponseTooLarge(format!( + "Received content length {content_length} exceeds the limit {size_limit}" + ))) + } else { + Ok(response.json().await?) + } + } else { + // If no content length found, then check chunk-by-chunk + handle_response_by_chunks_with_size_limit(response, size_limit).await + } +} + +/// Handle response by chunks. For each chunk consumed, check if the total size exceeds the limit. +/// +/// This logic is separated in a function to allow testing. +async fn handle_response_by_chunks_with_size_limit serde::Deserialize<'de>>( + response: reqwest::Response, + size_limit: usize, +) -> Result { + let mut size = 0; + let mut buf = bytes::BytesMut::new(); + let mut response = response; + while let Some(chunk) = response.chunk().await? { + size += chunk.len(); + if size > size_limit { + return Err(Error::ResponseTooLarge(format!( + "Size exceeds the limit {size_limit}" + ))); + } + buf.extend_from_slice(&chunk); + } + Ok(serde_json::from_slice(&buf)?) +} + #[cfg(test)] mod tests { use super::append_path; @@ -337,4 +489,96 @@ mod tests { let result = append_path(&url, &paths).unwrap(); assert_eq!(result.as_str(), "http://hasura.io/ndc/query/explain"); } + + use pretty_assertions::assert_eq; + + #[tokio::test] + async fn test_content_length() { + let mut server = mockito::Server::new_async().await; + let test_api = server + .mock("GET", "/test") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(r#"{"message": "hello"}"#) + .create(); + let response = reqwest::get(server.url() + "/test").await.unwrap(); + test_api.assert(); + let err = super::handle_response_with_size_limit::(response, 10) + .await + .unwrap_err(); + assert_eq!( + err.to_string(), + "response received from connector is too large: Received content length 20 exceeds the limit 10" + ); + } + + #[tokio::test] + async fn test_chunk_by_chunk() { + let mut server = mockito::Server::new_async().await; + let test_api = server + .mock("GET", "/test") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(r#"{"message": "hello"}"#) + .create(); + let response = reqwest::get(server.url() + "/test").await.unwrap(); + test_api.assert(); + let err = + super::handle_response_by_chunks_with_size_limit::(response, 5) + .await + .unwrap_err(); + assert_eq!( + err.to_string(), + "response received from connector is too large: Size exceeds the limit 5" + ); + } + + #[tokio::test] + async fn test_success() { + let json = serde_json::json!( + [ + {"name": "Alice"}, + {"name": "Bob"}, + {"name": "Charlie"} + ] + ); + let mut server = mockito::Server::new_async().await; + let test_api = server + .mock("GET", "/test") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(serde_json::to_vec(&json).unwrap()) + .create(); + let response = reqwest::get(server.url() + "/test").await.unwrap(); + test_api.assert(); + let res = super::handle_response_with_size_limit::(response, 100) + .await + .unwrap(); + assert_eq!(json, res); + } + + #[tokio::test] + async fn test_success_by_chunks() { + let json = serde_json::json!( + [ + {"name": "Alice"}, + {"name": "Bob"}, + {"name": "Charlie"} + ] + ); + let mut server = mockito::Server::new_async().await; + let test_api = server + .mock("GET", "/test") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(serde_json::to_vec(&json).unwrap()) + .create(); + let response = reqwest::get(server.url() + "/test").await.unwrap(); + test_api.assert(); + let res = + 
super::handle_response_by_chunks_with_size_limit::(response, 100) + .await + .unwrap(); + assert_eq!(json, res); + } } diff --git a/v3/crates/execute/src/ndc/migration/mod.rs b/v3/crates/execute/src/ndc/migration/mod.rs new file mode 100644 index 00000000000..2ed6c1a9b7b --- /dev/null +++ b/v3/crates/execute/src/ndc/migration/mod.rs @@ -0,0 +1,6 @@ +use thiserror::Error; + +pub mod v01; + +#[derive(Error, Debug)] +pub enum NdcDowngradeError {} diff --git a/v3/crates/execute/src/ndc/migration/v01.rs b/v3/crates/execute/src/ndc/migration/v01.rs new file mode 100644 index 00000000000..197f0c8e098 --- /dev/null +++ b/v3/crates/execute/src/ndc/migration/v01.rs @@ -0,0 +1,440 @@ +use ndc_models; +use ndc_models_v01; + +use super::NdcDowngradeError; + +pub fn upgrade_rowset_to_v02(rowset: ndc_models_v01::RowSet) -> ndc_models::RowSet { + ndc_models::RowSet { + aggregates: rowset.aggregates, + rows: rowset.rows.map(|rows| { + rows.into_iter() + .map(|row| { + row.into_iter() + .map(|(k, ndc_models_v01::RowFieldValue(v))| { + (k, ndc_models::RowFieldValue(v)) + }) + .collect() + }) + .collect() + }), + } +} + +pub fn upgrade_mutation_response_to_v02( + mutation_response: ndc_models_v01::MutationResponse, +) -> ndc_models::MutationResponse { + ndc_models::MutationResponse { + operation_results: mutation_response + .operation_results + .into_iter() + .map(upgrade_mutation_operation_result_to_latest) + .collect(), + } +} + +fn upgrade_mutation_operation_result_to_latest( + mutation_operation_result: ndc_models_v01::MutationOperationResults, +) -> ndc_models::MutationOperationResults { + match mutation_operation_result { + ndc_models_v01::MutationOperationResults::Procedure { result } => { + ndc_models::MutationOperationResults::Procedure { result } + } + } +} + +pub fn downgrade_v02_query_request( + query_request: ndc_models::QueryRequest, +) -> Result { + Ok(ndc_models_v01::QueryRequest { + arguments: query_request + .arguments + .into_iter() + .map(|(name, argument)| (name, downgrade_v02_argument(argument))) + .collect(), + collection: query_request.collection, + query: downgrade_v02_query(query_request.query), + collection_relationships: query_request + .collection_relationships + .into_iter() + .map(|(name, relationship)| (name, downgrade_v02_relationship(relationship))) + .collect(), + variables: query_request.variables, + }) +} + +fn downgrade_v02_argument(argument: ndc_models::Argument) -> ndc_models_v01::Argument { + match argument { + ndc_models::Argument::Variable { name } => ndc_models_v01::Argument::Variable { name }, + ndc_models::Argument::Literal { value } => ndc_models_v01::Argument::Literal { value }, + } +} + +fn downgrade_v02_query(query: ndc_models::Query) -> ndc_models_v01::Query { + ndc_models_v01::Query { + aggregates: query.aggregates.map(|aggregates| { + aggregates + .into_iter() + .map(|(name, aggregate)| (name, downgrade_v02_aggregate(aggregate))) + .collect() + }), + fields: query.fields.map(|fields| { + fields + .into_iter() + .map(|(name, field)| (name, downgrade_v02_field(field))) + .collect() + }), + limit: query.limit, + offset: query.offset, + order_by: query.order_by.map(downgrade_v02_order_by), + predicate: query.predicate.map(downgrade_v02_predicate), + } +} + +fn downgrade_v02_aggregate(aggregate: ndc_models::Aggregate) -> ndc_models_v01::Aggregate { + match aggregate { + ndc_models::Aggregate::ColumnCount { + column, + field_path, + distinct, + } => ndc_models_v01::Aggregate::ColumnCount { + column, + field_path, + distinct, + }, + 
ndc_models::Aggregate::SingleColumn { + column, + field_path, + function, + } => ndc_models_v01::Aggregate::SingleColumn { + column, + field_path, + function, + }, + ndc_models::Aggregate::StarCount {} => ndc_models_v01::Aggregate::StarCount {}, + } +} + +fn downgrade_v02_field(field: ndc_models::Field) -> ndc_models_v01::Field { + match field { + ndc_models::Field::Column { + column, + fields, + arguments, + } => ndc_models_v01::Field::Column { + column, + fields: fields.map(downgrade_v02_nested_field), + arguments: arguments + .into_iter() + .map(|(name, argument)| (name, downgrade_v02_argument(argument))) + .collect(), + }, + ndc_models::Field::Relationship { + query, + relationship, + arguments, + } => ndc_models_v01::Field::Relationship { + query: Box::new(downgrade_v02_query(*query)), + relationship, + arguments: arguments + .into_iter() + .map(|(name, argument)| (name, downgrade_v02_relationship_argument(argument))) + .collect(), + }, + } +} + +fn downgrade_v02_nested_field( + nested_field: ndc_models::NestedField, +) -> ndc_models_v01::NestedField { + match nested_field { + ndc_models::NestedField::Object(nested_object) => { + ndc_models_v01::NestedField::Object(downgrade_v02_nested_object(nested_object)) + } + ndc_models::NestedField::Array(nested_array) => { + ndc_models_v01::NestedField::Array(downgrade_v02_nested_array(nested_array)) + } + } +} + +fn downgrade_v02_nested_object( + nested_object: ndc_models::NestedObject, +) -> ndc_models_v01::NestedObject { + ndc_models_v01::NestedObject { + fields: nested_object + .fields + .into_iter() + .map(|(name, field)| (name, downgrade_v02_field(field))) + .collect(), + } +} + +fn downgrade_v02_nested_array( + nested_array: ndc_models::NestedArray, +) -> ndc_models_v01::NestedArray { + ndc_models_v01::NestedArray { + fields: Box::new(downgrade_v02_nested_field(*nested_array.fields)), + } +} + +fn downgrade_v02_relationship_argument( + relationship_argument: ndc_models::RelationshipArgument, +) -> ndc_models_v01::RelationshipArgument { + match relationship_argument { + ndc_models::RelationshipArgument::Variable { name } => { + ndc_models_v01::RelationshipArgument::Variable { name } + } + ndc_models::RelationshipArgument::Literal { value } => { + ndc_models_v01::RelationshipArgument::Literal { value } + } + ndc_models::RelationshipArgument::Column { name } => { + ndc_models_v01::RelationshipArgument::Column { name } + } + } +} + +fn downgrade_v02_order_by(order_by: ndc_models::OrderBy) -> ndc_models_v01::OrderBy { + ndc_models_v01::OrderBy { + elements: order_by + .elements + .into_iter() + .map(downgrade_v02_order_by_element) + .collect(), + } +} + +fn downgrade_v02_order_by_element( + order_by_element: ndc_models::OrderByElement, +) -> ndc_models_v01::OrderByElement { + ndc_models_v01::OrderByElement { + order_direction: downgrade_v02_order_direction(order_by_element.order_direction), + target: downgrade_v02_order_by_target(order_by_element.target), + } +} + +fn downgrade_v02_order_direction( + order_direction: ndc_models::OrderDirection, +) -> ndc_models_v01::OrderDirection { + match order_direction { + ndc_models::OrderDirection::Asc => ndc_models_v01::OrderDirection::Asc, + ndc_models::OrderDirection::Desc => ndc_models_v01::OrderDirection::Desc, + } +} + +fn downgrade_v02_order_by_target( + target: ndc_models::OrderByTarget, +) -> ndc_models_v01::OrderByTarget { + match target { + ndc_models::OrderByTarget::Column { + name, + field_path, + path, + } => ndc_models_v01::OrderByTarget::Column { + name, + field_path, + path: 
path.into_iter().map(downgrade_v02_path_element).collect(), + }, + ndc_models::OrderByTarget::SingleColumnAggregate { + column, + field_path, + function, + path, + } => ndc_models_v01::OrderByTarget::SingleColumnAggregate { + column, + field_path, + function, + path: path.into_iter().map(downgrade_v02_path_element).collect(), + }, + ndc_models::OrderByTarget::StarCountAggregate { path } => { + ndc_models_v01::OrderByTarget::StarCountAggregate { + path: path.into_iter().map(downgrade_v02_path_element).collect(), + } + } + } +} + +fn downgrade_v02_path_element( + path_element: ndc_models::PathElement, +) -> ndc_models_v01::PathElement { + ndc_models_v01::PathElement { + relationship: path_element.relationship, + arguments: path_element + .arguments + .into_iter() + .map(|(name, argument)| (name, downgrade_v02_relationship_argument(argument))) + .collect(), + predicate: path_element + .predicate + .map(|predicate| Box::new(downgrade_v02_predicate(*predicate))), + } +} + +fn downgrade_v02_predicate(predicate: ndc_models::Expression) -> ndc_models_v01::Expression { + match predicate { + ndc_models::Expression::And { expressions } => ndc_models_v01::Expression::And { + expressions: expressions + .into_iter() + .map(downgrade_v02_predicate) + .collect(), + }, + ndc_models::Expression::Or { expressions } => ndc_models_v01::Expression::Or { + expressions: expressions + .into_iter() + .map(downgrade_v02_predicate) + .collect(), + }, + ndc_models::Expression::Not { expression } => ndc_models_v01::Expression::Not { + expression: Box::new(downgrade_v02_predicate(*expression)), + }, + ndc_models::Expression::UnaryComparisonOperator { column, operator } => { + ndc_models_v01::Expression::UnaryComparisonOperator { + column: downgrade_v02_comparison_target(column), + operator: downgrade_v02_unary_comparison_operator(operator), + } + } + ndc_models::Expression::BinaryComparisonOperator { + column, + operator, + value, + } => ndc_models_v01::Expression::BinaryComparisonOperator { + column: downgrade_v02_comparison_target(column), + operator, + value: downgrade_v02_binary_comparison_value(value), + }, + ndc_models::Expression::Exists { + in_collection, + predicate, + } => ndc_models_v01::Expression::Exists { + in_collection: downgrade_v02_exists_in_collection(in_collection), + predicate: predicate.map(|predicate| Box::new(downgrade_v02_predicate(*predicate))), + }, + } +} + +fn downgrade_v02_comparison_target( + column: ndc_models::ComparisonTarget, +) -> ndc_models_v01::ComparisonTarget { + match column { + ndc_models::ComparisonTarget::Column { + name, + field_path, + path, + } => ndc_models_v01::ComparisonTarget::Column { + name, + field_path, + path: path.into_iter().map(downgrade_v02_path_element).collect(), + }, + ndc_models::ComparisonTarget::RootCollectionColumn { name, field_path } => { + ndc_models_v01::ComparisonTarget::RootCollectionColumn { name, field_path } + } + } +} + +fn downgrade_v02_unary_comparison_operator( + operator: ndc_models::UnaryComparisonOperator, +) -> ndc_models_v01::UnaryComparisonOperator { + match operator { + ndc_models::UnaryComparisonOperator::IsNull => { + ndc_models_v01::UnaryComparisonOperator::IsNull + } + } +} + +fn downgrade_v02_binary_comparison_value( + value: ndc_models::ComparisonValue, +) -> ndc_models_v01::ComparisonValue { + match value { + ndc_models::ComparisonValue::Column { column } => ndc_models_v01::ComparisonValue::Column { + column: downgrade_v02_comparison_target(column), + }, + ndc_models::ComparisonValue::Scalar { value } => { + 
ndc_models_v01::ComparisonValue::Scalar { value } + } + ndc_models::ComparisonValue::Variable { name } => { + ndc_models_v01::ComparisonValue::Variable { name } + } + } +} + +fn downgrade_v02_exists_in_collection( + exists_in_collection: ndc_models::ExistsInCollection, +) -> ndc_models_v01::ExistsInCollection { + match exists_in_collection { + ndc_models::ExistsInCollection::Related { + relationship, + arguments, + } => ndc_models_v01::ExistsInCollection::Related { + relationship, + arguments: arguments + .into_iter() + .map(|(name, argument)| (name, downgrade_v02_relationship_argument(argument))) + .collect(), + }, + ndc_models::ExistsInCollection::Unrelated { + collection, + arguments, + } => ndc_models_v01::ExistsInCollection::Unrelated { + collection, + arguments: arguments + .into_iter() + .map(|(name, argument)| (name, downgrade_v02_relationship_argument(argument))) + .collect(), + }, + } +} + +fn downgrade_v02_relationship( + relationship: ndc_models::Relationship, +) -> ndc_models_v01::Relationship { + ndc_models_v01::Relationship { + column_mapping: relationship.column_mapping, + relationship_type: downgrade_v02_relationship_type(relationship.relationship_type), + target_collection: relationship.target_collection, + arguments: relationship + .arguments + .into_iter() + .map(|(name, argument)| (name, downgrade_v02_relationship_argument(argument))) + .collect(), + } +} + +fn downgrade_v02_relationship_type( + relationship_type: ndc_models::RelationshipType, +) -> ndc_models_v01::RelationshipType { + match relationship_type { + ndc_models::RelationshipType::Object => ndc_models_v01::RelationshipType::Object, + ndc_models::RelationshipType::Array => ndc_models_v01::RelationshipType::Array, + } +} + +pub fn downgrade_v02_mutation_request( + mutation_request: ndc_models::MutationRequest, +) -> Result { + Ok(ndc_models_v01::MutationRequest { + operations: mutation_request + .operations + .into_iter() + .map(downgrade_v02_mutation_operation) + .collect(), + collection_relationships: mutation_request + .collection_relationships + .into_iter() + .map(|(name, relationship)| (name, downgrade_v02_relationship(relationship))) + .collect(), + }) +} + +fn downgrade_v02_mutation_operation( + mutation_operation: ndc_models::MutationOperation, +) -> ndc_models_v01::MutationOperation { + match mutation_operation { + ndc_models::MutationOperation::Procedure { + name, + arguments, + fields, + } => ndc_models_v01::MutationOperation::Procedure { + name, + arguments, + fields: fields.map(downgrade_v02_nested_field), + }, + } +} diff --git a/v3/crates/execute/src/ndc/response.rs b/v3/crates/execute/src/ndc/response.rs deleted file mode 100644 index d40c31d86de..00000000000 --- a/v3/crates/execute/src/ndc/response.rs +++ /dev/null @@ -1,139 +0,0 @@ -use super::client as ndc_client; - -/// Handle response return from an NDC request by applying the size limit and -/// deserializing into a JSON value -pub(crate) async fn handle_response_with_size_limit serde::Deserialize<'de>>( - response: reqwest::Response, - size_limit: usize, -) -> Result { - if let Some(content_length) = &response.content_length() { - // Check with content length - if *content_length > size_limit as u64 { - Err(ndc_client::Error::ResponseTooLarge(format!( - "Received content length {content_length} exceeds the limit {size_limit}" - ))) - } else { - Ok(response.json().await?) 
- } - } else { - // If no content length found, then check chunk-by-chunk - handle_response_by_chunks_with_size_limit(response, size_limit).await - } -} - -/// Handle response by chunks. For each chunk consumed, check if the total size exceeds the limit. -/// -/// This logic is separated in a function to allow testing. -async fn handle_response_by_chunks_with_size_limit serde::Deserialize<'de>>( - response: reqwest::Response, - size_limit: usize, -) -> Result { - let mut size = 0; - let mut buf = bytes::BytesMut::new(); - let mut response = response; - while let Some(chunk) = response.chunk().await? { - size += chunk.len(); - if size > size_limit { - return Err(ndc_client::Error::ResponseTooLarge(format!( - "Size exceeds the limit {size_limit}" - ))); - } - buf.extend_from_slice(&chunk); - } - Ok(serde_json::from_slice(&buf)?) -} - -#[cfg(test)] -mod test { - use pretty_assertions::assert_eq; - - #[tokio::test] - async fn test_content_length() { - let mut server = mockito::Server::new_async().await; - let test_api = server - .mock("GET", "/test") - .with_status(200) - .with_header("content-type", "application/json") - .with_body(r#"{"message": "hello"}"#) - .create(); - let response = reqwest::get(server.url() + "/test").await.unwrap(); - test_api.assert(); - let err = super::handle_response_with_size_limit::(response, 10) - .await - .unwrap_err(); - assert_eq!( - err.to_string(), - "response received from connector is too large: Received content length 20 exceeds the limit 10" - ); - } - - #[tokio::test] - async fn test_chunk_by_chunk() { - let mut server = mockito::Server::new_async().await; - let test_api = server - .mock("GET", "/test") - .with_status(200) - .with_header("content-type", "application/json") - .with_body(r#"{"message": "hello"}"#) - .create(); - let response = reqwest::get(server.url() + "/test").await.unwrap(); - test_api.assert(); - let err = - super::handle_response_by_chunks_with_size_limit::(response, 5) - .await - .unwrap_err(); - assert_eq!( - err.to_string(), - "response received from connector is too large: Size exceeds the limit 5" - ); - } - - #[tokio::test] - async fn test_success() { - let json = serde_json::json!( - [ - {"name": "Alice"}, - {"name": "Bob"}, - {"name": "Charlie"} - ] - ); - let mut server = mockito::Server::new_async().await; - let test_api = server - .mock("GET", "/test") - .with_status(200) - .with_header("content-type", "application/json") - .with_body(serde_json::to_vec(&json).unwrap()) - .create(); - let response = reqwest::get(server.url() + "/test").await.unwrap(); - test_api.assert(); - let res = super::handle_response_with_size_limit::(response, 100) - .await - .unwrap(); - assert_eq!(json, res); - } - - #[tokio::test] - async fn test_success_by_chunks() { - let json = serde_json::json!( - [ - {"name": "Alice"}, - {"name": "Bob"}, - {"name": "Charlie"} - ] - ); - let mut server = mockito::Server::new_async().await; - let test_api = server - .mock("GET", "/test") - .with_status(200) - .with_header("content-type", "application/json") - .with_body(serde_json::to_vec(&json).unwrap()) - .create(); - let response = reqwest::get(server.url() + "/test").await.unwrap(); - test_api.assert(); - let res = - super::handle_response_by_chunks_with_size_limit::(response, 100) - .await - .unwrap(); - assert_eq!(json, res); - } -} diff --git a/v3/crates/execute/src/ndc/types.rs b/v3/crates/execute/src/ndc/types.rs new file mode 100644 index 00000000000..48b900fead0 --- /dev/null +++ b/v3/crates/execute/src/ndc/types.rs @@ -0,0 +1,112 @@ +use 
std::collections::BTreeMap; + +use serde::Serialize; +use serde_json; + +use ndc_models as ndc_models_v02; +use ndc_models_v01; + +use super::migration; + +#[derive(Serialize, Debug, Clone, PartialEq)] +#[serde(tag = "version")] +pub enum NdcQueryRequest { + #[serde(rename = "v0.1.x")] + V01(ndc_models_v01::QueryRequest), + #[serde(rename = "v0.2.x")] + V02(ndc_models_v02::QueryRequest), +} + +impl NdcQueryRequest { + pub fn set_variables(&mut self, variables: Option>>) { + match self { + NdcQueryRequest::V01(request) => request.variables = variables, + NdcQueryRequest::V02(request) => request.variables = variables, + } + } +} + +#[derive(Serialize, Debug, Clone, PartialEq)] +#[serde(tag = "version")] +pub enum NdcQueryResponse { + #[serde(rename = "v0.1.x")] + V01(ndc_models_v01::QueryResponse), + #[serde(rename = "v0.2.x")] + V02(ndc_models_v02::QueryResponse), +} + +impl NdcQueryResponse { + pub fn as_latest_rowsets(self) -> Vec { + match self { + NdcQueryResponse::V01(response) => response + .0 + .into_iter() + .map(migration::v01::upgrade_rowset_to_v02) + .collect(), + NdcQueryResponse::V02(response) => response.0, + } + } +} + +#[derive(Serialize, Debug, Clone, PartialEq)] +#[serde(tag = "version")] +pub enum NdcExplainResponse { + #[serde(rename = "v0.1.x")] + V01(ndc_models_v01::ExplainResponse), + #[serde(rename = "v0.2.x")] + V02(ndc_models_v02::ExplainResponse), +} + +#[derive(Serialize, Debug, Clone, PartialEq)] +#[serde(tag = "version")] +pub enum NdcMutationRequest { + #[serde(rename = "v0.1.x")] + V01(ndc_models_v01::MutationRequest), + #[serde(rename = "v0.2.x")] + V02(ndc_models_v02::MutationRequest), +} + +#[derive(Serialize, Debug, Clone, PartialEq)] +#[serde(tag = "version")] +pub enum NdcMutationResponse { + #[serde(rename = "v0.1.x")] + V01(ndc_models_v01::MutationResponse), + #[serde(rename = "v0.2.x")] + V02(ndc_models_v02::MutationResponse), +} + +impl NdcMutationResponse { + pub fn as_latest(self) -> ndc_models_v02::MutationResponse { + match self { + NdcMutationResponse::V01(response) => { + migration::v01::upgrade_mutation_response_to_v02(response) + } + NdcMutationResponse::V02(response) => response, + } + } +} + +#[derive(Serialize, Debug, Clone, PartialEq)] +#[serde(tag = "version")] +pub enum NdcErrorResponse { + #[serde(rename = "v0.1.x")] + V01(ndc_models_v01::ErrorResponse), + #[serde(rename = "v0.2.x")] + V02(ndc_models_v02::ErrorResponse), +} + +impl NdcErrorResponse { + pub fn message(&self) -> &str { + match self { + NdcErrorResponse::V01(err) => err.message.as_str(), + NdcErrorResponse::V02(err) => err.message.as_str(), + } + } + + pub fn details(&self) -> &serde_json::Value { + match self { + NdcErrorResponse::V01(err) => &err.details, + NdcErrorResponse::V02(err) => &err.details, + } + } +} diff --git a/v3/crates/execute/src/plan.rs b/v3/crates/execute/src/plan.rs index c8cb65ab30c..9cf44157d1a 100644 --- a/v3/crates/execute/src/plan.rs +++ b/v3/crates/execute/src/plan.rs @@ -1,6 +1,7 @@ mod commands; pub(crate) mod error; mod model_selection; +mod ndc_request; mod relationships; pub(crate) mod selection_set; @@ -103,7 +104,7 @@ pub enum ApolloFederationSelect<'n, 's, 'ir> { #[derive(Debug)] pub struct NDCMutationExecution<'n, 's, 'ir> { - pub query: ndc_models::MutationRequest, + pub query: ndc::NdcMutationRequest, pub join_locations: JoinLocations<(RemoteJoin<'s, 'ir>, JoinId)>, pub data_connector: &'s metadata_resolve::DataConnectorLink, pub execution_span_attribute: String, @@ -120,7 +121,7 @@ pub struct ExecutionTree<'s, 'ir> { 
#[derive(Debug)] pub struct ExecutionNode<'s> { - pub query: ndc_models::QueryRequest, + pub query: ndc::NdcQueryRequest, pub data_connector: &'s metadata_resolve::DataConnectorLink, } @@ -200,7 +201,7 @@ fn plan_mutation<'n, 's, 'ir>( ) -> Result, error::Error> { let mut join_id_counter = MonotonicCounter::new(); let (ndc_ir, join_locations) = - commands::ndc_mutation_ir(ir.procedure_name, ir, &mut join_id_counter)?; + ndc_request::ndc_procedure_mutation_request(ir.procedure_name, ir, &mut join_id_counter)?; let join_locations_ids = assign_with_join_ids(join_locations)?; Ok(NDCMutationExecution { query: ndc_ir, @@ -295,7 +296,8 @@ fn plan_query<'n, 's, 'ir>( None => NodeQueryPlan::RelayNodeSelect(None), }, root_field::QueryRootField::FunctionBasedCommand { ir, selection_set } => { - let (ndc_ir, join_locations) = commands::ndc_query_ir(ir, &mut counter)?; + let (ndc_ir, join_locations) = + ndc_request::make_ndc_function_query_request(ir, &mut counter)?; let join_locations_ids = assign_with_join_ids(join_locations)?; let execution_tree = ExecutionTree { root_node: ExecutionNode { @@ -357,7 +359,7 @@ fn generate_execution_tree<'s, 'ir>( ir: &'ir ModelSelection<'s>, ) -> Result, error::Error> { let mut counter = MonotonicCounter::new(); - let (ndc_ir, join_locations) = model_selection::ndc_ir(ir, &mut counter)?; + let (ndc_ir, join_locations) = ndc_request::make_ndc_model_query_request(ir, &mut counter)?; let join_locations_with_ids = assign_with_join_ids(join_locations)?; Ok(ExecutionTree { root_node: ExecutionNode { @@ -890,7 +892,7 @@ async fn resolve_ndc_query_execution( process_response_as, } = ndc_query; - let mut response = ndc::execute_ndc_query( + let response = ndc::execute_ndc_query( http_context, &execution_tree.root_node.query, execution_tree.root_node.data_connector, @@ -900,19 +902,21 @@ async fn resolve_ndc_query_execution( ) .await?; + let mut response_rowsets = response.as_latest_rowsets(); + // TODO: Failures in remote joins should result in partial response // https://github.com/hasura/v3-engine/issues/229 execute_join_locations( http_context, execution_span_attribute, - &mut response, + &mut response_rowsets, process_response_as, &execution_tree.remote_executions, project_id, ) .await?; - process_response(selection_set, response, process_response_as) + process_response(selection_set, response_rowsets, process_response_as) } async fn resolve_ndc_mutation_execution( @@ -939,7 +943,8 @@ async fn resolve_ndc_mutation_execution( field_span_attribute, project_id, ) - .await?; + .await? 
+ .as_latest(); process_mutation_response(selection_set, response, &process_response_as) } diff --git a/v3/crates/execute/src/plan/error.rs b/v3/crates/execute/src/plan/error.rs index 4a2bfb9d264..a80b95c9caa 100644 --- a/v3/crates/execute/src/plan/error.rs +++ b/v3/crates/execute/src/plan/error.rs @@ -2,6 +2,8 @@ use open_dds::{relationships::RelationshipName, types::FieldName}; use thiserror::Error; use tracing_util::TraceableError; +use crate::ndc; + #[derive(Error, Debug)] pub enum Error { #[error("internal: {0}")] @@ -29,4 +31,7 @@ pub enum InternalError { #[error("generic error: {description}")] InternalGeneric { description: String }, + + #[error("error when downgrading ndc request: {0}")] + NdcRequestDowngradeError(ndc::migration::NdcDowngradeError), } diff --git a/v3/crates/execute/src/plan/model_selection.rs b/v3/crates/execute/src/plan/model_selection.rs index 02ad7afd18c..675de54b01f 100644 --- a/v3/crates/execute/src/plan/model_selection.rs +++ b/v3/crates/execute/src/plan/model_selection.rs @@ -4,11 +4,11 @@ use std::collections::BTreeMap; use indexmap::IndexMap; -use super::error; use super::relationships; use super::selection_set; use crate::ir::aggregates::{AggregateFieldSelection, AggregateSelectionSet}; use crate::ir::model_selection::ModelSelection; +use crate::plan::error; use crate::remote_joins::types::{JoinLocations, MonotonicCounter, RemoteJoin}; /// Create an NDC `Query` based on the internal IR `ModelSelection` settings diff --git a/v3/crates/execute/src/plan/ndc_request.rs b/v3/crates/execute/src/plan/ndc_request.rs new file mode 100644 index 00000000000..ad3efa5c7f8 --- /dev/null +++ b/v3/crates/execute/src/plan/ndc_request.rs @@ -0,0 +1,79 @@ +use metadata_resolve::data_connectors::NdcVersion; +use open_dds::commands::ProcedureName; + +use crate::ir::commands::FunctionBasedCommand; +use crate::ir::commands::ProcedureBasedCommand; +use crate::ir::model_selection::ModelSelection; +use crate::ndc; +use crate::remote_joins::types::{JoinLocations, MonotonicCounter, RemoteJoin}; + +use super::commands; +use super::error; +use super::model_selection; + +pub(crate) fn make_ndc_model_query_request<'s, 'ir>( + ir: &'ir ModelSelection<'s>, + join_id_counter: &mut MonotonicCounter, +) -> Result<(ndc::NdcQueryRequest, JoinLocations>), error::Error> { + match ir.data_connector.capabilities.supported_ndc_version { + NdcVersion::V01 => { + let (request, join_locations) = model_selection::ndc_ir(ir, join_id_counter)?; + let v01_request = ndc::migration::v01::downgrade_v02_query_request(request) + .map_err(error::InternalError::NdcRequestDowngradeError)?; + Ok((ndc::NdcQueryRequest::V01(v01_request), join_locations)) + } + NdcVersion::V02 => { + let (request, join_locations) = model_selection::ndc_ir(ir, join_id_counter)?; + Ok((ndc::NdcQueryRequest::V02(request), join_locations)) + } + } +} + +pub(crate) fn make_ndc_function_query_request<'s, 'ir>( + ir: &'ir FunctionBasedCommand<'s>, + join_id_counter: &mut MonotonicCounter, +) -> Result<(ndc::NdcQueryRequest, JoinLocations>), error::Error> { + match ir + .command_info + .data_connector + .capabilities + .supported_ndc_version + { + NdcVersion::V01 => { + let (request, join_locations) = commands::ndc_query_ir(ir, join_id_counter)?; + let v01_request = ndc::migration::v01::downgrade_v02_query_request(request) + .map_err(error::InternalError::NdcRequestDowngradeError)?; + Ok((ndc::NdcQueryRequest::V01(v01_request), join_locations)) + } + NdcVersion::V02 => { + let (request, join_locations) = commands::ndc_query_ir(ir, 
+            Ok((ndc::NdcQueryRequest::V02(request), join_locations))
+        }
+    }
+}
+
+pub(crate) fn ndc_procedure_mutation_request<'s, 'ir>(
+    procedure_name: &ProcedureName,
+    ir: &'ir ProcedureBasedCommand<'s>,
+    join_id_counter: &mut MonotonicCounter,
+) -> Result<(ndc::NdcMutationRequest, JoinLocations<RemoteJoin<'s, 'ir>>), error::Error> {
+    match ir
+        .command_info
+        .data_connector
+        .capabilities
+        .supported_ndc_version
+    {
+        NdcVersion::V01 => {
+            let (request, join_locations) =
+                commands::ndc_mutation_ir(procedure_name, ir, join_id_counter)?;
+            let v01_request = ndc::migration::v01::downgrade_v02_mutation_request(request)
+                .map_err(error::InternalError::NdcRequestDowngradeError)?;
+            Ok((ndc::NdcMutationRequest::V01(v01_request), join_locations))
+        }
+        NdcVersion::V02 => {
+            let (request, join_locations) =
+                commands::ndc_mutation_ir(procedure_name, ir, join_id_counter)?;
+            Ok((ndc::NdcMutationRequest::V02(request), join_locations))
+        }
+    }
+}
diff --git a/v3/crates/execute/src/plan/relationships.rs b/v3/crates/execute/src/plan/relationships.rs
index 2ab297a918f..f74acc2d4af 100644
--- a/v3/crates/execute/src/plan/relationships.rs
+++ b/v3/crates/execute/src/plan/relationships.rs
@@ -3,11 +3,11 @@
 use open_dds::relationships::RelationshipType;
 use std::collections::BTreeMap;
 
-use super::error;
 use super::selection_set;
 use crate::ir::model_selection::ModelSelection;
 use crate::ir::relationship::{self, LocalCommandRelationshipInfo, LocalModelRelationshipInfo};
 use crate::ir::selection_set::FieldSelection;
+use crate::plan::error;
 
 /// collect relationships recursively from IR components containing relationships,
 /// and create NDC relationship definitions which will be added to the `relationships`
diff --git a/v3/crates/execute/src/plan/selection_set.rs b/v3/crates/execute/src/plan/selection_set.rs
index ffb1e77335f..bd83f7e75bb 100644
--- a/v3/crates/execute/src/plan/selection_set.rs
+++ b/v3/crates/execute/src/plan/selection_set.rs
@@ -1,9 +1,10 @@
 use super::commands;
-use super::error;
 use super::model_selection;
 use super::relationships;
-use super::ProcessResponseAs;
 use crate::ir::selection_set::{FieldSelection, NestedSelection, ResultSelectionSet};
+use crate::plan::error;
+use crate::plan::ndc_request;
+use crate::plan::ProcessResponseAs;
 use crate::remote_joins::types::SourceFieldAlias;
 use crate::remote_joins::types::{
     JoinLocations, JoinNode, LocationKind, MonotonicCounter, RemoteJoinType, TargetField,
@@ -173,7 +174,8 @@ pub(crate) fn process_selection_set_ir<'s, 'ir>(
             );
         }
         // Construct the `JoinLocations` tree
-        let (ndc_ir, sub_join_locations) = model_selection::ndc_ir(ir, join_id_counter)?;
+        let (ndc_ir, sub_join_locations) =
+            ndc_request::make_ndc_model_query_request(ir, join_id_counter)?;
         let rj_info = RemoteJoin {
             target_ndc_ir: ndc_ir,
             target_data_connector: ir.data_connector,
@@ -211,7 +213,8 @@ pub(crate) fn process_selection_set_ir<'s, 'ir>(
             );
         }
         // Construct the `JoinLocations` tree
-        let (ndc_ir, sub_join_locations) = commands::ndc_query_ir(ir, join_id_counter)?;
+        let (ndc_ir, sub_join_locations) =
+            ndc_request::make_ndc_function_query_request(ir, join_id_counter)?;
         let rj_info = RemoteJoin {
             target_ndc_ir: ndc_ir,
             target_data_connector: ir.command_info.data_connector,
diff --git a/v3/crates/execute/src/remote_joins.rs b/v3/crates/execute/src/remote_joins.rs
index ab9b4ce8199..fd01491f367 100644
--- a/v3/crates/execute/src/remote_joins.rs
+++ b/v3/crates/execute/src/remote_joins.rs
@@ -148,7 +148,9 @@ where
                 .collect()
             })
             .collect();
-
join_node.target_ndc_ir.variables = Some(foreach_variables); + join_node + .target_ndc_ir + .set_variables(Some(foreach_variables)); // execute the remote query let mut target_response = tracer @@ -167,7 +169,8 @@ where )) }, ) - .await?; + .await? + .as_latest_rowsets(); // if the sub-tree is not empty, recursively process the sub-tree; which // will modify the `target_response` with all joins down the tree diff --git a/v3/crates/execute/src/remote_joins/types.rs b/v3/crates/execute/src/remote_joins/types.rs index a05d7a4caa6..b74eedd839e 100644 --- a/v3/crates/execute/src/remote_joins/types.rs +++ b/v3/crates/execute/src/remote_joins/types.rs @@ -7,6 +7,7 @@ use json_ext::ValueExt; use open_dds::arguments::ArgumentName; use open_dds::types::FieldName; +use crate::ndc; use crate::plan::ProcessResponseAs; /// This tree structure captures all the locations (in the selection set IR) where @@ -108,7 +109,7 @@ pub struct RemoteJoin<'s, 'ir> { /// target data connector to execute query on pub target_data_connector: &'s metadata_resolve::DataConnectorLink, /// NDC IR to execute on a data connector - pub target_ndc_ir: ndc_models::QueryRequest, + pub target_ndc_ir: ndc::NdcQueryRequest, /// Mapping of the fields in source to fields in target. /// The HashMap has the following info - /// - key: is the field name in the source diff --git a/v3/crates/execute/tests/generate_ir/field_argument/expected.json b/v3/crates/execute/tests/generate_ir/field_argument/expected.json index 293fd733a9c..c2060d00e11 100644 --- a/v3/crates/execute/tests/generate_ir/field_argument/expected.json +++ b/v3/crates/execute/tests/generate_ir/field_argument/expected.json @@ -145,6 +145,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, diff --git a/v3/crates/execute/tests/generate_ir/get_by_id/expected.json b/v3/crates/execute/tests/generate_ir/get_by_id/expected.json index e5e990576ee..da94581d96e 100644 --- a/v3/crates/execute/tests/generate_ir/get_by_id/expected.json +++ b/v3/crates/execute/tests/generate_ir/get_by_id/expected.json @@ -114,6 +114,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, diff --git a/v3/crates/execute/tests/generate_ir/get_many/expected.json b/v3/crates/execute/tests/generate_ir/get_many/expected.json index ca09407c85a..0f865227567 100644 --- a/v3/crates/execute/tests/generate_ir/get_many/expected.json +++ b/v3/crates/execute/tests/generate_ir/get_many/expected.json @@ -100,6 +100,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, diff --git a/v3/crates/execute/tests/generate_ir/get_many_model_count/expected.json b/v3/crates/execute/tests/generate_ir/get_many_model_count/expected.json index aa5865859f6..97cd48a2425 100644 --- a/v3/crates/execute/tests/generate_ir/get_many_model_count/expected.json +++ b/v3/crates/execute/tests/generate_ir/get_many_model_count/expected.json @@ -38,6 +38,7 
@@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, @@ -162,6 +163,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, @@ -286,6 +288,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, @@ -511,6 +514,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, @@ -536,6 +540,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, @@ -568,6 +573,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, @@ -600,6 +606,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, @@ -655,6 +662,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, @@ -711,6 +719,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, @@ -809,6 +818,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, @@ -865,6 +875,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, @@ -963,6 +974,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", 
"supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, @@ -1019,6 +1031,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, diff --git a/v3/crates/execute/tests/generate_ir/get_many_user_2/expected.json b/v3/crates/execute/tests/generate_ir/get_many_user_2/expected.json index a805cb46d23..1e069da48d4 100644 --- a/v3/crates/execute/tests/generate_ir/get_many_user_2/expected.json +++ b/v3/crates/execute/tests/generate_ir/get_many_user_2/expected.json @@ -114,6 +114,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, diff --git a/v3/crates/execute/tests/generate_ir/get_many_where/expected.json b/v3/crates/execute/tests/generate_ir/get_many_where/expected.json index 3c8d5a4f63e..3006c163558 100644 --- a/v3/crates/execute/tests/generate_ir/get_many_where/expected.json +++ b/v3/crates/execute/tests/generate_ir/get_many_where/expected.json @@ -100,6 +100,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, @@ -300,6 +301,7 @@ "hasura-m-auth-token": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.~!#$&'()*+,/:;=?@[]\"" }, "capabilities": { + "supported_ndc_version": "V01", "supports_explaining_queries": true, "supports_explaining_mutations": false, "supports_nested_object_filtering": true, diff --git a/v3/crates/metadata-resolve/src/stages/data_connectors/mod.rs b/v3/crates/metadata-resolve/src/stages/data_connectors/mod.rs index ffeebfd6579..7d69234a3ca 100644 --- a/v3/crates/metadata-resolve/src/stages/data_connectors/mod.rs +++ b/v3/crates/metadata-resolve/src/stages/data_connectors/mod.rs @@ -6,7 +6,7 @@ mod types; use std::collections::BTreeMap; pub use types::{ ArgumentPreset, CommandsResponseConfig, DataConnectorCapabilities, DataConnectorContext, - DataConnectorLink, DataConnectorSchema, DataConnectors, + DataConnectorLink, DataConnectorSchema, DataConnectors, NdcVersion, }; /// Resolve data connectors. 
diff --git a/v3/crates/metadata-resolve/src/stages/data_connectors/types.rs b/v3/crates/metadata-resolve/src/stages/data_connectors/types.rs index 0a1cfd01e23..6ebeae9dc87 100644 --- a/v3/crates/metadata-resolve/src/stages/data_connectors/types.rs +++ b/v3/crates/metadata-resolve/src/stages/data_connectors/types.rs @@ -20,7 +20,7 @@ use open_dds::{ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; -#[derive(Debug, Eq, PartialEq, Clone, Copy)] +#[derive(Debug, Eq, PartialEq, Clone, Copy, Serialize, Deserialize)] pub enum NdcVersion { V01, V02, @@ -224,6 +224,7 @@ impl DataConnectorLink { }, })?; let capabilities = DataConnectorCapabilities { + supported_ndc_version: context.supported_ndc_version, supports_explaining_queries: context.capabilities.query.explain.is_some(), supports_explaining_mutations: context.capabilities.mutation.explain.is_some(), supports_nested_object_filtering: context @@ -392,6 +393,7 @@ impl CommandsResponseConfig { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] #[allow(clippy::struct_excessive_bools)] pub struct DataConnectorCapabilities { + pub supported_ndc_version: NdcVersion, pub supports_explaining_queries: bool, pub supports_explaining_mutations: bool, pub supports_nested_object_filtering: bool, diff --git a/v3/crates/metadata-resolve/tests/passing/aggregate_expressions/relationship/resolved.snap b/v3/crates/metadata-resolve/tests/passing/aggregate_expressions/relationship/resolved.snap index b88b88b895b..5d83d5d8bec 100644 --- a/v3/crates/metadata-resolve/tests/passing/aggregate_expressions/relationship/resolved.snap +++ b/v3/crates/metadata-resolve/tests/passing/aggregate_expressions/relationship/resolved.snap @@ -1190,6 +1190,7 @@ Metadata { argument_presets: [], response_config: None, capabilities: DataConnectorCapabilities { + supported_ndc_version: V01, supports_explaining_queries: true, supports_explaining_mutations: true, supports_nested_object_filtering: false, @@ -1834,6 +1835,7 @@ Metadata { argument_presets: [], response_config: None, capabilities: DataConnectorCapabilities { + supported_ndc_version: V01, supports_explaining_queries: true, supports_explaining_mutations: true, supports_nested_object_filtering: false, diff --git a/v3/crates/metadata-resolve/tests/passing/aggregate_expressions/root_field/resolved.snap b/v3/crates/metadata-resolve/tests/passing/aggregate_expressions/root_field/resolved.snap index 4db4ce8ad82..36f63846888 100644 --- a/v3/crates/metadata-resolve/tests/passing/aggregate_expressions/root_field/resolved.snap +++ b/v3/crates/metadata-resolve/tests/passing/aggregate_expressions/root_field/resolved.snap @@ -772,6 +772,7 @@ Metadata { argument_presets: [], response_config: None, capabilities: DataConnectorCapabilities { + supported_ndc_version: V01, supports_explaining_queries: true, supports_explaining_mutations: true, supports_nested_object_filtering: false, diff --git a/v3/crates/metadata-resolve/tests/passing/boolean_expression_type/nested_object/resolved.snap b/v3/crates/metadata-resolve/tests/passing/boolean_expression_type/nested_object/resolved.snap index 2e1765c8d0e..fe552cb969d 100644 --- a/v3/crates/metadata-resolve/tests/passing/boolean_expression_type/nested_object/resolved.snap +++ b/v3/crates/metadata-resolve/tests/passing/boolean_expression_type/nested_object/resolved.snap @@ -790,6 +790,7 @@ Metadata { argument_presets: [], response_config: None, capabilities: DataConnectorCapabilities { + supported_ndc_version: V01, supports_explaining_queries: true, 
supports_explaining_mutations: false, supports_nested_object_filtering: true, diff --git a/v3/crates/metadata-resolve/tests/passing/boolean_expression_type/range/resolved.snap b/v3/crates/metadata-resolve/tests/passing/boolean_expression_type/range/resolved.snap index 7db246add74..81e19866d2b 100644 --- a/v3/crates/metadata-resolve/tests/passing/boolean_expression_type/range/resolved.snap +++ b/v3/crates/metadata-resolve/tests/passing/boolean_expression_type/range/resolved.snap @@ -401,6 +401,7 @@ Metadata { argument_presets: [], response_config: None, capabilities: DataConnectorCapabilities { + supported_ndc_version: V01, supports_explaining_queries: true, supports_explaining_mutations: false, supports_nested_object_filtering: true, diff --git a/v3/crates/metadata-resolve/tests/passing/missing_subgraph_when_ignoring_unknown_subgraphs/resolved.snap b/v3/crates/metadata-resolve/tests/passing/missing_subgraph_when_ignoring_unknown_subgraphs/resolved.snap index 5c395d9b128..e3f0997c2e4 100644 --- a/v3/crates/metadata-resolve/tests/passing/missing_subgraph_when_ignoring_unknown_subgraphs/resolved.snap +++ b/v3/crates/metadata-resolve/tests/passing/missing_subgraph_when_ignoring_unknown_subgraphs/resolved.snap @@ -334,6 +334,7 @@ Metadata { argument_presets: [], response_config: None, capabilities: DataConnectorCapabilities { + supported_ndc_version: V01, supports_explaining_queries: true, supports_explaining_mutations: false, supports_nested_object_filtering: true, diff --git a/v3/crates/sql/src/plan.rs b/v3/crates/sql/src/plan.rs index 7e70add96b2..6c705af422f 100644 --- a/v3/crates/sql/src/plan.rs +++ b/v3/crates/sql/src/plan.rs @@ -23,6 +23,9 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum ExecutionPlanError { + #[error("{0}")] + NDCDowngradeError(#[from] execute::ndc::migration::NdcDowngradeError), + #[error("{0}")] NDCExecutionError(#[from] execute::ndc::client::Error), @@ -224,7 +227,17 @@ pub async fn fetch_from_data_connector( data_connector: Arc, ) -> Result { let tracer = tracing_util::global_tracer(); - let mut ndc_response = + let query_request = match data_connector.capabilities.supported_ndc_version { + metadata_resolve::data_connectors::NdcVersion::V01 => execute::ndc::NdcQueryRequest::V01( + execute::ndc::migration::v01::downgrade_v02_query_request( + query_request.as_ref().clone(), + )?, + ), + metadata_resolve::data_connectors::NdcVersion::V02 => { + execute::ndc::NdcQueryRequest::V02(query_request.as_ref().clone()) + } + }; + let ndc_response = execute::fetch_from_data_connector(&http_context, &query_request, &data_connector, None) .await?; let batch = tracer.in_span( @@ -233,7 +246,7 @@ pub async fn fetch_from_data_connector( SpanVisibility::Internal, || { let rows = ndc_response - .0 + .as_latest_rowsets() .pop() .ok_or_else(|| { ExecutionPlanError::NDCResponseFormat("no row sets found".to_string())