mirror of
https://github.com/digital-asset/daml.git
synced 2024-09-20 01:07:18 +03:00
Use the testing metrics instead of opentelemetry testing sdk [PLEN-43] (#15555)
This commit is contained in:
parent
a10ffa819c
commit
5b958ae9b7
@ -5,7 +5,7 @@ package com.daml.metrics.api.testing
|
||||
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong, AtomicReference}
|
||||
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
|
||||
|
||||
import com.daml.metrics.api.MetricHandle.{Counter, Factory, Gauge, Histogram, Meter, Timer}
|
||||
import com.daml.metrics.api.testing.InMemoryMetricsFactory.{
|
||||
@ -65,35 +65,33 @@ object InMemoryMetricsFactory extends InMemoryMetricsFactory {
|
||||
}
|
||||
|
||||
case class InMemoryTimer(initialContext: MetricsContext) extends Timer {
|
||||
val runTimers: collection.concurrent.Map[MetricsContext, AtomicInteger] = TrieMap()
|
||||
val data = InMemoryHistogram(initialContext)
|
||||
|
||||
override def name: String = "test"
|
||||
|
||||
override def update(duration: Long, unit: TimeUnit)(implicit context: MetricsContext): Unit =
|
||||
incrementForContext(context)
|
||||
data.update(TimeUnit.MILLISECONDS.convert(duration, unit))
|
||||
|
||||
override def update(duration: Duration)(implicit context: MetricsContext): Unit =
|
||||
incrementForContext(context)
|
||||
data.update(TimeUnit.MILLISECONDS.convert(duration))
|
||||
|
||||
override def time[T](call: => T)(implicit context: MetricsContext): T = {
|
||||
incrementForContext(context)
|
||||
call
|
||||
val start = System.nanoTime()
|
||||
val result = call
|
||||
val end = System.nanoTime()
|
||||
data.update(TimeUnit.MILLISECONDS.convert(end - start, TimeUnit.NANOSECONDS))
|
||||
result
|
||||
}
|
||||
override def startAsync()(implicit startContext: MetricsContext): Timer.TimerHandle =
|
||||
|
||||
override def startAsync()(implicit startContext: MetricsContext): Timer.TimerHandle = {
|
||||
val start = System.nanoTime()
|
||||
new Timer.TimerHandle {
|
||||
override def stop()(implicit context: MetricsContext): Unit =
|
||||
incrementForContext(startContext.merge(context))
|
||||
}
|
||||
|
||||
private def incrementForContext(context: MetricsContext): Unit =
|
||||
discard {
|
||||
runTimers.updateWith(initialContext.merge(context)) {
|
||||
case None => Some(new AtomicInteger(1))
|
||||
case data @ Some(existing) =>
|
||||
existing.incrementAndGet()
|
||||
data
|
||||
}
|
||||
data.update(
|
||||
TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS)
|
||||
)(startContext.merge(context))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
@ -53,6 +53,9 @@ trait MetricValues {
|
||||
|
||||
def value: Long = meter match {
|
||||
case DropwizardMeter(_, metric) => metric.getCount
|
||||
case meter: InMemoryMeter =>
|
||||
val contextWithValues = meter.markers.view.mapValues(_.get()).toMap
|
||||
singleValueFromContexts(contextWithValues)
|
||||
case other =>
|
||||
throw new IllegalArgumentException(s"Value not supported for $other")
|
||||
}
|
||||
@ -81,6 +84,13 @@ trait MetricValues {
|
||||
throw new IllegalArgumentException(s"Values not supported for $other")
|
||||
}
|
||||
|
||||
def values: Seq[Long] = histogram match {
|
||||
case histogram: InMemoryHistogram =>
|
||||
singleValueFromContexts(histogram.values.toMap)
|
||||
case other =>
|
||||
throw new IllegalArgumentException(s"Values not supported for $other")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class TimerValues(timer: Timer) {
|
||||
@ -93,17 +103,33 @@ trait MetricValues {
|
||||
|
||||
def getCount: Long = timer match {
|
||||
case DropwizardTimer(_, metric) => metric.getCount
|
||||
case timer: InMemoryTimer =>
|
||||
singleValueFromContexts(timer.data.values.toMap.view.mapValues(_.size.toLong).toMap)
|
||||
case other =>
|
||||
throw new IllegalArgumentException(s"Count not supported for $other")
|
||||
}
|
||||
|
||||
def getCounts: Map[MetricsContext, Long] = timer match {
|
||||
case timer: InMemoryTimer =>
|
||||
timer.runTimers.toMap.view.mapValues(_.get().toLong).toMap
|
||||
timer.data.values.toMap.view.mapValues(_.size.toLong).toMap
|
||||
case other =>
|
||||
throw new IllegalArgumentException(s"Counts not supported for $other")
|
||||
}
|
||||
|
||||
def getValues: Seq[Long] = timer match {
|
||||
case timer: InMemoryTimer =>
|
||||
singleValueFromContexts(timer.data.values.toMap)
|
||||
case other =>
|
||||
throw new IllegalArgumentException(s"Count not supported for $other")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private def singleValueFromContexts[T](
|
||||
contextToValueMapping: Map[MetricsContext, T]
|
||||
) = if (contextToValueMapping.size == 1)
|
||||
contextToValueMapping.head._2
|
||||
else
|
||||
throw new IllegalArgumentException("Cannot get value with multi context metrics.")
|
||||
|
||||
}
|
||||
|
@ -44,12 +44,6 @@ da_scala_library(
|
||||
],
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//ledger/metrics",
|
||||
"@maven//:io_opentelemetry_opentelemetry_api",
|
||||
"@maven//:io_opentelemetry_opentelemetry_sdk_common",
|
||||
"@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
|
||||
"@maven//:io_opentelemetry_opentelemetry_sdk_metrics_testing",
|
||||
"@maven//:io_opentelemetry_opentelemetry_sdk_testing",
|
||||
],
|
||||
)
|
||||
|
||||
@ -75,6 +69,7 @@ da_scala_test_suite(
|
||||
"//ledger-api/rs-grpc-bridge",
|
||||
"//ledger-api/testing-utils",
|
||||
"//ledger/metrics",
|
||||
"//ledger/metrics:metrics-test-lib",
|
||||
"@maven//:com_google_guava_guava",
|
||||
"@maven//:com_typesafe_config",
|
||||
"@maven//:io_opentelemetry_opentelemetry_api",
|
||||
|
@ -5,13 +5,13 @@ package com.daml.metrics.akkahttp
|
||||
|
||||
import akka.util.ByteString
|
||||
import akka.http.scaladsl.model.{
|
||||
ContentTypes,
|
||||
HttpEntity,
|
||||
HttpRequest,
|
||||
HttpResponse,
|
||||
HttpEntity,
|
||||
RequestEntity,
|
||||
ResponseEntity,
|
||||
StatusCodes,
|
||||
ContentTypes,
|
||||
}
|
||||
import akka.http.scaladsl.server.Route
|
||||
import akka.http.scaladsl.server.Directives
|
||||
@ -22,12 +22,18 @@ import com.daml.metrics.akkahttp.AkkaUtils._
|
||||
import com.daml.metrics.api.MetricsContext
|
||||
import com.daml.metrics.api.MetricName
|
||||
import com.daml.metrics.api.MetricHandle.{Meter, Timer}
|
||||
import com.daml.metrics.api.testing.{InMemoryMetricsFactory, MetricValues}
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AnyWordSpec
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class AkkaHttpMetricsSpec extends AnyWordSpec with Matchers with ScalatestRouteTest {
|
||||
class AkkaHttpMetricsSpec
|
||||
extends AnyWordSpec
|
||||
with Matchers
|
||||
with ScalatestRouteTest
|
||||
with MetricValues {
|
||||
|
||||
import AkkaHttpMetricsSpec._
|
||||
|
||||
@ -44,7 +50,7 @@ class AkkaHttpMetricsSpec extends AnyWordSpec with Matchers with ScalatestRouteT
|
||||
|
||||
// The route used for testing
|
||||
// extractStrictEntity is used to force reading the request entity
|
||||
val testRoute = concat(
|
||||
private val testRoute = concat(
|
||||
pathSingleSlash {
|
||||
Directives.extractStrictEntity(2.seconds) { _ =>
|
||||
Directives.complete("root")
|
||||
@ -529,9 +535,9 @@ class AkkaHttpMetricsSpec extends AnyWordSpec with Matchers with ScalatestRouteT
|
||||
"record duration of any request" in {
|
||||
withRouteAndMetrics { (route, metrics) =>
|
||||
Get("/") ~> route ~> check {
|
||||
val value = metrics.httpLatencyValue
|
||||
value.count should be(1L)
|
||||
value.sum should be >= 0L
|
||||
val value = metrics.httpLatency
|
||||
value.getCount should be(1L)
|
||||
value.getValues.sum should be >= 0L
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -539,9 +545,9 @@ class AkkaHttpMetricsSpec extends AnyWordSpec with Matchers with ScalatestRouteT
|
||||
"record meaningful duration for a request" in {
|
||||
withRouteAndMetrics { (route, metrics) =>
|
||||
Get("/delay/300") ~> route ~> check {
|
||||
val value = metrics.httpLatencyValue
|
||||
value.count should be(1L)
|
||||
value.sum should be >= 300L
|
||||
val value = metrics.httpLatency
|
||||
value.getCount should be(1L)
|
||||
value.getValues.sum should be >= 300L
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -550,9 +556,9 @@ class AkkaHttpMetricsSpec extends AnyWordSpec with Matchers with ScalatestRouteT
|
||||
withRouteAndMetrics { (route, metrics) =>
|
||||
Get("/delay/300") ~> route
|
||||
Get("/delay/600") ~> route ~> check {
|
||||
val value = metrics.httpLatencyValue
|
||||
value.count should be(2L)
|
||||
value.sum should be >= 900L
|
||||
val value = metrics.httpLatency
|
||||
value.getCount should be(2L)
|
||||
value.getValues.sum should be >= 900L
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -574,8 +580,9 @@ class AkkaHttpMetricsSpec extends AnyWordSpec with Matchers with ScalatestRouteT
|
||||
}
|
||||
}
|
||||
|
||||
object AkkaHttpMetricsSpec {
|
||||
object AkkaHttpMetricsSpec extends MetricValues {
|
||||
|
||||
private val metricsFactory = InMemoryMetricsFactory
|
||||
// The metrics being tested
|
||||
case class TestMetrics(
|
||||
httpRequestsTotal: Meter,
|
||||
@ -585,34 +592,23 @@ object AkkaHttpMetricsSpec {
|
||||
httpResponsesBytesTotal: Meter,
|
||||
) {
|
||||
|
||||
import TestMetrics._
|
||||
|
||||
def httpRequestsTotalValue: Long = getCurrentValue(httpRequestsTotal)
|
||||
def httpErrorsTotalValue: Long = getCurrentValue(httpErrorsTotal)
|
||||
def httpLatencyValue: HistogramData = getHistogramValues(httpLatency)
|
||||
def httpRequestsBytesTotalValue: Long = getCurrentValue(httpRequestsBytesTotal)
|
||||
def httpResponsesBytesTotalValue: Long = getCurrentValue(httpResponsesBytesTotal)
|
||||
def httpRequestsTotalValue: Long = httpRequestsTotal.value
|
||||
def httpErrorsTotalValue: Long = httpErrorsTotal.value
|
||||
def httpRequestsBytesTotalValue: Long = httpRequestsBytesTotal.value
|
||||
def httpResponsesBytesTotalValue: Long = httpResponsesBytesTotal.value
|
||||
|
||||
}
|
||||
|
||||
object TestMetrics extends OpenTelemetryTestMetrics {
|
||||
object TestMetrics {
|
||||
|
||||
// Creates a new set of metrics, for one test
|
||||
def apply(): TestMetrics = {
|
||||
val testNumber = testNumbers.getAndIncrement()
|
||||
val baseName = MetricName(s"test-$testNumber")
|
||||
|
||||
val httpRequestsTotalName = baseName :+ "requests_total"
|
||||
val httpErrorsTotalName = baseName :+ "errors_total"
|
||||
val httpLatencyName = baseName :+ "requests_duration_seconds"
|
||||
val httpRequestsBytesTotalName = baseName :+ "requests_bytes_total"
|
||||
val httpResponsesBytesTotalName = baseName :+ "responses_bytes_total"
|
||||
|
||||
val httpRequestsTotal = metricFactory.meter(httpRequestsTotalName)
|
||||
val httpErrorsTotal = metricFactory.meter(httpErrorsTotalName)
|
||||
val httpLatency = metricFactory.timer(httpLatencyName)
|
||||
val httpRequestsBytesTotal = metricFactory.meter(httpRequestsBytesTotalName)
|
||||
val httpResponsesBytesTotal = metricFactory.meter(httpResponsesBytesTotalName)
|
||||
val baseName = MetricName("test")
|
||||
val httpRequestsTotal = metricsFactory.meter(baseName)
|
||||
val httpErrorsTotal = metricsFactory.meter(baseName)
|
||||
val httpLatency = metricsFactory.timer(baseName)
|
||||
val httpRequestsBytesTotal = metricsFactory.meter(baseName)
|
||||
val httpResponsesBytesTotal = metricsFactory.meter(baseName)
|
||||
|
||||
TestMetrics(
|
||||
httpRequestsTotal,
|
||||
|
@ -1,58 +0,0 @@
|
||||
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package com.daml.metrics.akkahttp
|
||||
|
||||
import com.daml.metrics.api.MetricHandle
|
||||
import com.daml.metrics.api.opentelemetry.OpenTelemetryFactory
|
||||
import io.opentelemetry.api.metrics.{Meter => OtelMeter}
|
||||
import io.opentelemetry.sdk.metrics.SdkMeterProvider
|
||||
import io.opentelemetry.sdk.metrics.data.MetricData
|
||||
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader
|
||||
import io.opentelemetry.sdk.testing.time.TestClock
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
/** Base class to manage metrics in a test.
|
||||
* Use the metric factory to create the metrics.
|
||||
* Use the getCurrentValue and getHistogramValues to extract the current data from a metric.
|
||||
* @see WebSocketMetricsSpec.TestMetrics
|
||||
*/
|
||||
abstract class OpenTelemetryTestMetrics {
|
||||
final val SecondNanos = 1_000_000_000L;
|
||||
|
||||
final val testNumbers = new AtomicInteger()
|
||||
|
||||
val metricReader: InMemoryMetricReader = InMemoryMetricReader.create();
|
||||
val testClock: TestClock = TestClock.create();
|
||||
val sdkMeterProvider: SdkMeterProvider =
|
||||
SdkMeterProvider.builder().setClock(testClock).registerMetricReader(metricReader).build();
|
||||
val metricFactory = new OpenTelemetryFactory {
|
||||
override val otelMeter: OtelMeter = sdkMeterProvider.get("test")
|
||||
}
|
||||
|
||||
private def metricData(metric: MetricHandle): MetricData = {
|
||||
// required to force the in memory reader to report the recent updates
|
||||
testClock.advance(Duration.ofNanos(SecondNanos))
|
||||
import scala.jdk.CollectionConverters._
|
||||
metricReader.collectAllMetrics.asScala.filter(_.getName == metric.name).head
|
||||
}
|
||||
|
||||
def getCurrentValue(metric: MetricHandle): Long = {
|
||||
import scala.jdk.CollectionConverters._
|
||||
metricData(metric).getLongSumData().getPoints().asScala.headOption.map(_.getValue).getOrElse(0L)
|
||||
}
|
||||
|
||||
def getHistogramValues(metric: MetricHandle): HistogramData = {
|
||||
import scala.jdk.CollectionConverters._
|
||||
val data = metricData(metric)
|
||||
val point = data.getHistogramData().getPoints().asScala.head
|
||||
|
||||
HistogramData(
|
||||
point.getCount,
|
||||
point.getSum.toLong,
|
||||
point.getCounts.asScala.map(_.longValue).toList,
|
||||
)
|
||||
}
|
||||
|
||||
}
|
@ -3,21 +3,23 @@
|
||||
|
||||
package com.daml.metrics.akkahttp
|
||||
|
||||
import akka.stream.scaladsl.{Source, Flow, Sink}
|
||||
import akka.http.scaladsl.model.ws.{Message, TextMessage, BinaryMessage}
|
||||
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
|
||||
import akka.stream.scaladsl.{Flow, Sink, Source}
|
||||
import akka.util.ByteString
|
||||
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
|
||||
import com.daml.metrics.api.{MetricsContext, MetricName}
|
||||
import com.daml.metrics.api.MetricHandle.Meter
|
||||
import com.daml.metrics.akkahttp.AkkaUtils._
|
||||
import com.daml.metrics.api.MetricHandle.Meter
|
||||
import com.daml.metrics.api.testing.{InMemoryMetricsFactory, MetricValues}
|
||||
import com.daml.metrics.api.{MetricName, MetricsContext}
|
||||
import org.scalatest.Assertion
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AsyncWordSpec
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
class AkkaHttpMetricsSpec extends AsyncWordSpec with AkkaBeforeAndAfterAll with Matchers {
|
||||
class WebSocketMetricsSpec extends AsyncWordSpec with AkkaBeforeAndAfterAll with Matchers {
|
||||
|
||||
import AkkaHttpMetricsSpec._
|
||||
import WebSocketMetricsSpec._
|
||||
|
||||
// test data
|
||||
private val strictTextMessageData = "test01"
|
||||
@ -240,21 +242,21 @@ class AkkaHttpMetricsSpec extends AsyncWordSpec with AkkaBeforeAndAfterAll with
|
||||
}
|
||||
}
|
||||
|
||||
private val getMessagesReceivedTotalValue: TestMetrics => Long = (metrics: TestMetrics) =>
|
||||
private def getMessagesReceivedTotalValue(metrics: TestMetrics): Long =
|
||||
metrics.messagesReceivedTotalValue
|
||||
|
||||
private val getMessagesSentTotalValue: TestMetrics => Long = (metrics: TestMetrics) =>
|
||||
private def getMessagesSentTotalValue(metrics: TestMetrics): Long =
|
||||
metrics.messagesSentTotalValue
|
||||
|
||||
private val getMessagesReceivedBytesTotalValue: TestMetrics => Long =
|
||||
(metrics: TestMetrics) => metrics.messagesReceivedBytesTotalValue
|
||||
private def getMessagesReceivedBytesTotalValue(metrics: TestMetrics): Long =
|
||||
metrics.messagesReceivedBytesTotalValue
|
||||
|
||||
private val getMessagesSentBytesTotalValue: TestMetrics => Long = (metrics: TestMetrics) =>
|
||||
private def getMessagesSentBytesTotalValue(metrics: TestMetrics): Long =
|
||||
metrics.messagesSentBytesTotalValue
|
||||
|
||||
private def checkCountThroughFlow(
|
||||
messages: List[Message],
|
||||
metricExtractor: (TestMetrics) => Long,
|
||||
metricExtractor: TestMetrics => Long,
|
||||
expected: Long,
|
||||
): Future[Assertion] = {
|
||||
withDuplicatingFlowAndMetrics { (flow, metrics) =>
|
||||
@ -321,8 +323,9 @@ class AkkaHttpMetricsSpec extends AsyncWordSpec with AkkaBeforeAndAfterAll with
|
||||
|
||||
}
|
||||
|
||||
object AkkaHttpMetricsSpec {
|
||||
object WebSocketMetricsSpec extends MetricValues {
|
||||
|
||||
private val metricsFactory = InMemoryMetricsFactory
|
||||
// The metrics being tested
|
||||
case class TestMetrics(
|
||||
messagesReceivedTotal: Meter,
|
||||
@ -331,31 +334,23 @@ object AkkaHttpMetricsSpec {
|
||||
messagesSentBytesTotal: Meter,
|
||||
) {
|
||||
|
||||
import TestMetrics._
|
||||
|
||||
def messagesReceivedTotalValue: Long = getCurrentValue(messagesReceivedTotal)
|
||||
def messagesReceivedBytesTotalValue: Long = getCurrentValue(messagesReceivedBytesTotal)
|
||||
def messagesSentTotalValue: Long = getCurrentValue(messagesSentTotal)
|
||||
def messagesSentBytesTotalValue: Long = getCurrentValue(messagesSentBytesTotal)
|
||||
def messagesReceivedTotalValue: Long = messagesReceivedTotal.value
|
||||
def messagesReceivedBytesTotalValue: Long = messagesReceivedBytesTotal.value
|
||||
def messagesSentTotalValue: Long = messagesSentTotal.value
|
||||
def messagesSentBytesTotalValue: Long = messagesSentBytesTotal.value
|
||||
|
||||
}
|
||||
|
||||
object TestMetrics extends OpenTelemetryTestMetrics {
|
||||
object TestMetrics {
|
||||
|
||||
// Creates a new set of metrics, for one test
|
||||
def apply(): TestMetrics = {
|
||||
val testNumber = testNumbers.getAndIncrement()
|
||||
val baseName = MetricName(s"test-$testNumber")
|
||||
val baseName = MetricName("test")
|
||||
|
||||
val receivedTotalName = baseName :+ "messages_received_total"
|
||||
val receivedBytesTotalName = baseName :+ "messages_received_bytes_total"
|
||||
val sentTotalName = baseName :+ "messages_sent_total"
|
||||
val sentBytesTotalName = baseName :+ "messages_sent_bytes_total"
|
||||
|
||||
val receivedTotal = metricFactory.meter(receivedTotalName)
|
||||
val receivedBytesTotal = metricFactory.meter(receivedBytesTotalName)
|
||||
val sentTotal = metricFactory.meter(sentTotalName)
|
||||
val sentBytesTotal = metricFactory.meter(sentBytesTotalName)
|
||||
val receivedTotal = metricsFactory.meter(baseName)
|
||||
val receivedBytesTotal = metricsFactory.meter(baseName)
|
||||
val sentTotal = metricsFactory.meter(baseName)
|
||||
val sentBytesTotal = metricsFactory.meter(baseName)
|
||||
|
||||
TestMetrics(receivedTotal, receivedBytesTotal, sentTotal, sentBytesTotal)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user