mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
Add gRPC server interceptor with golden metrics to all gRPC services [PLEN-10] (#15477)
This commit is contained in:
parent
d071e5e505
commit
b1edee94f8
@ -8,12 +8,11 @@ import com.daml.metrics.api.MetricHandle.{Counter, Meter, Timer}
|
||||
import com.daml.metrics.api.MetricName
|
||||
import com.daml.metrics.api.dropwizard.DropwizardFactory
|
||||
import com.daml.metrics.api.opentelemetry.OpenTelemetryFactory
|
||||
import io.opentelemetry.api.metrics.{Meter => OtelMeter}
|
||||
|
||||
class HttpJsonApiMetrics(
|
||||
val prefix: MetricName,
|
||||
val registry: MetricRegistry,
|
||||
val otelMeter: OtelMeter,
|
||||
val openTelemetryFactory: OpenTelemetryFactory,
|
||||
) {
|
||||
|
||||
object Db extends DropwizardFactory {
|
||||
@ -83,10 +82,6 @@ class HttpJsonApiMetrics(
|
||||
val allocatePartyThroughput: Meter =
|
||||
dropwizardFactory.meter(prefix :+ "allocation_party_throughput")
|
||||
|
||||
val openTelemetryFactory = new OpenTelemetryFactory {
|
||||
override val otelMeter: OtelMeter = HttpJsonApiMetrics.this.otelMeter
|
||||
}
|
||||
|
||||
// golden signals
|
||||
val httpRequestsTotal: Meter = openTelemetryFactory.meter(prefix :+ "requests_total")
|
||||
val httpErrorsTotal: Meter = openTelemetryFactory.meter(prefix :+ "errors_total")
|
||||
|
@ -4,12 +4,12 @@
|
||||
package com.daml.metrics
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry
|
||||
import io.opentelemetry.api.metrics.{Meter => OtelMeter}
|
||||
import com.daml.metrics.api.{MetricHandle, MetricName}
|
||||
import com.daml.metrics.api.MetricName
|
||||
import com.daml.metrics.api.dropwizard.DropwizardFactory
|
||||
import com.daml.metrics.api.opentelemetry.OpenTelemetryFactory
|
||||
import com.daml.metrics.grpc.GrpcServerMetrics
|
||||
import com.daml.metrics.grpc.DamlGrpcServerMetrics
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry
|
||||
import io.opentelemetry.api.metrics.{Meter => OtelMeter}
|
||||
|
||||
object Metrics {
|
||||
lazy val ForTesting = new Metrics(new MetricRegistry, GlobalOpenTelemetry.getMeter("test"))
|
||||
@ -18,6 +18,8 @@ object Metrics {
|
||||
final class Metrics(override val registry: MetricRegistry, val otelMeter: OtelMeter)
|
||||
extends DropwizardFactory {
|
||||
|
||||
val openTelemetryFactory: OpenTelemetryFactory = new OpenTelemetryFactory(otelMeter)
|
||||
|
||||
object test {
|
||||
private val prefix: MetricName = MetricName("test")
|
||||
|
||||
@ -47,33 +49,10 @@ final class Metrics(override val registry: MetricRegistry, val otelMeter: OtelMe
|
||||
|
||||
object services extends ServicesMetrics(prefix :+ "services", registry)
|
||||
|
||||
object HttpJsonApi extends HttpJsonApiMetrics(prefix :+ "http_json_api", registry, otelMeter)
|
||||
object HttpJsonApi
|
||||
extends HttpJsonApiMetrics(prefix :+ "http_json_api", registry, openTelemetryFactory)
|
||||
|
||||
object grpc extends OpenTelemetryFactory with GrpcServerMetrics {
|
||||
|
||||
private val grpcServerMetricsPrefix = prefix :+ "grpc" :+ "server"
|
||||
|
||||
override def otelMeter: OtelMeter = Metrics.this.otelMeter
|
||||
override val callTimer: MetricHandle.Timer = timer(grpcServerMetricsPrefix)
|
||||
override val messagesSent: MetricHandle.Meter = meter(
|
||||
grpcServerMetricsPrefix :+ "messages" :+ "sent"
|
||||
)
|
||||
override val messagesReceived: MetricHandle.Meter = meter(
|
||||
grpcServerMetricsPrefix :+ "messages" :+ "received"
|
||||
)
|
||||
override val messagesSentSize: MetricHandle.Histogram = histogram(
|
||||
grpcServerMetricsPrefix :+ "messages" :+ "sent" :+ "bytes"
|
||||
)
|
||||
override val messagesReceivedSize: MetricHandle.Histogram = histogram(
|
||||
grpcServerMetricsPrefix :+ "messages" :+ "received" :+ "bytes"
|
||||
)
|
||||
override val callsStarted: MetricHandle.Meter = meter(
|
||||
grpcServerMetricsPrefix :+ "started"
|
||||
)
|
||||
override val callsFinished: MetricHandle.Meter = meter(
|
||||
grpcServerMetricsPrefix :+ "handled"
|
||||
)
|
||||
}
|
||||
object grpc extends DamlGrpcServerMetrics(openTelemetryFactory, "participant")
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -4,14 +4,18 @@
|
||||
package com.daml.metrics
|
||||
|
||||
import com.daml.ledger.resources.{Resource, ResourceContext, ResourceOwner}
|
||||
import com.daml.metrics.OpenTelemetryMeterOwner.buildProviderWithViews
|
||||
import com.daml.metrics.api.reporters.MetricsReporter
|
||||
import com.daml.metrics.api.reporters.MetricsReporter.Prometheus
|
||||
import io.opentelemetry.api.metrics.Meter
|
||||
import io.opentelemetry.exporter.prometheus.PrometheusCollector
|
||||
import io.opentelemetry.sdk.metrics.SdkMeterProvider
|
||||
import io.opentelemetry.sdk.metrics.common.InstrumentType
|
||||
import io.opentelemetry.sdk.metrics.view.{Aggregation, InstrumentSelector, View}
|
||||
import io.opentelemetry.sdk.metrics.{SdkMeterProvider, SdkMeterProviderBuilder}
|
||||
|
||||
import scala.annotation.nowarn
|
||||
import scala.concurrent.Future
|
||||
import scala.jdk.CollectionConverters.SeqHasAsJava
|
||||
|
||||
@nowarn("msg=deprecated")
|
||||
case class OpenTelemetryMeterOwner(enabled: Boolean, reporter: Option[MetricsReporter])
|
||||
@ -20,7 +24,7 @@ case class OpenTelemetryMeterOwner(enabled: Boolean, reporter: Option[MetricsRep
|
||||
override def acquire()(implicit
|
||||
context: ResourceContext
|
||||
): Resource[Meter] = {
|
||||
val meterProviderBuilder = SdkMeterProvider.builder()
|
||||
val meterProviderBuilder = buildProviderWithViews
|
||||
|
||||
/* To integrate with prometheus we're using the deprecated [[PrometheusCollector]].
|
||||
* More details about the deprecation here: https://github.com/open-telemetry/opentelemetry-java/issues/4284
|
||||
@ -41,3 +45,25 @@ case class OpenTelemetryMeterOwner(enabled: Boolean, reporter: Option[MetricsRep
|
||||
}
|
||||
|
||||
}
|
||||
object OpenTelemetryMeterOwner {
|
||||
|
||||
def buildProviderWithViews: SdkMeterProviderBuilder =
|
||||
SdkMeterProvider
|
||||
.builder()
|
||||
.registerView(
|
||||
InstrumentSelector
|
||||
.builder()
|
||||
.setType(InstrumentType.HISTOGRAM)
|
||||
.setName((t: String) => t.endsWith("duration.ms"))
|
||||
.build(),
|
||||
View
|
||||
.builder()
|
||||
.setAggregation(
|
||||
Aggregation.explicitBucketHistogram(
|
||||
Seq(5d, 10d, 15d, 25d, 35d, 50d, 75d, 100d, 150d, 200d, 250d, 350d, 500d, 750d,
|
||||
1_000d, 2_500d, 5_000d, 7_500d, 10_000d, 25_000d, 50_000d).map(Double.box).asJava
|
||||
)
|
||||
)
|
||||
.build(),
|
||||
)
|
||||
}
|
||||
|
@ -19,24 +19,24 @@ import io.opentelemetry.api.metrics.{
|
||||
Meter => OtelMeter,
|
||||
}
|
||||
|
||||
trait OpenTelemetryFactory extends Factory {
|
||||
class OpenTelemetryFactory(otelMeter: OtelMeter) extends Factory {
|
||||
|
||||
val globalMetricsContext: MetricsContext = MetricsContext(
|
||||
Map("daml_version" -> BuildInfo.Version)
|
||||
)
|
||||
|
||||
def otelMeter: OtelMeter
|
||||
|
||||
override def timer(
|
||||
name: MetricName
|
||||
)(implicit
|
||||
context: MetricsContext = MetricsContext.Empty
|
||||
): MetricHandle.Timer =
|
||||
): MetricHandle.Timer = {
|
||||
val nameWithSuffix = name :+ "duration" :+ "ms"
|
||||
OpenTelemetryTimer(
|
||||
name,
|
||||
otelMeter.histogramBuilder(name).ofLongs().setUnit("ms").build(),
|
||||
nameWithSuffix,
|
||||
otelMeter.histogramBuilder(nameWithSuffix).ofLongs().setUnit("ms").build(),
|
||||
globalMetricsContext.merge(context),
|
||||
)
|
||||
}
|
||||
override def gauge[T](name: MetricName, initial: T)(implicit
|
||||
context: MetricsContext = MetricsContext.Empty
|
||||
): MetricHandle.Gauge[T] = {
|
||||
|
@ -0,0 +1,36 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.metrics.grpc
|
||||
|
||||
import com.daml.metrics.api.MetricHandle.Factory
|
||||
import com.daml.metrics.api.{MetricHandle, MetricName, MetricsContext}
|
||||
|
||||
class DamlGrpcServerMetrics(metricsFactory: Factory, component: String) extends GrpcServerMetrics {
|
||||
|
||||
private val grpcServerMetricsPrefix = MetricName.Daml :+ "grpc" :+ "server"
|
||||
private implicit val metricsContext: MetricsContext = MetricsContext(
|
||||
Map("service" -> component)
|
||||
)
|
||||
|
||||
override val callTimer: MetricHandle.Timer = metricsFactory.timer(grpcServerMetricsPrefix)
|
||||
override val messagesSent: MetricHandle.Meter = metricsFactory.meter(
|
||||
grpcServerMetricsPrefix :+ "messages" :+ "sent"
|
||||
)
|
||||
override val messagesReceived: MetricHandle.Meter = metricsFactory.meter(
|
||||
grpcServerMetricsPrefix :+ "messages" :+ "received"
|
||||
)
|
||||
override val messagesSentSize: MetricHandle.Histogram = metricsFactory.histogram(
|
||||
grpcServerMetricsPrefix :+ "messages" :+ "sent" :+ "bytes"
|
||||
)
|
||||
override val messagesReceivedSize: MetricHandle.Histogram = metricsFactory.histogram(
|
||||
grpcServerMetricsPrefix :+ "messages" :+ "received" :+ "bytes"
|
||||
)
|
||||
override val callsStarted: MetricHandle.Meter = metricsFactory.meter(
|
||||
grpcServerMetricsPrefix :+ "started"
|
||||
)
|
||||
override val callsHandled: MetricHandle.Meter = metricsFactory.meter(
|
||||
grpcServerMetricsPrefix :+ "handled"
|
||||
)
|
||||
|
||||
}
|
@ -56,7 +56,7 @@ class GrpcMetricsServerInterceptor(metrics: GrpcServerMetrics) extends ServerInt
|
||||
timerHandle,
|
||||
metrics.messagesSent,
|
||||
metrics.messagesSentSize,
|
||||
metrics.callsFinished,
|
||||
metrics.callsHandled,
|
||||
)
|
||||
new MonitoringServerCallListener(
|
||||
serverCallHandler.startCall(metricsServerCall, metadata),
|
||||
@ -138,5 +138,5 @@ trait GrpcServerMetrics {
|
||||
val messagesReceived: Meter
|
||||
val messagesReceivedSize: Histogram
|
||||
val callsStarted: Meter
|
||||
val callsFinished: Meter
|
||||
val callsHandled: Meter
|
||||
}
|
||||
|
@ -59,7 +59,7 @@ class GrpcMetricsServerInterceptorSpec
|
||||
metrics.callsStarted.valueWithContext should contain theSameElementsAs Map(
|
||||
labelsForSimpleRequest -> 1
|
||||
)
|
||||
metrics.callsFinished.valueWithContext should contain theSameElementsAs Map(
|
||||
metrics.callsHandled.valueWithContext should contain theSameElementsAs Map(
|
||||
labelsForSimpleRequestWithStatusCode -> 1
|
||||
)
|
||||
}
|
||||
@ -84,7 +84,7 @@ class GrpcMetricsServerInterceptorSpec
|
||||
}
|
||||
meterHasValueForStreaming(metrics.callsStarted)
|
||||
meterHasValueForStreaming(
|
||||
metrics.callsFinished,
|
||||
metrics.callsHandled,
|
||||
withStreamingLabels(labelsForSimpleRequestWithStatusCode),
|
||||
)
|
||||
meterHasValueForStreaming(metrics.messagesSent, value = 3)
|
||||
@ -145,6 +145,6 @@ object GrpcMetricsServerInterceptorSpec {
|
||||
override val messagesReceivedSize: MetricHandle.Histogram =
|
||||
InMemoryMetricsFactory.histogram(metricName)
|
||||
override val callsStarted: MetricHandle.Meter = InMemoryMetricsFactory.meter(metricName)
|
||||
override val callsFinished: MetricHandle.Meter = InMemoryMetricsFactory.meter(metricName)
|
||||
override val callsHandled: MetricHandle.Meter = InMemoryMetricsFactory.meter(metricName)
|
||||
}
|
||||
}
|
||||
|
@ -11,11 +11,13 @@ import com.google.protobuf.Message
|
||||
import io.grpc._
|
||||
import io.grpc.netty.NettyServerBuilder
|
||||
import io.netty.handler.ssl.SslContext
|
||||
|
||||
import java.io.IOException
|
||||
import java.net.{BindException, InetAddress, InetSocketAddress}
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.TimeUnit.SECONDS
|
||||
|
||||
import com.daml.metrics.grpc.GrpcMetricsServerInterceptor
|
||||
|
||||
import scala.concurrent.duration.DurationInt
|
||||
import scala.util.Failure
|
||||
import scala.util.control.NoStackTrace
|
||||
@ -48,6 +50,7 @@ private[apiserver] object GrpcServer {
|
||||
builder.maxInboundMessageSize(maxInboundMessageSize)
|
||||
// NOTE: Interceptors run in the reverse order in which they were added.
|
||||
interceptors.foreach(builder.intercept)
|
||||
builder.intercept(new GrpcMetricsServerInterceptor(metrics.daml.grpc))
|
||||
builder.intercept(new MetricsInterceptor(metrics))
|
||||
builder.intercept(new TruncatedStatusInterceptor(MaximumStatusDescriptionLength))
|
||||
builder.intercept(new ErrorInterceptor)
|
||||
|
Loading…
Reference in New Issue
Block a user