From f088c127d788806114ddbeb7e63ea6c5ca182d9c Mon Sep 17 00:00:00 2001 From: Daniel Harvey Date: Sun, 1 Dec 2024 20:43:28 +0000 Subject: [PATCH] Make subscriptions use new execution steps (#1393) ### What Stacks on top of #1392 Make subscriptions use new execution steps. Functional no-op. We don't delete much yet because we need to move `explain` over first. V3_GIT_ORIGIN_REV_ID: acd88105599953b377225542eea6e4518e1501d0 --- v3/Cargo.lock | 1 + v3/crates/execute/src/execute.rs | 38 ++++++++++++++++++- v3/crates/execute/src/lib.rs | 2 +- v3/crates/graphql/frontend/src/lib.rs | 8 ++-- v3/crates/graphql/graphql-ws/Cargo.toml | 1 + .../graphql-ws/src/protocol/subscribe.rs | 20 +++++----- .../graphql/ir/src/plan/model_selection.rs | 1 + 7 files changed, 54 insertions(+), 17 deletions(-) diff --git a/v3/Cargo.lock b/v3/Cargo.lock index 0d3ca07377f..f91b69d3b94 100644 --- a/v3/Cargo.lock +++ b/v3/Cargo.lock @@ -2293,6 +2293,7 @@ dependencies = [ "execute", "futures-util", "graphql-frontend", + "graphql-ir", "graphql-schema", "hasura-authn", "hasura-authn-core", diff --git a/v3/crates/execute/src/execute.rs b/v3/crates/execute/src/execute.rs index 63d80d3e8e6..e7622b6d58b 100644 --- a/v3/crates/execute/src/execute.rs +++ b/v3/crates/execute/src/execute.rs @@ -2,6 +2,7 @@ // ideally we'll bin off everything around GraphQL-specific nodes and leave those to the GraphQL // frontend +use std::sync::Arc; use uuid::Uuid; mod ndc_request; mod remote_joins; @@ -12,8 +13,9 @@ use async_recursion::async_recursion; use engine_types::{HttpContext, ProjectId}; pub use ndc_request::{make_ndc_mutation_request, make_ndc_query_request}; use plan_types::{ - ExecutionTree, JoinLocations, NDCMutationExecution, NDCQueryExecution, PredicateQueryTrees, - ProcessResponseAs, QueryExecutionPlan, ResolvedFilterExpression, + ExecutionTree, JoinLocations, NDCMutationExecution, NDCQueryExecution, + NDCSubscriptionExecution, PredicateQueryTrees, ProcessResponseAs, QueryExecutionPlan, + ResolvedFilterExpression, }; use std::collections::BTreeMap; @@ -240,3 +242,35 @@ pub async fn resolve_ndc_mutation_execution( Ok(mutation_response) } + +/// A subscription NDC query. +/// Contains required information to execute a NDC query for a subscription in a polling loop. +pub struct NDCSubscriptionQuery { + pub query_request: ndc::NdcQueryRequest, + pub data_connector: Arc, + pub process_response_as: ProcessResponseAs, + pub polling_interval_ms: u64, +} + +/// Resolve a subscription execution plan to a NDC query. +pub async fn resolve_ndc_subscription_execution<'s, 'ir>( + execution: NDCSubscriptionExecution, +) -> Result { + let NDCSubscriptionExecution { + query_execution_plan, + execution_span_attribute: _, + field_span_attribute: _, + process_response_as, + polling_interval_ms, + } = execution; + // Remote relationships and relationships without NDC comparison capability are not allowed in predicates for subscriptions. + // Only allow local relationships and fields that can be pushed down to NDC. + let data_connector = query_execution_plan.data_connector.clone(); + let query_request = make_ndc_query_request(query_execution_plan)?; + Ok(NDCSubscriptionQuery { + query_request, + data_connector, + process_response_as, + polling_interval_ms, + }) +} diff --git a/v3/crates/execute/src/lib.rs b/v3/crates/execute/src/lib.rs index 2ce44d8db10..2d55cf829e4 100644 --- a/v3/crates/execute/src/lib.rs +++ b/v3/crates/execute/src/lib.rs @@ -10,7 +10,7 @@ mod remote_joins; pub use error::{FieldError, QueryUsageAnalyzeError, RequestError}; pub use execute::{ make_ndc_mutation_request, make_ndc_query_request, resolve_ndc_mutation_execution, - resolve_ndc_query_execution, + resolve_ndc_query_execution, resolve_ndc_subscription_execution, }; pub use ndc::fetch_from_data_connector; pub use plan::error::Error as PlanError; diff --git a/v3/crates/graphql/frontend/src/lib.rs b/v3/crates/graphql/frontend/src/lib.rs index 05b28990316..0bd272cd67c 100644 --- a/v3/crates/graphql/frontend/src/lib.rs +++ b/v3/crates/graphql/frontend/src/lib.rs @@ -5,16 +5,14 @@ mod steps; mod to_opendd_ir; mod types; +pub use execute::{execute_mutation_plan, execute_query_plan}; pub use explain::execute_explain; pub use explain::types::{redact_ndc_explain, ExplainResponse}; -pub use steps::{ - build_ir, build_request_plan_with_old, generate_ir, normalize_request, parse_query, -}; -pub use to_opendd_ir::to_opendd_ir; - pub use query::{ execute_query, execute_query_internal, set_request_metadata_attributes, set_usage_attributes, }; +pub use steps::{build_ir, build_request_plan, generate_ir, normalize_request, parse_query}; +pub use to_opendd_ir::to_opendd_ir; pub use types::{GraphQLErrors, GraphQLResponse}; #[cfg(test)] diff --git a/v3/crates/graphql/graphql-ws/Cargo.toml b/v3/crates/graphql/graphql-ws/Cargo.toml index b786c0a3677..6c81bd46d5d 100644 --- a/v3/crates/graphql/graphql-ws/Cargo.toml +++ b/v3/crates/graphql/graphql-ws/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true engine-types = { path = "../../engine-types" } execute = { path = "../../execute" } graphql-frontend = { path = "../frontend" } +graphql-ir = { path = "../ir" } graphql-schema = { path = "../schema" } hasura-authn = { path = "../../auth/hasura-authn" } hasura-authn-core = { path = "../../auth/hasura-authn-core" } diff --git a/v3/crates/graphql/graphql-ws/src/protocol/subscribe.rs b/v3/crates/graphql/graphql-ws/src/protocol/subscribe.rs index 8cc6c125fc1..c6ee188fc04 100644 --- a/v3/crates/graphql/graphql-ws/src/protocol/subscribe.rs +++ b/v3/crates/graphql/graphql-ws/src/protocol/subscribe.rs @@ -2,6 +2,7 @@ use axum::http; use blake2::{Blake2b, Digest}; use engine_types::ExposeInternalErrors; use execute::{self, plan}; +use graphql_ir::RequestPlan; use hasura_authn_core::Session; use indexmap::IndexMap; use nonempty::NonEmpty; @@ -270,7 +271,7 @@ pub async fn execute_query_internal( // Generate Intermediate Representation (IR) from the query. let ir = graphql_frontend::build_ir(schema, &session, &headers, &normalized_request)?; // Build a request plan based on the IR. - let request_plan = graphql_frontend::build_request_plan_with_old(&ir)?; + let request_plan = graphql_frontend::build_request_plan(&ir)?; let display_name = match normalized_request.name { Some(ref name) => std::borrow::Cow::Owned(format!("Execute {name}")), @@ -310,16 +311,17 @@ async fn execute( session: Session, headers: http::HeaderMap, raw_request: lang_graphql::http::RawRequest, - request_plan: execute::RequestPlan<'_, '_, '_>, + request_plan: RequestPlan<'_, '_, '_>, ) { let project_id = connection.context.project_id.as_ref(); let http_context = &connection.context.http_context; let expose_internal_errors = connection.context.expose_internal_errors; match request_plan { // Handle mutations. - plan::RequestPlan::MutationPlan(mutation_plan) => { + RequestPlan::MutationPlan(mutation_plan) => { let execute_query_result = - plan::execute_mutation_plan(http_context, mutation_plan, project_id).await; + graphql_frontend::execute_mutation_plan(http_context, mutation_plan, project_id) + .await; send_single_result_operation_response( operation_id, &raw_request, @@ -332,9 +334,9 @@ async fn execute( .await; } // Handle queries. - plan::RequestPlan::QueryPlan(query_plan) => { + RequestPlan::QueryPlan(query_plan) => { let execute_query_result = - plan::execute_query_plan(http_context, query_plan, project_id).await; + graphql_frontend::execute_query_plan(http_context, query_plan, project_id).await; send_single_result_operation_response( operation_id, &raw_request, @@ -347,12 +349,12 @@ async fn execute( .await; } // Handle subscriptions by starting a polling loop to repeatedly fetch data. - plan::RequestPlan::SubscriptionPlan(alias, plan) => { - match plan::resolve_ndc_subscription_execution(plan).await { + RequestPlan::SubscriptionPlan(alias, plan) => { + match execute::resolve_ndc_subscription_execution(plan.subscription_execution).await { Ok(ndc_subscription) => { let query_request = ndc_subscription.query_request; let data_connector = ndc_subscription.data_connector; - let selection_set = ndc_subscription.selection_set; + let selection_set = plan.selection_set; let process_response_as = ndc_subscription.process_response_as; let is_nullable = process_response_as.is_nullable(); let polling_interval_duration = diff --git a/v3/crates/graphql/ir/src/plan/model_selection.rs b/v3/crates/graphql/ir/src/plan/model_selection.rs index 8168df14a90..52fa1573852 100644 --- a/v3/crates/graphql/ir/src/plan/model_selection.rs +++ b/v3/crates/graphql/ir/src/plan/model_selection.rs @@ -32,6 +32,7 @@ pub(crate) fn plan_query_node( ir.data_connector.capabilities.supported_ndc_version, relationships, )?; + query_fields = Some(fields); join_locations = locations; remote_predicates = selection_set_remote_predicates;