Make subscriptions use new execution steps (#1393)

<!-- The PR description should answer 2 important questions: -->

### What

Stacks on top of #1392

Make subscriptions use new execution steps. Functional no-op. We don't
delete much yet because we need to move `explain` over first.

V3_GIT_ORIGIN_REV_ID: acd88105599953b377225542eea6e4518e1501d0
This commit is contained in:
Daniel Harvey 2024-12-01 20:43:28 +00:00 committed by hasura-bot
parent 39778d95c7
commit f088c127d7
7 changed files with 54 additions and 17 deletions

1
v3/Cargo.lock generated
View File

@ -2293,6 +2293,7 @@ dependencies = [
"execute",
"futures-util",
"graphql-frontend",
"graphql-ir",
"graphql-schema",
"hasura-authn",
"hasura-authn-core",

View File

@ -2,6 +2,7 @@
// ideally we'll bin off everything around GraphQL-specific nodes and leave those to the GraphQL
// frontend
use std::sync::Arc;
use uuid::Uuid;
mod ndc_request;
mod remote_joins;
@ -12,8 +13,9 @@ use async_recursion::async_recursion;
use engine_types::{HttpContext, ProjectId};
pub use ndc_request::{make_ndc_mutation_request, make_ndc_query_request};
use plan_types::{
ExecutionTree, JoinLocations, NDCMutationExecution, NDCQueryExecution, PredicateQueryTrees,
ProcessResponseAs, QueryExecutionPlan, ResolvedFilterExpression,
ExecutionTree, JoinLocations, NDCMutationExecution, NDCQueryExecution,
NDCSubscriptionExecution, PredicateQueryTrees, ProcessResponseAs, QueryExecutionPlan,
ResolvedFilterExpression,
};
use std::collections::BTreeMap;
@ -240,3 +242,35 @@ pub async fn resolve_ndc_mutation_execution(
Ok(mutation_response)
}
/// A subscription NDC query.
/// Contains required information to execute a NDC query for a subscription in a polling loop.
pub struct NDCSubscriptionQuery {
pub query_request: ndc::NdcQueryRequest,
pub data_connector: Arc<metadata_resolve::DataConnectorLink>,
pub process_response_as: ProcessResponseAs,
pub polling_interval_ms: u64,
}
/// Resolve a subscription execution plan to a NDC query.
pub async fn resolve_ndc_subscription_execution<'s, 'ir>(
execution: NDCSubscriptionExecution,
) -> Result<NDCSubscriptionQuery, FieldError> {
let NDCSubscriptionExecution {
query_execution_plan,
execution_span_attribute: _,
field_span_attribute: _,
process_response_as,
polling_interval_ms,
} = execution;
// Remote relationships and relationships without NDC comparison capability are not allowed in predicates for subscriptions.
// Only allow local relationships and fields that can be pushed down to NDC.
let data_connector = query_execution_plan.data_connector.clone();
let query_request = make_ndc_query_request(query_execution_plan)?;
Ok(NDCSubscriptionQuery {
query_request,
data_connector,
process_response_as,
polling_interval_ms,
})
}

View File

@ -10,7 +10,7 @@ mod remote_joins;
pub use error::{FieldError, QueryUsageAnalyzeError, RequestError};
pub use execute::{
make_ndc_mutation_request, make_ndc_query_request, resolve_ndc_mutation_execution,
resolve_ndc_query_execution,
resolve_ndc_query_execution, resolve_ndc_subscription_execution,
};
pub use ndc::fetch_from_data_connector;
pub use plan::error::Error as PlanError;

View File

@ -5,16 +5,14 @@ mod steps;
mod to_opendd_ir;
mod types;
pub use execute::{execute_mutation_plan, execute_query_plan};
pub use explain::execute_explain;
pub use explain::types::{redact_ndc_explain, ExplainResponse};
pub use steps::{
build_ir, build_request_plan_with_old, generate_ir, normalize_request, parse_query,
};
pub use to_opendd_ir::to_opendd_ir;
pub use query::{
execute_query, execute_query_internal, set_request_metadata_attributes, set_usage_attributes,
};
pub use steps::{build_ir, build_request_plan, generate_ir, normalize_request, parse_query};
pub use to_opendd_ir::to_opendd_ir;
pub use types::{GraphQLErrors, GraphQLResponse};
#[cfg(test)]

View File

@ -8,6 +8,7 @@ license.workspace = true
engine-types = { path = "../../engine-types" }
execute = { path = "../../execute" }
graphql-frontend = { path = "../frontend" }
graphql-ir = { path = "../ir" }
graphql-schema = { path = "../schema" }
hasura-authn = { path = "../../auth/hasura-authn" }
hasura-authn-core = { path = "../../auth/hasura-authn-core" }

View File

@ -2,6 +2,7 @@ use axum::http;
use blake2::{Blake2b, Digest};
use engine_types::ExposeInternalErrors;
use execute::{self, plan};
use graphql_ir::RequestPlan;
use hasura_authn_core::Session;
use indexmap::IndexMap;
use nonempty::NonEmpty;
@ -270,7 +271,7 @@ pub async fn execute_query_internal<M: WebSocketMetrics>(
// Generate Intermediate Representation (IR) from the query.
let ir = graphql_frontend::build_ir(schema, &session, &headers, &normalized_request)?;
// Build a request plan based on the IR.
let request_plan = graphql_frontend::build_request_plan_with_old(&ir)?;
let request_plan = graphql_frontend::build_request_plan(&ir)?;
let display_name = match normalized_request.name {
Some(ref name) => std::borrow::Cow::Owned(format!("Execute {name}")),
@ -310,16 +311,17 @@ async fn execute<M: WebSocketMetrics>(
session: Session,
headers: http::HeaderMap,
raw_request: lang_graphql::http::RawRequest,
request_plan: execute::RequestPlan<'_, '_, '_>,
request_plan: RequestPlan<'_, '_, '_>,
) {
let project_id = connection.context.project_id.as_ref();
let http_context = &connection.context.http_context;
let expose_internal_errors = connection.context.expose_internal_errors;
match request_plan {
// Handle mutations.
plan::RequestPlan::MutationPlan(mutation_plan) => {
RequestPlan::MutationPlan(mutation_plan) => {
let execute_query_result =
plan::execute_mutation_plan(http_context, mutation_plan, project_id).await;
graphql_frontend::execute_mutation_plan(http_context, mutation_plan, project_id)
.await;
send_single_result_operation_response(
operation_id,
&raw_request,
@ -332,9 +334,9 @@ async fn execute<M: WebSocketMetrics>(
.await;
}
// Handle queries.
plan::RequestPlan::QueryPlan(query_plan) => {
RequestPlan::QueryPlan(query_plan) => {
let execute_query_result =
plan::execute_query_plan(http_context, query_plan, project_id).await;
graphql_frontend::execute_query_plan(http_context, query_plan, project_id).await;
send_single_result_operation_response(
operation_id,
&raw_request,
@ -347,12 +349,12 @@ async fn execute<M: WebSocketMetrics>(
.await;
}
// Handle subscriptions by starting a polling loop to repeatedly fetch data.
plan::RequestPlan::SubscriptionPlan(alias, plan) => {
match plan::resolve_ndc_subscription_execution(plan).await {
RequestPlan::SubscriptionPlan(alias, plan) => {
match execute::resolve_ndc_subscription_execution(plan.subscription_execution).await {
Ok(ndc_subscription) => {
let query_request = ndc_subscription.query_request;
let data_connector = ndc_subscription.data_connector;
let selection_set = ndc_subscription.selection_set;
let selection_set = plan.selection_set;
let process_response_as = ndc_subscription.process_response_as;
let is_nullable = process_response_as.is_nullable();
let polling_interval_duration =

View File

@ -32,6 +32,7 @@ pub(crate) fn plan_query_node(
ir.data_connector.capabilities.supported_ndc_version,
relationships,
)?;
query_fields = Some(fields);
join_locations = locations;
remote_predicates = selection_set_remote_predicates;