From b1edee94f8941ce864a1199f4033b27e11bea172 Mon Sep 17 00:00:00 2001 From: Nicu Reut Date: Tue, 15 Nov 2022 09:50:21 +0100 Subject: [PATCH] Add gRPC server interceptor with golden metrics to all gRPC services [PLEN-10] (#15477) --- .../com/daml/metrics/HttpJsonApiMetrics.scala | 7 +--- .../main/scala/com/daml/metrics/Metrics.scala | 39 +++++-------------- .../metrics/OpenTelemetryMeterOwner.scala | 30 +++++++++++++- .../opentelemetry/OpenTelemetryFactory.scala | 12 +++--- .../metrics/grpc/DamlGrpcServerMetrics.scala | 36 +++++++++++++++++ .../grpc/GrpcMetricsServerInterceptor.scala | 4 +- .../GrpcMetricsServerInterceptorSpec.scala | 6 +-- .../scala/platform/apiserver/GrpcServer.scala | 5 ++- 8 files changed, 89 insertions(+), 50 deletions(-) create mode 100644 ledger/metrics/src/main/scala/com/daml/metrics/grpc/DamlGrpcServerMetrics.scala diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/HttpJsonApiMetrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/HttpJsonApiMetrics.scala index a7ced6e764..b47d14cada 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/HttpJsonApiMetrics.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/HttpJsonApiMetrics.scala @@ -8,12 +8,11 @@ import com.daml.metrics.api.MetricHandle.{Counter, Meter, Timer} import com.daml.metrics.api.MetricName import com.daml.metrics.api.dropwizard.DropwizardFactory import com.daml.metrics.api.opentelemetry.OpenTelemetryFactory -import io.opentelemetry.api.metrics.{Meter => OtelMeter} class HttpJsonApiMetrics( val prefix: MetricName, val registry: MetricRegistry, - val otelMeter: OtelMeter, + val openTelemetryFactory: OpenTelemetryFactory, ) { object Db extends DropwizardFactory { @@ -83,10 +82,6 @@ class HttpJsonApiMetrics( val allocatePartyThroughput: Meter = dropwizardFactory.meter(prefix :+ "allocation_party_throughput") - val openTelemetryFactory = new OpenTelemetryFactory { - override val otelMeter: OtelMeter = HttpJsonApiMetrics.this.otelMeter - } - // golden signals val httpRequestsTotal: Meter = openTelemetryFactory.meter(prefix :+ "requests_total") val httpErrorsTotal: Meter = openTelemetryFactory.meter(prefix :+ "errors_total") diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala index b289405f75..f0e83837a7 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/Metrics.scala @@ -4,12 +4,12 @@ package com.daml.metrics import com.codahale.metrics.MetricRegistry -import io.opentelemetry.api.GlobalOpenTelemetry -import io.opentelemetry.api.metrics.{Meter => OtelMeter} -import com.daml.metrics.api.{MetricHandle, MetricName} +import com.daml.metrics.api.MetricName import com.daml.metrics.api.dropwizard.DropwizardFactory import com.daml.metrics.api.opentelemetry.OpenTelemetryFactory -import com.daml.metrics.grpc.GrpcServerMetrics +import com.daml.metrics.grpc.DamlGrpcServerMetrics +import io.opentelemetry.api.GlobalOpenTelemetry +import io.opentelemetry.api.metrics.{Meter => OtelMeter} object Metrics { lazy val ForTesting = new Metrics(new MetricRegistry, GlobalOpenTelemetry.getMeter("test")) @@ -18,6 +18,8 @@ object Metrics { final class Metrics(override val registry: MetricRegistry, val otelMeter: OtelMeter) extends DropwizardFactory { + val openTelemetryFactory: OpenTelemetryFactory = new OpenTelemetryFactory(otelMeter) + object test { private val prefix: MetricName = MetricName("test") @@ -47,33 +49,10 @@ final class Metrics(override val registry: MetricRegistry, val otelMeter: OtelMe object services extends ServicesMetrics(prefix :+ "services", registry) - object HttpJsonApi extends HttpJsonApiMetrics(prefix :+ "http_json_api", registry, otelMeter) + object HttpJsonApi + extends HttpJsonApiMetrics(prefix :+ "http_json_api", registry, openTelemetryFactory) - object grpc extends OpenTelemetryFactory with GrpcServerMetrics { - - private val grpcServerMetricsPrefix = prefix :+ "grpc" :+ "server" - - override def otelMeter: OtelMeter = Metrics.this.otelMeter - override val callTimer: MetricHandle.Timer = timer(grpcServerMetricsPrefix) - override val messagesSent: MetricHandle.Meter = meter( - grpcServerMetricsPrefix :+ "messages" :+ "sent" - ) - override val messagesReceived: MetricHandle.Meter = meter( - grpcServerMetricsPrefix :+ "messages" :+ "received" - ) - override val messagesSentSize: MetricHandle.Histogram = histogram( - grpcServerMetricsPrefix :+ "messages" :+ "sent" :+ "bytes" - ) - override val messagesReceivedSize: MetricHandle.Histogram = histogram( - grpcServerMetricsPrefix :+ "messages" :+ "received" :+ "bytes" - ) - override val callsStarted: MetricHandle.Meter = meter( - grpcServerMetricsPrefix :+ "started" - ) - override val callsFinished: MetricHandle.Meter = meter( - grpcServerMetricsPrefix :+ "handled" - ) - } + object grpc extends DamlGrpcServerMetrics(openTelemetryFactory, "participant") } } diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/OpenTelemetryMeterOwner.scala b/ledger/metrics/src/main/scala/com/daml/metrics/OpenTelemetryMeterOwner.scala index 42787390a4..73a8dc9f21 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/OpenTelemetryMeterOwner.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/OpenTelemetryMeterOwner.scala @@ -4,14 +4,18 @@ package com.daml.metrics import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner} +import com.daml.metrics.OpenTelemetryMeterOwner.buildProviderWithViews import com.daml.metrics.api.reporters.MetricsReporter import com.daml.metrics.api.reporters.MetricsReporter.Prometheus import io.opentelemetry.api.metrics.Meter import io.opentelemetry.exporter.prometheus.PrometheusCollector -import io.opentelemetry.sdk.metrics.SdkMeterProvider +import io.opentelemetry.sdk.metrics.common.InstrumentType +import io.opentelemetry.sdk.metrics.view.{Aggregation, InstrumentSelector, View} +import io.opentelemetry.sdk.metrics.{SdkMeterProvider, SdkMeterProviderBuilder} import scala.annotation.nowarn import scala.concurrent.Future +import scala.jdk.CollectionConverters.SeqHasAsJava @nowarn("msg=deprecated") case class OpenTelemetryMeterOwner(enabled: Boolean, reporter: Option[MetricsReporter]) @@ -20,7 +24,7 @@ case class OpenTelemetryMeterOwner(enabled: Boolean, reporter: Option[MetricsRep override def acquire()(implicit context: ResourceContext ): Resource[Meter] = { - val meterProviderBuilder = SdkMeterProvider.builder() + val meterProviderBuilder = buildProviderWithViews /* To integrate with prometheus we're using the deprecated [[PrometheusCollector]]. * More details about the deprecation here: https://github.com/open-telemetry/opentelemetry-java/issues/4284 @@ -41,3 +45,25 @@ case class OpenTelemetryMeterOwner(enabled: Boolean, reporter: Option[MetricsRep } } +object OpenTelemetryMeterOwner { + + def buildProviderWithViews: SdkMeterProviderBuilder = + SdkMeterProvider + .builder() + .registerView( + InstrumentSelector + .builder() + .setType(InstrumentType.HISTOGRAM) + .setName((t: String) => t.endsWith("duration.ms")) + .build(), + View + .builder() + .setAggregation( + Aggregation.explicitBucketHistogram( + Seq(5d, 10d, 15d, 25d, 35d, 50d, 75d, 100d, 150d, 200d, 250d, 350d, 500d, 750d, + 1_000d, 2_500d, 5_000d, 7_500d, 10_000d, 25_000d, 50_000d).map(Double.box).asJava + ) + ) + .build(), + ) +} diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/api/opentelemetry/OpenTelemetryFactory.scala b/ledger/metrics/src/main/scala/com/daml/metrics/api/opentelemetry/OpenTelemetryFactory.scala index b639ead330..edcc7b4581 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/api/opentelemetry/OpenTelemetryFactory.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/api/opentelemetry/OpenTelemetryFactory.scala @@ -19,24 +19,24 @@ import io.opentelemetry.api.metrics.{ Meter => OtelMeter, } -trait OpenTelemetryFactory extends Factory { +class OpenTelemetryFactory(otelMeter: OtelMeter) extends Factory { val globalMetricsContext: MetricsContext = MetricsContext( Map("daml_version" -> BuildInfo.Version) ) - def otelMeter: OtelMeter - override def timer( name: MetricName )(implicit context: MetricsContext = MetricsContext.Empty - ): MetricHandle.Timer = + ): MetricHandle.Timer = { + val nameWithSuffix = name :+ "duration" :+ "ms" OpenTelemetryTimer( - name, - otelMeter.histogramBuilder(name).ofLongs().setUnit("ms").build(), + nameWithSuffix, + otelMeter.histogramBuilder(nameWithSuffix).ofLongs().setUnit("ms").build(), globalMetricsContext.merge(context), ) + } override def gauge[T](name: MetricName, initial: T)(implicit context: MetricsContext = MetricsContext.Empty ): MetricHandle.Gauge[T] = { diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/grpc/DamlGrpcServerMetrics.scala b/ledger/metrics/src/main/scala/com/daml/metrics/grpc/DamlGrpcServerMetrics.scala new file mode 100644 index 0000000000..47b721e087 --- /dev/null +++ b/ledger/metrics/src/main/scala/com/daml/metrics/grpc/DamlGrpcServerMetrics.scala @@ -0,0 +1,36 @@ +// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.metrics.grpc + +import com.daml.metrics.api.MetricHandle.Factory +import com.daml.metrics.api.{MetricHandle, MetricName, MetricsContext} + +class DamlGrpcServerMetrics(metricsFactory: Factory, component: String) extends GrpcServerMetrics { + + private val grpcServerMetricsPrefix = MetricName.Daml :+ "grpc" :+ "server" + private implicit val metricsContext: MetricsContext = MetricsContext( + Map("service" -> component) + ) + + override val callTimer: MetricHandle.Timer = metricsFactory.timer(grpcServerMetricsPrefix) + override val messagesSent: MetricHandle.Meter = metricsFactory.meter( + grpcServerMetricsPrefix :+ "messages" :+ "sent" + ) + override val messagesReceived: MetricHandle.Meter = metricsFactory.meter( + grpcServerMetricsPrefix :+ "messages" :+ "received" + ) + override val messagesSentSize: MetricHandle.Histogram = metricsFactory.histogram( + grpcServerMetricsPrefix :+ "messages" :+ "sent" :+ "bytes" + ) + override val messagesReceivedSize: MetricHandle.Histogram = metricsFactory.histogram( + grpcServerMetricsPrefix :+ "messages" :+ "received" :+ "bytes" + ) + override val callsStarted: MetricHandle.Meter = metricsFactory.meter( + grpcServerMetricsPrefix :+ "started" + ) + override val callsHandled: MetricHandle.Meter = metricsFactory.meter( + grpcServerMetricsPrefix :+ "handled" + ) + +} diff --git a/ledger/metrics/src/main/scala/com/daml/metrics/grpc/GrpcMetricsServerInterceptor.scala b/ledger/metrics/src/main/scala/com/daml/metrics/grpc/GrpcMetricsServerInterceptor.scala index 852659be91..052a6da5a0 100644 --- a/ledger/metrics/src/main/scala/com/daml/metrics/grpc/GrpcMetricsServerInterceptor.scala +++ b/ledger/metrics/src/main/scala/com/daml/metrics/grpc/GrpcMetricsServerInterceptor.scala @@ -56,7 +56,7 @@ class GrpcMetricsServerInterceptor(metrics: GrpcServerMetrics) extends ServerInt timerHandle, metrics.messagesSent, metrics.messagesSentSize, - metrics.callsFinished, + metrics.callsHandled, ) new MonitoringServerCallListener( serverCallHandler.startCall(metricsServerCall, metadata), @@ -138,5 +138,5 @@ trait GrpcServerMetrics { val messagesReceived: Meter val messagesReceivedSize: Histogram val callsStarted: Meter - val callsFinished: Meter + val callsHandled: Meter } diff --git a/ledger/metrics/src/test/suite/scala/com/daml/metrics/grpc/GrpcMetricsServerInterceptorSpec.scala b/ledger/metrics/src/test/suite/scala/com/daml/metrics/grpc/GrpcMetricsServerInterceptorSpec.scala index 9bc5c7c76f..946f55c590 100644 --- a/ledger/metrics/src/test/suite/scala/com/daml/metrics/grpc/GrpcMetricsServerInterceptorSpec.scala +++ b/ledger/metrics/src/test/suite/scala/com/daml/metrics/grpc/GrpcMetricsServerInterceptorSpec.scala @@ -59,7 +59,7 @@ class GrpcMetricsServerInterceptorSpec metrics.callsStarted.valueWithContext should contain theSameElementsAs Map( labelsForSimpleRequest -> 1 ) - metrics.callsFinished.valueWithContext should contain theSameElementsAs Map( + metrics.callsHandled.valueWithContext should contain theSameElementsAs Map( labelsForSimpleRequestWithStatusCode -> 1 ) } @@ -84,7 +84,7 @@ class GrpcMetricsServerInterceptorSpec } meterHasValueForStreaming(metrics.callsStarted) meterHasValueForStreaming( - metrics.callsFinished, + metrics.callsHandled, withStreamingLabels(labelsForSimpleRequestWithStatusCode), ) meterHasValueForStreaming(metrics.messagesSent, value = 3) @@ -145,6 +145,6 @@ object GrpcMetricsServerInterceptorSpec { override val messagesReceivedSize: MetricHandle.Histogram = InMemoryMetricsFactory.histogram(metricName) override val callsStarted: MetricHandle.Meter = InMemoryMetricsFactory.meter(metricName) - override val callsFinished: MetricHandle.Meter = InMemoryMetricsFactory.meter(metricName) + override val callsHandled: MetricHandle.Meter = InMemoryMetricsFactory.meter(metricName) } } diff --git a/ledger/participant-integration-api/src/main/scala/platform/apiserver/GrpcServer.scala b/ledger/participant-integration-api/src/main/scala/platform/apiserver/GrpcServer.scala index d9b805f4de..360b4868aa 100644 --- a/ledger/participant-integration-api/src/main/scala/platform/apiserver/GrpcServer.scala +++ b/ledger/participant-integration-api/src/main/scala/platform/apiserver/GrpcServer.scala @@ -11,11 +11,13 @@ import com.google.protobuf.Message import io.grpc._ import io.grpc.netty.NettyServerBuilder import io.netty.handler.ssl.SslContext - import java.io.IOException import java.net.{BindException, InetAddress, InetSocketAddress} import java.util.concurrent.Executor import java.util.concurrent.TimeUnit.SECONDS + +import com.daml.metrics.grpc.GrpcMetricsServerInterceptor + import scala.concurrent.duration.DurationInt import scala.util.Failure import scala.util.control.NoStackTrace @@ -48,6 +50,7 @@ private[apiserver] object GrpcServer { builder.maxInboundMessageSize(maxInboundMessageSize) // NOTE: Interceptors run in the reverse order in which they were added. interceptors.foreach(builder.intercept) + builder.intercept(new GrpcMetricsServerInterceptor(metrics.daml.grpc)) builder.intercept(new MetricsInterceptor(metrics)) builder.intercept(new TruncatedStatusInterceptor(MaximumStatusDescriptionLength)) builder.intercept(new ErrorInterceptor)