From bd80f8d81ada62d7cb4a32f567c0744d52272be7 Mon Sep 17 00:00:00 2001 From: Krushan Bauva Date: Fri, 19 Jan 2024 16:48:48 +0530 Subject: [PATCH] send project-id as header to ndc for metrics (#292) V3_GIT_ORIGIN_REV_ID: 723bd361f8e7d136a02f58873b5edcf5089acf03 --- v3/engine/benches/execute.rs | 4 +-- v3/engine/bin/engine/main.rs | 1 + v3/engine/src/execute.rs | 18 +++++++--- v3/engine/src/execute/error.rs | 4 ++- v3/engine/src/execute/explain.rs | 1 + v3/engine/src/execute/ndc.rs | 49 ++++++++++++++++++++++----- v3/engine/src/execute/query_plan.rs | 26 +++++++++++--- v3/engine/src/execute/remote_joins.rs | 5 ++- v3/engine/tests/common.rs | 13 +++++-- 9 files changed, 97 insertions(+), 24 deletions(-) diff --git a/v3/engine/benches/execute.rs b/v3/engine/benches/execute.rs index 0ffcd56d750..a2b77a64f08 100644 --- a/v3/engine/benches/execute.rs +++ b/v3/engine/benches/execute.rs @@ -130,7 +130,7 @@ pub fn bench_execute( &(&runtime), |b, runtime| { b.to_async(*runtime).iter(|| async { - execute_query_plan(&http_client, generate_query_plan(&ir).unwrap()).await + execute_query_plan(&http_client, generate_query_plan(&ir).unwrap(), None).await }) }, ); @@ -141,7 +141,7 @@ pub fn bench_execute( &(&runtime, &schema, raw_request), |b, (runtime, schema, request)| { b.to_async(*runtime).iter(|| async { - execute_query_internal(&http_client, schema, &session, request.clone()) + execute_query_internal(&http_client, schema, &session, request.clone(), None) .await .unwrap() }) diff --git a/v3/engine/bin/engine/main.rs b/v3/engine/bin/engine/main.rs index 334ab6b9865..4af97a53186 100644 --- a/v3/engine/bin/engine/main.rs +++ b/v3/engine/bin/engine/main.rs @@ -298,6 +298,7 @@ async fn handle_request( &state.schema, &session, request, + None, )) }) .await; diff --git a/v3/engine/src/execute.rs b/v3/engine/src/execute.rs index 829d7f64f96..20c0ed23c2b 100644 --- a/v3/engine/src/execute.rs +++ b/v3/engine/src/execute.rs @@ -79,13 +79,17 @@ impl Traceable for ExecuteOrExplainResponse { } } +#[derive(Clone, Debug)] +pub struct ProjectId(pub String); + pub async fn execute_query( http_client: &reqwest::Client, schema: &Schema, session: &Session, request: RawRequest, + project_id: Option, ) -> GraphQLResponse { - execute_query_internal(http_client, schema, session, request) + execute_query_internal(http_client, schema, session, request, project_id) .await .unwrap_or_else(|e| GraphQLResponse(Response::error(e.to_graphql_error(None)))) } @@ -115,6 +119,7 @@ pub async fn execute_query_internal( schema: &gql::schema::Schema, session: &Session, raw_request: gql::http::RawRequest, + project_id: Option, ) -> Result { let query_response = execute_request_internal( http_client, @@ -122,6 +127,7 @@ pub async fn execute_query_internal( session, raw_request, explain::types::RequestMode::Execute, + project_id, ) .await?; match query_response { @@ -141,6 +147,7 @@ pub async fn execute_request_internal( session: &Session, raw_request: gql::http::RawRequest, request_mode: explain::types::RequestMode, + project_id: Option, ) -> Result { let tracer = tracing_util::global_tracer(); let mode_query_span_name = match request_mode { @@ -217,9 +224,12 @@ pub async fn execute_request_internal( ); Box::pin(async { - let execute_query_result = - query_plan::execute_query_plan(http_client, query_plan) - .await; + let execute_query_result = query_plan::execute_query_plan( + http_client, + query_plan, + project_id, + ) + .await; ExecuteOrExplainResponse::Execute(GraphQLResponse( execute_query_result.to_graphql_response(), )) diff --git a/v3/engine/src/execute/error.rs b/v3/engine/src/execute/error.rs index 35c640c0596..58d10bd3ac5 100644 --- a/v3/engine/src/execute/error.rs +++ b/v3/engine/src/execute/error.rs @@ -5,7 +5,7 @@ use open_dds::{ session_variables::SessionVariable, types::{CustomTypeName, FieldName}, }; -use reqwest::StatusCode; +use reqwest::{header::InvalidHeaderValue, StatusCode}; use serde_json as json; use thiserror::Error; use tracing_util::{ErrorVisibility, TraceableError}; @@ -163,6 +163,8 @@ pub enum Error { InternalError(#[from] InternalError), #[error("explain error: {0}")] ExplainError(String), + #[error("invalid header value characters in project_id: {0}")] + ProjectIdConversionError(InvalidHeaderValue), } impl Error { diff --git a/v3/engine/src/execute/explain.rs b/v3/engine/src/execute/explain.rs index 99d583c2a14..9f907988094 100644 --- a/v3/engine/src/execute/explain.rs +++ b/v3/engine/src/execute/explain.rs @@ -36,6 +36,7 @@ pub async fn execute_explain_internal( session, raw_request, types::RequestMode::Explain, + None, ) .await?; match query_response { diff --git a/v3/engine/src/execute/ndc.rs b/v3/engine/src/execute/ndc.rs index b506c3a2be6..5a3d776ee73 100644 --- a/v3/engine/src/execute/ndc.rs +++ b/v3/engine/src/execute/ndc.rs @@ -1,3 +1,4 @@ +use axum::http::HeaderMap; use serde_json as json; use gql::normalized_ast; @@ -6,9 +7,9 @@ use lang_graphql::ast::common as ast; use ndc_client as ndc; use tracing_util::{set_attribute_on_active_span, AttributeVisibility, SpanVisibility}; -use super::error; use super::process_response::process_command_rows; use super::query_plan::ProcessResponseAs; +use super::{error, ProjectId}; use crate::metadata::resolved; use crate::schema::GDS; @@ -19,6 +20,7 @@ pub async fn execute_ndc_query<'n, 's>( data_connector: &resolved::data_connector::DataConnector, execution_span_attribute: String, field_span_attribute: String, + project_id: Option, ) -> Result, error::Error> { let tracer = tracing_util::global_tracer(); tracer @@ -35,7 +37,8 @@ pub async fn execute_ndc_query<'n, 's>( field_span_attribute, ); let connector_response = - fetch_from_data_connector(http_client, query, data_connector).await?; + fetch_from_data_connector(http_client, query, data_connector, project_id) + .await?; Ok(connector_response.0) }) }) @@ -46,6 +49,7 @@ pub(crate) async fn fetch_from_data_connector<'s>( http_client: &reqwest::Client, query_request: ndc::models::QueryRequest, data_connector: &resolved::data_connector::DataConnector, + project_id: Option, ) -> Result { let tracer = tracing_util::global_tracer(); tracer @@ -54,12 +58,14 @@ pub(crate) async fn fetch_from_data_connector<'s>( SpanVisibility::Internal, || { Box::pin(async { + let headers = + append_project_id_to_headers(data_connector.headers.0.clone(), project_id)?; let ndc_config = ndc::apis::configuration::Configuration { base_path: data_connector.url.get_url(ast::OperationType::Query), user_agent: None, // This is isn't expensive, reqwest::Client is behind an Arc client: http_client.clone(), - headers: data_connector.headers.0.clone(), + headers, }; ndc::apis::default_api::query_post(&ndc_config, query_request) .await @@ -70,6 +76,24 @@ pub(crate) async fn fetch_from_data_connector<'s>( .await } +// This function appends project-id (if present) to the HeaderMap defined by the data_connector object +pub fn append_project_id_to_headers( + mut headers: HeaderMap, + project_id: Option, +) -> Result { + match project_id { + None => Ok(headers), + Some(project_id) => { + headers.append( + "project-id", + reqwest::header::HeaderValue::from_str(&project_id.0) + .map_err(error::Error::ProjectIdConversionError)?, + ); + Ok(headers) + } + } +} + /// Executes a NDC mutation pub(crate) async fn execute_ndc_mutation<'n, 's, 'ir>( http_client: &reqwest::Client, @@ -79,6 +103,7 @@ pub(crate) async fn execute_ndc_mutation<'n, 's, 'ir>( execution_span_attribute: String, field_span_attribute: String, process_response_as: ProcessResponseAs<'ir>, + project_id: Option, ) -> Result { let tracer = tracing_util::global_tracer(); tracer @@ -94,8 +119,13 @@ pub(crate) async fn execute_ndc_mutation<'n, 's, 'ir>( "field", field_span_attribute, ); - let connector_response = - fetch_from_data_connector_mutation(http_client, query, data_connector).await?; + let connector_response = fetch_from_data_connector_mutation( + http_client, + query, + data_connector, + project_id, + ) + .await?; // Post process the response to add the `__typename` fields tracer.in_span("process_response", SpanVisibility::Internal, || { // NOTE: NDC returns a `Vec` (to account for @@ -138,6 +168,7 @@ pub(crate) async fn fetch_from_data_connector_mutation<'s>( http_client: &reqwest::Client, query_request: ndc::models::MutationRequest, data_connector: &resolved::data_connector::DataConnector, + project_id: Option, ) -> Result { let tracer = tracing_util::global_tracer(); tracer @@ -146,14 +177,16 @@ pub(crate) async fn fetch_from_data_connector_mutation<'s>( SpanVisibility::Internal, || { Box::pin(async { - let gdc_config = ndc::apis::configuration::Configuration { + let headers = + append_project_id_to_headers(data_connector.headers.0.clone(), project_id)?; + let ndc_config = ndc::apis::configuration::Configuration { base_path: data_connector.url.get_url(ast::OperationType::Mutation), user_agent: None, // This is isn't expensive, reqwest::Client is behind an Arc client: http_client.clone(), - headers: data_connector.headers.0.clone(), + headers, }; - ndc::apis::default_api::mutation_post(&gdc_config, query_request) + ndc::apis::default_api::mutation_post(&ndc_config, query_request) .await .map_err(error::Error::from) // ndc_client::apis::Error -> InternalError -> Error }) diff --git a/v3/engine/src/execute/query_plan.rs b/v3/engine/src/execute/query_plan.rs index c0816ce6721..9a51895a802 100644 --- a/v3/engine/src/execute/query_plan.rs +++ b/v3/engine/src/execute/query_plan.rs @@ -18,6 +18,7 @@ use super::ndc; use super::process_response::process_response; use super::remote_joins::execute_join_locations; use super::remote_joins::types::{JoinId, JoinLocations, Location, MonotonicCounter, RemoteJoin}; +use super::ProjectId; use crate::metadata::resolved::{self, subgraph}; use crate::schema::GDS; @@ -414,6 +415,7 @@ impl ExecuteQueryResult { async fn execute_field_plan<'n, 's, 'ir>( http_client: &reqwest::Client, field_plan: NodeQueryPlan<'n, 's, 'ir>, + project_id: Option, ) -> RootFieldResult { let tracer = tracing_util::global_tracer(); tracer @@ -467,17 +469,19 @@ async fn execute_field_plan<'n, 's, 'ir>( } NodeQueryPlan::NDCQueryExecution(ndc_query) => RootFieldResult::new( &ndc_query.process_response_as.is_nullable(), - resolve_ndc_query_execution(http_client, ndc_query).await, + resolve_ndc_query_execution(http_client, ndc_query, project_id).await, ), NodeQueryPlan::NDCMutationExecution(ndc_query) => RootFieldResult::new( &ndc_query.process_response_as.is_nullable(), - resolve_ndc_mutation_execution(http_client, ndc_query).await, + resolve_ndc_mutation_execution(http_client, ndc_query, project_id) + .await, ), NodeQueryPlan::RelayNodeSelect(optional_query) => RootFieldResult::new( &optional_query.as_ref().map_or(true, |ndc_query| { ndc_query.process_response_as.is_nullable() }), - resolve_relay_node_select(http_client, optional_query).await, + resolve_relay_node_select(http_client, optional_query, project_id) + .await, ), } }) @@ -489,6 +493,7 @@ async fn execute_field_plan<'n, 's, 'ir>( pub async fn execute_query_plan<'n, 's, 'ir>( http_client: &reqwest::Client, query_plan: QueryPlan<'n, 's, 'ir>, + project_id: Option, ) -> ExecuteQueryResult { let mut root_fields = IndexMap::new(); @@ -497,7 +502,12 @@ pub async fn execute_query_plan<'n, 's, 'ir>( for (alias, field_plan) in query_plan.into_iter() { // We are not running the field plans parallely here, we are just running them concurrently on a single thread. // To run the field plans parallely, we will need to use tokio::spawn for each field plan. - let task = async { (alias, execute_field_plan(http_client, field_plan).await) }; + let task = async { + ( + alias, + execute_field_plan(http_client, field_plan, project_id.clone()).await, + ) + }; tasks.push(task); } @@ -548,6 +558,7 @@ fn resolve_schema_field( async fn resolve_ndc_query_execution( http_client: &reqwest::Client, ndc_query: NDCQueryExecution<'_, '_>, + project_id: Option, ) -> Result { let NDCQueryExecution { execution_tree, @@ -562,6 +573,7 @@ async fn resolve_ndc_query_execution( execution_tree.root_node.data_connector, execution_span_attribute.clone(), field_span_attribute.clone(), + project_id.clone(), ) .await?; // TODO: Failures in remote joins should result in partial response @@ -573,6 +585,7 @@ async fn resolve_ndc_query_execution( &mut response, &process_response_as, execution_tree.remote_executions, + project_id, ) .await?; let result = process_response(selection_set, response, process_response_as)?; @@ -582,6 +595,7 @@ async fn resolve_ndc_query_execution( async fn resolve_ndc_mutation_execution( http_client: &reqwest::Client, ndc_query: NDCMutationExecution<'_, '_, '_>, + project_id: Option, ) -> Result { let NDCMutationExecution { query, @@ -601,6 +615,7 @@ async fn resolve_ndc_mutation_execution( execution_span_attribute, field_span_attribute, process_response_as, + project_id, ) .await?; Ok(json::to_value(response)?) @@ -609,9 +624,10 @@ async fn resolve_ndc_mutation_execution( async fn resolve_relay_node_select( http_client: &reqwest::Client, optional_query: Option>, + project_id: Option, ) -> Result { match optional_query { None => Ok(json::Value::Null), - Some(ndc_query) => resolve_ndc_query_execution(http_client, ndc_query).await, + Some(ndc_query) => resolve_ndc_query_execution(http_client, ndc_query, project_id).await, } } diff --git a/v3/engine/src/execute/remote_joins.rs b/v3/engine/src/execute/remote_joins.rs index 2e7d33eb4b9..45b4efdf51a 100644 --- a/v3/engine/src/execute/remote_joins.rs +++ b/v3/engine/src/execute/remote_joins.rs @@ -10,9 +10,9 @@ use ndc_client as ndc; use self::types::TargetField; -use super::error; use super::ndc::execute_ndc_query; use super::query_plan::ProcessResponseAs; +use super::{error, ProjectId}; use types::{ ArgumentId, Arguments, JoinId, JoinLocations, Location, MonotonicCounter, RemoteJoin, @@ -78,6 +78,7 @@ pub async fn execute_join_locations<'ir>( lhs_response: &mut Vec, lhs_response_type: &ProcessResponseAs, join_locations: JoinLocations<(RemoteJoin<'async_recursion, 'ir>, JoinId)>, + project_id: Option, ) -> Result<(), error::Error> where 'ir: 'async_recursion, @@ -124,6 +125,7 @@ where join_node.target_data_connector, execution_span_attribute.clone(), remote_alias.clone(), + project_id.clone(), )) }, ) @@ -140,6 +142,7 @@ where &mut target_response, &join_node.process_response_as, sub_tree, + project_id.clone(), ) .await?; } diff --git a/v3/engine/tests/common.rs b/v3/engine/tests/common.rs index f67776bf6f1..291f47a1572 100644 --- a/v3/engine/tests/common.rs +++ b/v3/engine/tests/common.rs @@ -85,7 +85,8 @@ pub fn test_execution_expectation_legacy(test_path_string: &str, common_metadata // Execute the test - let response = execute_query(&test_ctx.http_client, &schema, &session, raw_request).await; + let response = + execute_query(&test_ctx.http_client, &schema, &session, raw_request, None).await; let mut expected = test_ctx .mint @@ -156,8 +157,14 @@ pub fn test_execution_expectation(test_path_string: &str, common_metadata_paths: // Execute the test let mut responses = Vec::new(); for session in sessions.iter() { - let response = - execute_query(&test_ctx.http_client, &schema, session, raw_request.clone()).await; + let response = execute_query( + &test_ctx.http_client, + &schema, + session, + raw_request.clone(), + None, + ) + .await; responses.push(response.0); }