send project-id as header to ndc for metrics (#292)

V3_GIT_ORIGIN_REV_ID: 723bd361f8e7d136a02f58873b5edcf5089acf03
This commit is contained in:
Krushan Bauva 2024-01-19 16:48:48 +05:30 committed by hasura-bot
parent 3a468d409b
commit bd80f8d81a
9 changed files with 97 additions and 24 deletions

View File

@ -130,7 +130,7 @@ pub fn bench_execute(
&(&runtime),
|b, runtime| {
b.to_async(*runtime).iter(|| async {
execute_query_plan(&http_client, generate_query_plan(&ir).unwrap()).await
execute_query_plan(&http_client, generate_query_plan(&ir).unwrap(), None).await
})
},
);
@ -141,7 +141,7 @@ pub fn bench_execute(
&(&runtime, &schema, raw_request),
|b, (runtime, schema, request)| {
b.to_async(*runtime).iter(|| async {
execute_query_internal(&http_client, schema, &session, request.clone())
execute_query_internal(&http_client, schema, &session, request.clone(), None)
.await
.unwrap()
})

View File

@ -298,6 +298,7 @@ async fn handle_request(
&state.schema,
&session,
request,
None,
))
})
.await;

View File

@ -79,13 +79,17 @@ impl Traceable for ExecuteOrExplainResponse {
}
}
#[derive(Clone, Debug)]
pub struct ProjectId(pub String);
pub async fn execute_query(
http_client: &reqwest::Client,
schema: &Schema<GDS>,
session: &Session,
request: RawRequest,
project_id: Option<ProjectId>,
) -> GraphQLResponse {
execute_query_internal(http_client, schema, session, request)
execute_query_internal(http_client, schema, session, request, project_id)
.await
.unwrap_or_else(|e| GraphQLResponse(Response::error(e.to_graphql_error(None))))
}
@ -115,6 +119,7 @@ pub async fn execute_query_internal(
schema: &gql::schema::Schema<GDS>,
session: &Session,
raw_request: gql::http::RawRequest,
project_id: Option<ProjectId>,
) -> Result<GraphQLResponse, error::Error> {
let query_response = execute_request_internal(
http_client,
@ -122,6 +127,7 @@ pub async fn execute_query_internal(
session,
raw_request,
explain::types::RequestMode::Execute,
project_id,
)
.await?;
match query_response {
@ -141,6 +147,7 @@ pub async fn execute_request_internal(
session: &Session,
raw_request: gql::http::RawRequest,
request_mode: explain::types::RequestMode,
project_id: Option<ProjectId>,
) -> Result<ExecuteOrExplainResponse, error::Error> {
let tracer = tracing_util::global_tracer();
let mode_query_span_name = match request_mode {
@ -217,9 +224,12 @@ pub async fn execute_request_internal(
);
Box::pin(async {
let execute_query_result =
query_plan::execute_query_plan(http_client, query_plan)
.await;
let execute_query_result = query_plan::execute_query_plan(
http_client,
query_plan,
project_id,
)
.await;
ExecuteOrExplainResponse::Execute(GraphQLResponse(
execute_query_result.to_graphql_response(),
))

View File

@ -5,7 +5,7 @@ use open_dds::{
session_variables::SessionVariable,
types::{CustomTypeName, FieldName},
};
use reqwest::StatusCode;
use reqwest::{header::InvalidHeaderValue, StatusCode};
use serde_json as json;
use thiserror::Error;
use tracing_util::{ErrorVisibility, TraceableError};
@ -163,6 +163,8 @@ pub enum Error {
InternalError(#[from] InternalError),
#[error("explain error: {0}")]
ExplainError(String),
#[error("invalid header value characters in project_id: {0}")]
ProjectIdConversionError(InvalidHeaderValue),
}
impl Error {

View File

@ -36,6 +36,7 @@ pub async fn execute_explain_internal(
session,
raw_request,
types::RequestMode::Explain,
None,
)
.await?;
match query_response {

View File

@ -1,3 +1,4 @@
use axum::http::HeaderMap;
use serde_json as json;
use gql::normalized_ast;
@ -6,9 +7,9 @@ use lang_graphql::ast::common as ast;
use ndc_client as ndc;
use tracing_util::{set_attribute_on_active_span, AttributeVisibility, SpanVisibility};
use super::error;
use super::process_response::process_command_rows;
use super::query_plan::ProcessResponseAs;
use super::{error, ProjectId};
use crate::metadata::resolved;
use crate::schema::GDS;
@ -19,6 +20,7 @@ pub async fn execute_ndc_query<'n, 's>(
data_connector: &resolved::data_connector::DataConnector,
execution_span_attribute: String,
field_span_attribute: String,
project_id: Option<ProjectId>,
) -> Result<Vec<ndc::models::RowSet>, error::Error> {
let tracer = tracing_util::global_tracer();
tracer
@ -35,7 +37,8 @@ pub async fn execute_ndc_query<'n, 's>(
field_span_attribute,
);
let connector_response =
fetch_from_data_connector(http_client, query, data_connector).await?;
fetch_from_data_connector(http_client, query, data_connector, project_id)
.await?;
Ok(connector_response.0)
})
})
@ -46,6 +49,7 @@ pub(crate) async fn fetch_from_data_connector<'s>(
http_client: &reqwest::Client,
query_request: ndc::models::QueryRequest,
data_connector: &resolved::data_connector::DataConnector,
project_id: Option<ProjectId>,
) -> Result<ndc::models::QueryResponse, error::Error> {
let tracer = tracing_util::global_tracer();
tracer
@ -54,12 +58,14 @@ pub(crate) async fn fetch_from_data_connector<'s>(
SpanVisibility::Internal,
|| {
Box::pin(async {
let headers =
append_project_id_to_headers(data_connector.headers.0.clone(), project_id)?;
let ndc_config = ndc::apis::configuration::Configuration {
base_path: data_connector.url.get_url(ast::OperationType::Query),
user_agent: None,
// This is isn't expensive, reqwest::Client is behind an Arc
client: http_client.clone(),
headers: data_connector.headers.0.clone(),
headers,
};
ndc::apis::default_api::query_post(&ndc_config, query_request)
.await
@ -70,6 +76,24 @@ pub(crate) async fn fetch_from_data_connector<'s>(
.await
}
// This function appends project-id (if present) to the HeaderMap defined by the data_connector object
pub fn append_project_id_to_headers(
mut headers: HeaderMap,
project_id: Option<ProjectId>,
) -> Result<HeaderMap, error::Error> {
match project_id {
None => Ok(headers),
Some(project_id) => {
headers.append(
"project-id",
reqwest::header::HeaderValue::from_str(&project_id.0)
.map_err(error::Error::ProjectIdConversionError)?,
);
Ok(headers)
}
}
}
/// Executes a NDC mutation
pub(crate) async fn execute_ndc_mutation<'n, 's, 'ir>(
http_client: &reqwest::Client,
@ -79,6 +103,7 @@ pub(crate) async fn execute_ndc_mutation<'n, 's, 'ir>(
execution_span_attribute: String,
field_span_attribute: String,
process_response_as: ProcessResponseAs<'ir>,
project_id: Option<ProjectId>,
) -> Result<json::Value, error::Error> {
let tracer = tracing_util::global_tracer();
tracer
@ -94,8 +119,13 @@ pub(crate) async fn execute_ndc_mutation<'n, 's, 'ir>(
"field",
field_span_attribute,
);
let connector_response =
fetch_from_data_connector_mutation(http_client, query, data_connector).await?;
let connector_response = fetch_from_data_connector_mutation(
http_client,
query,
data_connector,
project_id,
)
.await?;
// Post process the response to add the `__typename` fields
tracer.in_span("process_response", SpanVisibility::Internal, || {
// NOTE: NDC returns a `Vec<RowSet>` (to account for
@ -138,6 +168,7 @@ pub(crate) async fn fetch_from_data_connector_mutation<'s>(
http_client: &reqwest::Client,
query_request: ndc::models::MutationRequest,
data_connector: &resolved::data_connector::DataConnector,
project_id: Option<ProjectId>,
) -> Result<ndc::models::MutationResponse, error::Error> {
let tracer = tracing_util::global_tracer();
tracer
@ -146,14 +177,16 @@ pub(crate) async fn fetch_from_data_connector_mutation<'s>(
SpanVisibility::Internal,
|| {
Box::pin(async {
let gdc_config = ndc::apis::configuration::Configuration {
let headers =
append_project_id_to_headers(data_connector.headers.0.clone(), project_id)?;
let ndc_config = ndc::apis::configuration::Configuration {
base_path: data_connector.url.get_url(ast::OperationType::Mutation),
user_agent: None,
// This is isn't expensive, reqwest::Client is behind an Arc
client: http_client.clone(),
headers: data_connector.headers.0.clone(),
headers,
};
ndc::apis::default_api::mutation_post(&gdc_config, query_request)
ndc::apis::default_api::mutation_post(&ndc_config, query_request)
.await
.map_err(error::Error::from) // ndc_client::apis::Error -> InternalError -> Error
})

View File

@ -18,6 +18,7 @@ use super::ndc;
use super::process_response::process_response;
use super::remote_joins::execute_join_locations;
use super::remote_joins::types::{JoinId, JoinLocations, Location, MonotonicCounter, RemoteJoin};
use super::ProjectId;
use crate::metadata::resolved::{self, subgraph};
use crate::schema::GDS;
@ -414,6 +415,7 @@ impl ExecuteQueryResult {
async fn execute_field_plan<'n, 's, 'ir>(
http_client: &reqwest::Client,
field_plan: NodeQueryPlan<'n, 's, 'ir>,
project_id: Option<ProjectId>,
) -> RootFieldResult {
let tracer = tracing_util::global_tracer();
tracer
@ -467,17 +469,19 @@ async fn execute_field_plan<'n, 's, 'ir>(
}
NodeQueryPlan::NDCQueryExecution(ndc_query) => RootFieldResult::new(
&ndc_query.process_response_as.is_nullable(),
resolve_ndc_query_execution(http_client, ndc_query).await,
resolve_ndc_query_execution(http_client, ndc_query, project_id).await,
),
NodeQueryPlan::NDCMutationExecution(ndc_query) => RootFieldResult::new(
&ndc_query.process_response_as.is_nullable(),
resolve_ndc_mutation_execution(http_client, ndc_query).await,
resolve_ndc_mutation_execution(http_client, ndc_query, project_id)
.await,
),
NodeQueryPlan::RelayNodeSelect(optional_query) => RootFieldResult::new(
&optional_query.as_ref().map_or(true, |ndc_query| {
ndc_query.process_response_as.is_nullable()
}),
resolve_relay_node_select(http_client, optional_query).await,
resolve_relay_node_select(http_client, optional_query, project_id)
.await,
),
}
})
@ -489,6 +493,7 @@ async fn execute_field_plan<'n, 's, 'ir>(
pub async fn execute_query_plan<'n, 's, 'ir>(
http_client: &reqwest::Client,
query_plan: QueryPlan<'n, 's, 'ir>,
project_id: Option<ProjectId>,
) -> ExecuteQueryResult {
let mut root_fields = IndexMap::new();
@ -497,7 +502,12 @@ pub async fn execute_query_plan<'n, 's, 'ir>(
for (alias, field_plan) in query_plan.into_iter() {
// We are not running the field plans parallely here, we are just running them concurrently on a single thread.
// To run the field plans parallely, we will need to use tokio::spawn for each field plan.
let task = async { (alias, execute_field_plan(http_client, field_plan).await) };
let task = async {
(
alias,
execute_field_plan(http_client, field_plan, project_id.clone()).await,
)
};
tasks.push(task);
}
@ -548,6 +558,7 @@ fn resolve_schema_field(
async fn resolve_ndc_query_execution(
http_client: &reqwest::Client,
ndc_query: NDCQueryExecution<'_, '_>,
project_id: Option<ProjectId>,
) -> Result<json::Value, error::Error> {
let NDCQueryExecution {
execution_tree,
@ -562,6 +573,7 @@ async fn resolve_ndc_query_execution(
execution_tree.root_node.data_connector,
execution_span_attribute.clone(),
field_span_attribute.clone(),
project_id.clone(),
)
.await?;
// TODO: Failures in remote joins should result in partial response
@ -573,6 +585,7 @@ async fn resolve_ndc_query_execution(
&mut response,
&process_response_as,
execution_tree.remote_executions,
project_id,
)
.await?;
let result = process_response(selection_set, response, process_response_as)?;
@ -582,6 +595,7 @@ async fn resolve_ndc_query_execution(
async fn resolve_ndc_mutation_execution(
http_client: &reqwest::Client,
ndc_query: NDCMutationExecution<'_, '_, '_>,
project_id: Option<ProjectId>,
) -> Result<json::Value, error::Error> {
let NDCMutationExecution {
query,
@ -601,6 +615,7 @@ async fn resolve_ndc_mutation_execution(
execution_span_attribute,
field_span_attribute,
process_response_as,
project_id,
)
.await?;
Ok(json::to_value(response)?)
@ -609,9 +624,10 @@ async fn resolve_ndc_mutation_execution(
async fn resolve_relay_node_select(
http_client: &reqwest::Client,
optional_query: Option<NDCQueryExecution<'_, '_>>,
project_id: Option<ProjectId>,
) -> Result<json::Value, error::Error> {
match optional_query {
None => Ok(json::Value::Null),
Some(ndc_query) => resolve_ndc_query_execution(http_client, ndc_query).await,
Some(ndc_query) => resolve_ndc_query_execution(http_client, ndc_query, project_id).await,
}
}

View File

@ -10,9 +10,9 @@ use ndc_client as ndc;
use self::types::TargetField;
use super::error;
use super::ndc::execute_ndc_query;
use super::query_plan::ProcessResponseAs;
use super::{error, ProjectId};
use types::{
ArgumentId, Arguments, JoinId, JoinLocations, Location, MonotonicCounter, RemoteJoin,
@ -78,6 +78,7 @@ pub async fn execute_join_locations<'ir>(
lhs_response: &mut Vec<ndc::models::RowSet>,
lhs_response_type: &ProcessResponseAs,
join_locations: JoinLocations<(RemoteJoin<'async_recursion, 'ir>, JoinId)>,
project_id: Option<ProjectId>,
) -> Result<(), error::Error>
where
'ir: 'async_recursion,
@ -124,6 +125,7 @@ where
join_node.target_data_connector,
execution_span_attribute.clone(),
remote_alias.clone(),
project_id.clone(),
))
},
)
@ -140,6 +142,7 @@ where
&mut target_response,
&join_node.process_response_as,
sub_tree,
project_id.clone(),
)
.await?;
}

View File

@ -85,7 +85,8 @@ pub fn test_execution_expectation_legacy(test_path_string: &str, common_metadata
// Execute the test
let response = execute_query(&test_ctx.http_client, &schema, &session, raw_request).await;
let response =
execute_query(&test_ctx.http_client, &schema, &session, raw_request, None).await;
let mut expected = test_ctx
.mint
@ -156,8 +157,14 @@ pub fn test_execution_expectation(test_path_string: &str, common_metadata_paths:
// Execute the test
let mut responses = Vec::new();
for session in sessions.iter() {
let response =
execute_query(&test_ctx.http_client, &schema, session, raw_request.clone()).await;
let response = execute_query(
&test_ctx.http_client,
&schema,
session,
raw_request.clone(),
None,
)
.await;
responses.push(response.0);
}